Welcome to distcache’s APIs!

This section lists the complete API for distcache.

Cache Client

Implements the distcache client, the component users interact with.

Note: This package lets each individual client discover the health of the servers on its own.

class distcache.cache_client.CacheClient[source]

Implements the cache client. It responds to user requests and monitors the health of the cache servers it talks to.

add(key, diff)[source]

Add diff to the value corresponding to key in a thread-safe manner.

Parameters:diff – the amount to be added to the value of key.
Returns:bool indicating if the operation was successful or not.

decrement(key)[source]

Decrement the value corresponding to the key in a thread-safe manner.

Returns:bool indicating if the operation was successful or not.

delete(key)[source]

Delete key from the cache.

Returns:bool indicating if the operation was successful or not.

execute_query(key, message)[source]

The central place to execute all client queries. It locates the server responsible for the key, creates a socket, sends the message to that server, and relays the server's response back to the calling function.

Parameters:
  • key – the user defined key which is to be manipulated.
  • message – a tuple consisting of the operation, the key, and optionally a value.
Returns:

response of the server
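
For illustration only, a query message might be a tuple shaped like the logged operations shown under Logger.log later on this page; the exact layout used internally is an implementation detail and may differ:

("set", key, value)   # store or update a value
("get", key)          # fetch a value
("del", key)          # delete a key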

get(key)[source]

Get the value of key from the cache :return: corresponding value for the key

gets(keys)[source]

Get the values of several keys from the cache. Same as get, but batches requests to avoid expensive network calls; if the requested keys live on different servers, gets performs about the same as get, or slightly slower.

Returns:[list of values] – the corresponding values for the keys.

increment(key)[source]

Increment the value corresponding to the key in a thread-safe manner.

Returns:bool indicating if the operation was successful or not.

set(key, value)[source]

Set or update the value of key in the cache. For an already existing key, the LRU ordering is updated as well.

Returns:bool indicating if the operation was successful or not.
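
A minimal usage sketch, assuming the client can be constructed with its defaults (the constructor parameters are not shown in this reference):

from distcache.cache_client import CacheClient

client = CacheClient()              # assumes the defaults are sufficient
client.set("greeting", "hi")        # store a value
client.get("greeting")              # -> "hi"
client.set("count", 1)
client.add("count", 10)             # add 10 to the stored value
client.increment("count")           # count is now 12
client.gets(["greeting", "count"])  # fetch several keys, batched per server
client.delete("greeting")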

Cache Server

class distcache.cache_server.CacheServer(host='localhost', port=2050, capacity=100, expire=0, filename=0)[source]

Implements the cache server. It has different cache eviction policies at its disposal and responds to queries from cache clients.

By default, all of the operations that the cache server carries out are logged, and the log can be used to reconstruct the cache in the event of an error or server shutdown.

A snapshot captures the same cache state that replaying the logs would produce, but it may miss some of the latest server operations. Rebuilding from a snapshot is faster, but saving snapshots is itself a time-consuming operation.

handle_client(client_socket)[source]

Listen for queries from a specific client.

Parameters:client_socket – the socket connected to the client.
Returns:None

monitor()[source]

Listens for new client connections and for queries from connected clients.

parse_message(message)[source]

Parse and execute the command.

Parameters:message – the message sent by the cache client.
Returns:depends on the operation that was carried out after parsing the message.

reconstruct()[source]

Load the cache from the latest database snapshot.

Returns:None

replay_log()[source]

Rebuild the cache by replaying each of the logged objects as a client operation.

Returns:None
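
An illustrative sketch (not the library's actual implementation) of how logged operation tuples such as those shown under Logger.log could be replayed against a cache:

# Illustration only: the real replay_log() may use a different tuple layout.
def replay(logged_ops, cache):
    for op in logged_ops:
        if op[0] == "set":
            _, key, value = op
            cache.set(key, value)
        elif op[0] == "del":
            cache.delete(op[1])

# replay([("set", "hi", "greeting"), ("set", 1, 100), ("del", 1)], cache)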

snapshot()[source]

Take a snapshot of the cache every conf.save_every_k_seconds seconds.

Returns:None
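
A minimal startup sketch using the documented constructor defaults, assuming monitor() is the blocking entry point that serves clients:

from distcache.cache_server import CacheServer

server = CacheServer(host='localhost', port=2050, capacity=100)
server.monitor()   # accepts client connections and answers their queries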

Consistent Hashing


Consistent hashing is a scheme in which the placement of keys does not depend directly on the number of servers. Each server is assigned a position on an abstract circle, or hash ring.

Say you have a list of servers [a, b, c].

You make k copies (replicas) of each of them, which improves the distribution of keys across servers:

servers = [a1, a2, ..., ak, b1, b2, ..., bk, c1, c2, ..., ck]

Of course, you can weight the servers so that better servers have a higher chance of receiving keys.

Now we assign each replica a position on the 32-bit ring (positions range from 0 to 2^32 - 1):

[…ak….b2….c1….c2….a2…..b1….bk….a1…..ck……….]

[…10….19k…1M…28M….54.2M..60M…67M…100M…124M..2^32-1]

We then sort the servers according to their position on the ring.

Now suppose a user asks which server a particular key, “apple”, should be sent to.

We hash the key: apple -> 16M.

What is the first position larger than 16M that has a server? 28M.

Great, we send the key to c2. And c2 is a replica of c, remember?

What happens when a server is down?

There is no response, so the query has to be answered from the database (or whatever other source of truth there is), and the result has to be stored in the cache again.

If server c went down, our hash ring would be updated to something like this:

[…ak….b2………..a2…..b1….bk….a1……………..]

[…10….19k……….54.2M..60M…67M…100M……..2^32-1]

We hash the key: apple -> 16M.

What is the first position larger than 16M that has a server? 54.2M.

Great, we send the key to a2.

Servers can be added in the same way.

There will be cache misses at first, because the keys that now map to the new server still live on the next server along the ring.

Those stale keys will eventually expire or be invalidated by the LRU policy.

Similarly, there are cache misses when a server goes down: all the queries that were to be handled by that server are sent to the next server on the ring.

Note: We compute positions for servers until there are no collisions on the ring.
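
An illustrative sketch of the lookup described above (not the library's actual implementation; the hash function and data structures here are assumptions): hash the key, then find the next server position clockwise on the sorted ring.

# Illustration only: a minimal consistent-hash lookup.
import bisect
import hashlib

def ring_position(name):
    # Map a name to a position on the 32-bit ring.
    return int(hashlib.md5(name.encode()).hexdigest(), 16) % (2 ** 32)

replicas = ["a1", "a2", "b1", "b2", "c1", "c2"]          # hypothetical replicas
ring = sorted((ring_position(r), r) for r in replicas)   # (position, replica), sorted
positions = [p for p, _ in ring]

def get_node(key):
    # First replica position clockwise from the key's position (wraps around).
    idx = bisect.bisect_right(positions, ring_position(key)) % len(ring)
    return ring[idx][1]

print(get_node("apple"))   # some replica, e.g. "c2", which belongs to server c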

Example usage:

servers = ['192.168.0.246:11212', '192.168.0.247:11212', '192.168.0.249:11212']
weights = [3, 3, 3]
ring = ConsistentHashing(servers, weights)
server = ring.get_node('my_key')

TODO: Allow users to specify both the number of replicas and the weight of servers.
TODO: Use a better hashing technique, one that distributes keys more uniformly.

class distcache.consistent_hashing.ConsistentHashing(nodes=None, weights=None)[source]

Implements consistent hashing

add_node(node, weight=5)[source]

Add a node to the HashRing of the consistent hashing scheme.

Parameters:
  • node – new node to be added (ip address in this case)
  • weight – weight of the new node to be added.
Returns:

None

get_node(key)[source]

Get the node/server where the key is or should be.

Parameters:key – key whose node/server is to be computed.
Returns:node where the key should be stored or retrieved from.
remove_node(node)[source]

Remove node from the ring because it is dead or unavailable.

Parameters:node – node to be removed from the consistent hashing scheme; it will no longer be considered while hashing.
Returns:None
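
A small maintenance sketch continuing the example usage above (the added address is hypothetical):

ring.add_node('192.168.0.250:11212', weight=5)   # a new server joins the ring
server = ring.get_node('my_key')                 # the key may now map to the new node
ring.remove_node('192.168.0.250:11212')          # the server went down; drop it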

Health Client

class distcache.health_client.HealthClient[source]

Implements a health client that responds to health probes from the health server.

relay_health()[source]

When it receives a health probe from the server, it replies with an ACK_MESSAGE to acknowledge that it is alive and well.

Returns:None

Health Server

class distcache.health_server.HealthServer[source]

Implements a health server to monitor health of all clients and report it to the cache server.

monitor()[source]

Listens for new HealthClient connections and monitors the health of the connected clients.

Returns:None
probe_health(client_socket, client_address)[source]

Sends a heartbeat every k seconds. If heartbeat requests go unacknowledged n times in a row, the client is considered dead.

Parameters:
  • client_socket – client socket on which health probes are to be sent and response received.
  • client_address – client address
Returns:

None

report_health(message, client_socket)[source]

Report the cache clients' health to the cache server.

Parameters:
  • message – any message; in this case, the list of unavailable servers
  • client_socket – socket object connected to the cache server
Returns:

None

summary()[source]

Periodically log the number of healthy clients at a fixed time interval.

Returns:None
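
A minimal sketch of running the health components in separate processes, assuming both can be constructed with their defaults (constructor parameters are not shown in this reference):

from distcache.health_server import HealthServer
from distcache.health_client import HealthClient

# Monitoring process:
health_server = HealthServer()
health_server.monitor()        # accepts HealthClient connections and probes them

# Monitored process:
health_client = HealthClient()
health_client.relay_health()   # replies to probes with an ACK_MESSAGE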

Logger

class distcache.logger.Logger(filename='cache.db', mode='ab', batch_size=1)[source]

Implements a simple logger

close()[source]

Close the logger and safely close the log file.

flush()[source]

Writes whatever objects are in the log queue to disk. It is safe if something appends to the log while it is being written.

log(object)[source]

Write objects that are not of bytes type to the log file in batches.

Parameters:object – basically anything (int, str, list, etc.)

Example objects:
("set", "hi", "greeting"), ("set", 1, 100), ("del", 1)

Returns:None
log_bytes(object)[source]

Write objects of bytes type to the log file in batches.

Parameters:object – basically any serialized object.

For instance:

obj = ("life", "is wonderful")
byte_obj = pickle.dumps(obj)
logger = Logger()
logger.log_bytes(byte_obj)

You can do the same with images or PDFs:

byte_obj = open("some_image_file.png", mode='rb').read()
logger = Logger()
logger.log_bytes(byte_obj)

Note: This function does not check that the object is of bytes type. The user should perform that check.

Returns:None
read_logs()[source]

Reads logs from the file.
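
A small usage sketch based on the documented constructor defaults (filename='cache.db', mode='ab', batch_size=1); the shape of what read_logs returns is an assumption here:

from distcache.logger import Logger

logger = Logger(filename='cache.db', batch_size=1)
logger.log(("set", "hi", "greeting"))    # queued and written in batches
logger.log(("del", 1))
logger.flush()                           # push any queued entries to disk
logger.close()                           # close the log file safely

entries = Logger(filename='cache.db').read_logs()   # read the logged objects back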

LRU Cache

Implementation of LRU Cache.

Implementing the cache in a separate class/file allows for robust testing and is a practice of loose coupling. It makes it possible, in the future, to implement other types of cache (eviction policies) and simply interchange them. It also keeps the code that uses the cache clean and easier to reason about.

Tested against LeetCode's test cases: https://leetcode.com/submissions/detail/359080389/. Dedicated test cases within the project would be better, though.

class distcache.lru_cache.LRUCache(capacity=100)[source]

Implements LRU Cache

add(key, diff)[source]

Add diff to the value corresponding to key in a thread-safe manner.

Parameters:
  • diff – (int, float, double) the amount to be added to the value of key
  • key – the key on which operation is to be carried out
Returns:

boolean indicating if the operation was successful or not.

delete(key)[source]

Deletes key from the cache. Nothing happens if the key does not exist in the cache.

Parameters:key – the key which is to be deleted from the cache
Returns:boolean indicating if the operation was successful or not.
get(key)[source]

Get the value corresponding to the key. For now, values cannot be boolean (False is used to signal a missing key).

Parameters:key – the key whose value is to be retrieved
Returns:value of the key if it exists, otherwise False.
set(key, value)[source]

The server has decided that this key-value pair should be stored here. If the key is new, it is simply added to the cache; if the key already exists, its value is updated along with the LRU ordering.

Parameters:key – key whose value is to be set. It can be either a new key or a previously stored key.
Returns:boolean indicating success of the operation
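
A small sketch of the LRU behaviour using the documented methods:

from distcache.lru_cache import LRUCache

cache = LRUCache(capacity=2)
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")        # touches "a", so "b" is now the least recently used
cache.set("c", 3)     # capacity exceeded: "b" is expected to be evicted
cache.get("b")        # -> False (missing keys return False, as documented above)
cache.add("a", 10)    # "a" now maps to 11
cache.delete("c")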

utils

Implements network utilities such as sending and receiving messages over a socket.

distcache.utils.receive_message(client_socket, HEADER_LENGTH, FORMAT)[source]

Receives message on the client_socket

Parameters:
  • client_socket – the socket on which the message is received
  • HEADER_LENGTH – the header length of the message.
  • FORMAT – the format in which the message length is to be encoded. (UTF-8 is the default FORMAT)
Returns:

False if receiving the message was not successful; otherwise, the received message after deserialization.

distcache.utils.send_message(message, client_socket, HEADER_LENGTH, FORMAT)[source]

Sends message on the client_socket.

Message sending occurs in two stages:
First, the message is serialized and its length, encoded in FORMAT, is sent. Then the message itself is sent.
Parameters:
  • message – the message to be sent. It can be any combination of different data types.
  • client_socket – the socket on which the message is sent
  • HEADER_LENGTH – the header length of the message.
  • FORMAT – the format in which the length of the message is to be encoded. (UTF-8 is the default FORMAT)
Returns:

None

distcache.utils.send_receive_ack(message, client_socket, HEADER_LENGTH, FORMAT)[source]

Sends a message on the client_socket, then receives the response on the same socket.

Parameters:
  • message – Any message/object that is to be sent.
  • client_socket – the socket on which to send and receive message.
  • HEADER_LENGTH – the header length of the message.
  • FORMAT – the format in which message length is to be encoded. (UTF-8 is default FORMAT)
Returns:

the response received, or False if no response was received.
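
An illustrative sketch of the length-prefixed framing these helpers describe (not the library's exact wire format; the pickle serializer and fixed-width header here are assumptions):

# Illustration only: send a fixed-width length header, then the serialized payload.
import pickle

HEADER_LENGTH = 10
FORMAT = "utf-8"

def send_message(message, client_socket):
    payload = pickle.dumps(message)                              # serialize the message
    header = f"{len(payload):<{HEADER_LENGTH}}".encode(FORMAT)   # fixed-width length header
    client_socket.sendall(header + payload)

def receive_message(client_socket):
    header = client_socket.recv(HEADER_LENGTH)
    if not header:
        return False                         # nothing received
    length = int(header.decode(FORMAT).strip())
    data = b""
    while len(data) < length:                # read until the full payload arrives
        chunk = client_socket.recv(length - len(data))
        if not chunk:
            return False
        data += chunk
    return pickle.loads(data)                # deserialize before returning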
