Source code for distcache.health_server

import socket
import threading
import time

from distcache import config
from distcache import utils

config = config.config()


[docs]class HealthServer: """ Implements a health server to monitor health of all clients and report it to the cache server. """ def __init__(self): """ Initializes a HealthServer object. """ # Client details self.clients = [] self.unhealthy_clients = [] # list of client_socket # ACK Message details self.ACK_HEADER = 3 self.ACK_FORMAT = config.FORMAT self.ACK_MESSAGE = 'ACK'.encode(self.ACK_FORMAT) # HEARTBEAT configuration self.DEAD_THRESH = config.HEARTBEAT_THRESH self.probe_every_k_second = config.PROBE_EVERY_K_SECOND # Configure and start the health monitoring server self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_address = (config.IP, config.HEALTH_PROBE_PORT) self.server_socket.bind(self.server_address) print("Starting the server. Listening at {}:{}".format(*self.server_address)) self.server_socket.listen() # Configure and start the health reporting client self.reporting_address = (config.IP, config.HEALTH_REPORT_PORT) self.reporting_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) print("Trying to connect to {}:{}".format(*self.reporting_address)) self.reporting_socket.connect(self.reporting_address)
[docs] def probe_health(self, client_socket, client_address): """ Sends heart beat every k second. If three heart beat requests are not acknowledged for n times, the client is dead. :param client_socket: client socket on which health probes are to be sent and response received. :param client_address: client address :return: None """ print("Registered a new client {}:{}".format(*client_address)) self.clients.append(client_address) no_beat_count = 0 while True: time.sleep(self.probe_every_k_second) try: bytes_sent = client_socket.send(self.ACK_MESSAGE) if bytes_sent != self.ACK_HEADER: break client_socket.settimeout(5) response = client_socket.recv(self.ACK_HEADER) if response: if response == self.ACK_MESSAGE: no_beat_count = 0 continue else: no_beat_count += 1 except: no_beat_count += 1 if no_beat_count == self.DEAD_THRESH: break print("The client {}:{} is dead. No beat detected in the last {} attempts.".format(*client_address, self.DEAD_THRESH)) self.clients.remove(client_address) self.unhealthy_clients.append(client_address)
[docs] def monitor(self): """ Listens for new HealthClient connections. Monitors the health of the clients. :return: None """ print("Monitoring the clients...") threading.Thread(target=self.summary).start() threading.Thread(target=self.report_health, args=([], self.reporting_socket)).start() while True: client_socket, client_address = self.server_socket.accept() thread = threading.Thread(target=self.probe_health, args=(client_socket, client_address)) thread.start()
[docs] def report_health(self, message, client_socket): """ Report the cache clients health to the server :param message: any message. In this case list of unavailable servers :param client_socket: socket object connected to the cache server :return: None """ while True: time.sleep(5) # Report health regularly. # The response is false if the server does is unable to send any ACK response = utils.send_receive_ack(message, client_socket, config.HEADER_LENGTH, config.FORMAT) print("Report received by server: {}\n".format(response)) if response: self.unhealthy_clients = []
[docs] def summary(self): """ Keep logging the number of healthy clients in a fixed time interval :return: None """ while True: time.sleep(5) print("Active healthy clients: {}".format(len(self.clients)))
if __name__ == '__main__': server = HealthServer() server.monitor()