Source code for distcache.cache_server

import socket
import threading
import pickle
import time
import os

from distcache import config as conf
from distcache.lru_cache import LRUCache
from distcache import utils
from distcache import logger

The server is always listening to the client. It needs to detect if the client is alive.

[docs]class CacheServer: """ Implements cache client. It has different types of cache eviction policies at disposal. It responds to queries of cache server. By default all of the operations that the cache server carries out are logged and can be used to reconstruct the cache in the event of error or server shutdown. The snapshots are however the very cache that will be the result of replaying logs but may miss some of the latest server operations. It will be faster to rebuild from snapshot but saving snapshots are time consuming operations. """ def __init__(self, host='localhost', port=2050, capacity=100, expire=0, filename=0): """ :param num_virtual_replicas: number of virtual replicas of each cache server :param expire: expiration time for keys in seconds. Some other parameters to consider: socket_timeout, password """ config = conf.config() self.cache = LRUCache(capacity) # Cache server configuration self.ADDRESS = (host, port) self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind(self.ADDRESS) self.clients = {} # (ip, (id, socket, ip:port) print("Starting server at {}:{}".format(*self.ADDRESS)) self.server_socket.listen(config.LISTEN_CAPACITY) # Communication configuration self.HEADER_LENGTH = config.HEADER_LENGTH self.FORMAT = config.FORMAT self.LISTEN_CAPACITY = config.LISTEN_CAPACITY # Logging self.dbname = 'cache.db' if filename is None else filename self.logger = logger.Logger(filename=self.dbname, mode='a', batch_size=1) self.save_every_k_seconds = config.save_every_k_seconds self.reconstruct() self.monitor() # start the server threading.Thread(target=self.snapshot).start()
[docs] def snapshot(self): """ Snapshot every conf.save_every_k_seconds :return: None """ while True: time.sleep(self.save_every_k_seconds) with open(self.dbname, mode='wb') as db: pickle.dump(self.cache, db)
[docs] def reconstruct(self): """ Load the cache from the latest database snapshot :return: None """ if os.path.exists(self.dbname): with open(self.dbname, mode='rb') as db: self.cache = pickle.load(db)
[docs] def replay_log(self): """ Rebuild the cache by treating each of the logged objects as a client operation. :return: None """ logs = self.logger.read_logs() for log in logs: self.parse_message(log)
[docs] def parse_message(self, message): """ Parse and execute the command :param message: the message sent by the cache_server :return: depends on the operation that was carried out after parsing message """ # This should run in a separate thread message = pickle.loads(message) self.logger.log_bytes(message) if message[0] == "set": return self.cache.set(message[1], message[2]) elif message[0] == "del": return self.cache.delete(message[1]) elif message[0] == "get": return self.cache.get(message[1]) elif message[0] == "add": return self.cache.add(message[1], message[2]) else: print("Only these keywords are supported: get, set, delete") return message
[docs] def handle_client(self, client_socket): """ Listen to queries from specific client. :param client_socket: :param client_address: :return:None """ while True: response = client_socket.recv(self.HEADER_LENGTH) if not response: continue message_length = int(response.decode(self.FORMAT)) message = client_socket.recv(message_length) response = self.parse_message(message) utils.send_message(response, client_socket, self.HEADER_LENGTH, self.FORMAT)
[docs] def monitor(self): """ Listens for new connections and queries from the clients. And add it as a cache server. """ while True: client_socket, client_address = self.server_socket.accept() print("New client connection accepted: {}:{}".format(*client_address)) threading.Thread(target=self.handle_client, args=[client_socket]).start()
if __name__ == '__main__': server = CacheServer(host='localhost', port=2050)