Source code for raftnode.transport

import socket
import time
from queue import Queue
from json import JSONDecodeError, dumps, loads
from threading import Lock, Thread

from raftnode import cfg, logger


[docs]class Transport: def __init__(self, my_ip: str, timeout: int, queue: Queue): self.host, self.port = my_ip.split(':') self.port = int(self.port) self.addr = my_ip self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server.bind((self.host, self.port)) self.server.listen() self.peers = list() self.lock = Lock() self.q = queue Thread(target=self.ping, args=(timeout,)).start()
[docs] def serve(self): ''' :param election: instance of the Election class :type election: Election This function starts a socket server and listen endlessly to the clients. It also checks the message types and delegates the message handling responsibility accordingly. Message types supported are: * add_peer: this message type means that a new peer has popped up in the cluster and should be added to the peers list of this server * heartbeat: the leader, once elected, sends heartbeats to the follower nodes, notifying them that it is alive and running * vote_request: when a leader is down or there is no leader in the network, any one node randomly becomes a candidate and sends out vote request to other nodes. The other nodes can either vote in favour or against the candidate node * ping: every node pings every other node from it's peers list to check if the peer is alive or dead. If it's dead, this node will remove the peer from it's peers list * put: message with type put is received from the client connected to this cluster. If this node is the leader, it will put the data into the database. If it's not the leader, it will redirect the request to the leader node and send the leader's response back to the client * get: message with type get is received from the client connected to this cluster. If this node is the leader, it will retrieve the data from the database and give it back to the client. If it's not the leader, it will redirect the request to the leader node and send the leader's response back to the client * data: this type of message is sent by the leader to the follower nodes along with the heartbeat. It contains the current term and the latest commit_id ''' self.election = self.q.get()['election'] while True: if not self.q.empty(): election = self.q.get() if bool(election): self.election = election if isinstance(self.election, dict): self.election = self.election['election'] logger.debug( f'current membership status of this node: {self.election.status}') client, address = self.server.accept() try: msg = client.recv(1024).decode('utf-8') except ConnectionResetError as e: continue msg = self.decode_json(msg) if isinstance(msg, dict): msg_type = msg['type'] if msg_type == 'add_peer': all_peers = self.peers.copy() msg.update({'sender': self.addr}) self.add_peer(msg) client.send(self.encode_json( {'type': 'add_peer', 'payload': all_peers})) elif msg_type == 'heartbeat': term, commit_id = self.election.heartbeat_handler( message=msg) client.send(self.encode_json( {'type': 'heartbeat', 'term': term, 'commit_id': commit_id})) elif msg_type == 'vote_request': choice, term = self.election.decide_vote( msg['term'], msg['commit_id'], msg['staged']) client.send(self.encode_json( {'type': 'vote_request', 'term': term, 'choice': choice})) elif msg_type == 'ping': msg.update({'is_alive': True, 'addr': self.addr}) client.send(self.encode_json(msg)) elif msg_type == 'peers': if self.election.status == cfg.LEADER: peers_response = {'type': 'peers'} peers_response.update({'peers': self.peers}) client.send(self.encode_json(peers_response)) else: reply = self.redirect_to_leader(self.encode_json(msg)) client.send(bytes(reply, encoding='utf-8')) else: reply = self.__resolve_msg(msg) if reply: client.send(self.encode_json(reply)) else: send_msg = 'hey there; from {}'.format(self.addr) client.send(bytes(self.addr, encoding='utf-8')) client.close()
def __resolve_msg(self, msg: dict): try: msg_type = msg['type'] if self.election.status == cfg.LEADER: client_response = {'type': msg_type} handler = getattr(self.election, f'handle_{msg_type}') reply = handler(msg) client_response.update({'data': reply}) return client_response else: reply = self.redirect_to_leader(self.encode_json(msg)) return self.decode_json(reply) except Exception as e: raise e
[docs] def redirect_to_leader(self, message: dict): ''' If this node is not the leader, this function will redirect the request along with the message to the leader of the cluster :param message: message to send to the client :param type: dict ''' try: logger.info( f'[LEADER REDIRECT] redirecting to leader at address {self.election.leader}') leader_host, leader_port = (self.election.leader).split(':') s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((leader_host, int(leader_port))) s.send(message) leader_reply = s.recv(1024).decode('utf-8') return leader_reply except ConnectionRefusedError as e: return {'data': 'leader unavailable'} except AttributeError as e: if "object has no attribute" in e.args[0]: time.sleep(1) except ConnectionResetError as e: return {'data': 'connection reset by peer'}
def __proxy_client(self, addr: str, message=None): if not message: message = {'type': 'echo', 'payload': 'whatsup?'} client = self.reconnect(addr) if not client: return client.send(self.encode_json(message)) msg = client.recv(1024).decode('utf-8') client.close() return
[docs] def req_add_peer(self, addr: str): ''' When this node starts, it send out a message to all the peers to add itself in their peers list :param addr: address of this node in the format ip:port :type addr: str ''' client = self.reconnect(addr) if not client: logger.info(f'Could not connect to peer {addr}') return message = self.encode_json({'type': 'add_peer', 'payload': self.addr}) client.send(message) msg = client.recv(1024).decode('utf-8') reply = self.decode_json(msg) all_peers = reply['payload'] with self.lock: self.peers.append(addr) if all_peers: with self.lock: for peer in all_peers: self.peers.append(peer) self.peers = list(set(self.peers))
[docs] def add_peer(self, message: dict): ''' This functions adds any new peers to their list of peers :param message: message received from the new peer :type message: dict ''' try: reciever_address = message['sender'] new_peer = message['payload'] if new_peer not in self.peers: with self.lock: self.peers.append(new_peer) if self.election.status == cfg.LEADER: self.election.start_heartbeat() except Exception as e: raise e
[docs] def reconnect(self, addr: str): ''' This function tries to connect this node to the peer at address addr 20 times. If the peer is not connected in 20 tries, it returns False :param addr: address of the other peer :type addr: str ''' i = 0 # while i < 20: try: host, port = addr.split(':') client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect((host, int(port))) return client except ConnectionRefusedError: client.close() time.sleep(0.002) if addr in self.peers: with self.lock: self.peers.remove(addr) del client except TimeoutError as e: logger.info(f'Timeout error connecting to peer {addr}') logger.info(f'Removing peer {addr} from list of peers') if addr in self.peers: with self.lock: self.peers.remove(addr) client.close() except Exception as e: client.close() raise e
# finally: # i += 1 # else: # with self.lock: # self.peers.remove(addr) # return False
[docs] def ping(self, timeout: float): ''' send a ping message to all the peers in the peers list :param timeout: time interval after which all peers will get a ping from this node :type timeout: float ''' while True: if self.peers: logger.debug(f'peers >>> {self.peers}') for peer in self.peers: Thread(target=self.echo, args=(peer,)).start() else: logger.debug('ping >>> no peers to ping') time.sleep(timeout)
[docs] def echo(self, peer): ''' This function will send the ping message to the peer at the address `addr` :param peer: address of the peer in `ip:port` format ''' try: client = self.reconnect(peer) if not client: return echo_msg = self.encode_json({'type': 'ping'}) client.send(echo_msg) echo_reply = self.decode_json(client.recv(1024).decode('utf-8')) client.close() if echo_reply: logger.debug('ping >>> {}'.format(echo_reply)) if echo_reply['is_alive']: return True return False except ConnectionResetError as e: logger.info(f'[ECHO]lost connection to peer {peer}') return None
[docs] def heartbeat(self, peer: str, message: dict = None) -> dict: ''' If this node is the leader, it will send a heartbeat message to the follower at address `peer` :param peer: address of the follower in `ip:port` format :type peer: str :param message: heartbeat message; it consists current term and address of this node (leader node) :type message: dict :returns: heartbeat message response as received from the follower :rtype: dict ''' try: client = self.reconnect(peer) if not client: return message.update({'type': 'heartbeat'}) heartbeat_message = self.encode_json(message) client.send(heartbeat_message) heartbeat_reply = client.recv(1024).decode('utf-8') if bool(heartbeat_reply): heartbeat_reply = self.decode_json(heartbeat_reply) client.close() return heartbeat_reply except ConnectionResetError as e: logger.info(f'[HEARTBEAT]lost connection to peer {peer}') return None
[docs] def vote_request(self, peer: str, message: dict = None): ''' sends vote request to the peer and return vote response to this node :param peer: address of the peer in `ip:port` format :type peer: str :param message: vote message; this will be sent to the other nodes :type message: dict :returns: vote response as received from the voter node :rtype: dict ''' try: client = self.reconnect(peer) if not client: return message.update({'type': 'vote_request'}) vote_request_message = self.encode_json(message) client.send(vote_request_message) vote_reply = self.decode_json(client.recv(1024).decode('utf-8')) client.close() return vote_reply except ConnectionResetError as e: logger.info(f'[VOTE REQUEST]lost connection to peer {peer}') return None
[docs] def send_data(self, peer=None, message: dict = None): ''' sends heartbeat data to the peer and returns response to this node :param peer: address of the peer in `ip:port` format :type peer: str :param message: heartbeat data message; it contains term and latest commit_id :type message: dict :returns: vote response as received from the voter node :rtype: dict ''' client = self.reconnect(peer) if not client: return message.update({'type': 'data'}) data_message = self.encode_json(message) client.send(data_message) data_reply = self.decode_json(client.recv(1024).decode('utf-8')) client.close() return data_reply
[docs] def encode_json(self, msg: dict) -> bytes: ''' convert json to bytes object :param msg: message in dict format :type msg: dict :returns: json message converted to bytes :rtype: bytes ''' if isinstance(msg, dict): return bytes(dumps(msg), encoding='utf-8') return msg
[docs] def decode_json(self, msg): ''' convert bytes object to json :param msg: bytes object as received from the client or other node :type msg: bytes :returns: bytes message converted to json :rtype: dict ''' if isinstance(msg, str): try: return loads(msg) except JSONDecodeError as e: logger.exception('JSON format incorrect {}'.format(msg)) except Exception as e: raise e return msg