import time
from os import getenv, makedirs, path
from threading import Lock, Thread
from collections import deque
import shelve
from raftnode import cfg, logger
from raftnode.datastore.memory import MemoryStore
[docs]class Store:
def __init__(self, store_type: str = 'memory', data_dir: str = 'data'):
self.commit_id = 0
self.log = deque()
self.staged = None
self.db = self.__get_database(store_type, data_dir=data_dir)
self.__lock = Lock()
self.__data_dir = getenv('DATA_DIR', data_dir)
self.__log_file = getenv('LOG_FILENAME', 'OrderedLog')
self.__data_file = getenv('DATA_FILENAME', 'data.json')
self.__check_data_dir()
self.__session()
def __session(self):
self.f = shelve.open(path.join(self.__data_dir,self.__log_file), writeback=True)
try:
if self.f['data']:
self.commit_id = self.f['data'][-1]['commit_id']
logger.debug(f'[SHELVE LOG] commit id, {self.commit_id}')
except KeyError as e:
self.f['data'] = self.log
logger.debug(f'[SHELVE LOG] Initial log, {self.f["data"]}')
def __flush(self):
self.f['data'].append(self.staged)
logger.info(f'[DATA INSERT] {self.f["data"][-1]}')
self.f.close()
def __check_data_dir(self):
if not path.exists(self.__data_dir):
makedirs(self.__data_dir)
def __get_database(self, store_type: str, **kwargs):
'''
get instance of the database
:param store_type: type of data store to be used; either memory or database
:type store_type: str
'''
if store_type == 'database':
from raftnode.datastore.rocks import RockStore
database = kwargs.get('database', None)
data_dir = kwargs.get('data_dir', None)
db = RockStore(data_dir=data_dir)
else:
db = MemoryStore()
return db
[docs] def action_handler(self, message: dict):
'''
handle the commit and log actions sent by the leader node
:param message: log/commit data as received from the leader
:type message: dict
'''
with self.__lock:
action = message['action']
payload = message['payload']
if action == 'log':
self.staged = payload
elif action == 'commit':
if isinstance(payload, list):
for command in payload:
# cid = message.get('commit_id', self.commit_id)
namespace = command.get('namespace', 'default')
delete = command.get('delete', False)
leader_cid = command
logger.debug(f'[OLD COMMANDS] adding command {command}')
if not self.staged:
self.staged = command
self.commit(namespace, delete)
else:
namespace = payload.get('namespace', 'default')
delete = payload.get('delete', False)
logger.debug(f'[COMMAND] {payload}')
if not self.staged:
self.staged = payload
self.commit(namespace, delete)
return
[docs] def put(self, term: int, payload: dict, transport, majority: int) -> bool:
'''
Insert data into the database. If this is the leader node, first broadcast
the message to all the followers and wait for atleast `majority + 1`
confirmations. Once `majority + 1` followers confirm, send out a commit
message. This will instruct the followers to commit the data to their databases
:param term: term of this node
:type term: int
:param payload: data to be inserted into log or database
:type payload: dict
:param transport: instance of the Transport class
:type transport: Transport
:param majority: how many nodes constitute the majority
:type majority: int
'''
namespace = payload.get('namespace', 'default')
with self.__lock:
self.staged = payload
waited = 0
log_message = {
'term': term,
'addr': transport.addr,
'payload': payload,
'action': 'log',
'commit_id': self.commit_id
}
log_confirmations = [False] * len(transport.peers)
Thread(target=self.send_data, args=(
log_message, transport, log_confirmations,)).start()
while sum(log_confirmations) + 1 < majority:
waited += 0.0005
time.sleep(0.0005)
if waited > cfg.MAX_LOG_WAIT / 1000:
logger.info(
f"waited {cfg.MAX_LOG_WAIT} ms, update rejected:")
return False
commit_message = {
"term": term,
"addr": transport.addr,
"payload": payload,
"action": "commit",
"commit_id": self.commit_id
}
self.commit(namespace)
Thread(target=self.send_data,
args=(commit_message, transport,)).start()
logger.info(
"majority reached, replied to client, sending message to commit")
return True
[docs] def send_data(self, message: dict, transport, confirmations: list = None):
'''
send the log or commit data to the follower nodes and record their
responses in the `confirmations` list
:param message: data toe be sent to the follower nodes
:type message: dict
:param transport: instance of the transport class
:type transport: Transport
:param confirmations: list of the confirmations (initialized to False)
:type confirmations: list
'''
for i, peer in enumerate(transport.peers):
reply = transport.heartbeat(peer, message)
if reply and confirmations:
confirmations[i] = True
[docs] def get(self, payload: dict):
'''
retrieve data from the database based on the `key` in the
`payload`
:param payload: dictionary consisting the key using which the
data needs to be retrieved from the database
:type payload: dict
'''
namespace = payload.get('namespace', 'default')
key = payload["key"]
value = self.db.get(key=key, namespace=namespace)
payload.update({'value': value})
return payload
[docs] def delete(self, term: int, payload: dict, transport, majority: int):
namespace = payload.get('namespace', 'default')
with self.__lock:
self.staged = payload
waited = 0
log_message = {
'term': term,
'addr': transport.addr,
'payload': payload,
'action': 'log',
'commit_id': self.commit_id
}
log_confirmations = [False] * len(transport.peers)
Thread(target=self.send_data, args=(
log_message, transport, log_confirmations,)).start()
while sum(log_confirmations) + 1 < majority:
waited += 0.0005
time.sleep(0.0005)
if waited > cfg.MAX_LOG_WAIT / 1000:
logger.info(
f"waited {cfg.MAX_LOG_WAIT} ms, update rejected:")
return False
commit_message = {
"term": term,
"addr": transport.addr,
"payload": payload,
"action": "commit",
"commit_id": self.commit_id
}
Thread(target=self.send_data,
args=(commit_message, transport,)).start()
self.commit(namespace, delete=True)
logger.info(
"majority reached, replied to client, sending message to commit")
return True
[docs] def commit(self, namespace: str, delete: bool=False, **kwargs):
'''
commit the message to the database after getting
atleast `majority + 1` confirmations from the
follower nodes
'''
self.commit_id += 1
cid = kwargs.get('commit_id', self.commit_id)
# with self.__lock:
self.staged.update({'commit_id': cid})
if self.staged not in self.log:
logger.debug(f'[APPEND LOG] {self.staged}')
# self.log.append(self.staged)
self.__flush()
self.__session()
self.log = self.f['data']
key = self.staged['key']
if delete:
value = self.db.delete(key=key, namespace=namespace)
self.staged = None
logger.debug(f"[DELETE COMMAND] {self.staged}")
return value
value = self.staged['value']
self.staged = None
self.db.put(key, value, namespace=namespace)