feat: serialisation refactoring
This commit is contained in:
25
client.py
25
client.py
@ -76,12 +76,12 @@ class RCFClient(object):
|
||||
self.pipe.send_multipart([b"CONNECT", (address.encode() if isinstance(
|
||||
address, str) else address), b'%d' % port])
|
||||
|
||||
def set(self, key, value):
|
||||
def set(self, key):
|
||||
"""Set new value in distributed hash table
|
||||
Sends [SET][key][value] to the agent
|
||||
"""
|
||||
self.pipe.send_multipart(
|
||||
[b"SET", umsgpack.packb(key), umsgpack.packb(value)])
|
||||
[b"SET", umsgpack.packb(key)])
|
||||
|
||||
def get(self, key):
|
||||
"""Lookup value in distributed hash table
|
||||
@ -161,14 +161,20 @@ class RCFClientAgent(object):
|
||||
logger.error("E: too many servers (max. %i)", SERVER_MAX)
|
||||
|
||||
elif command == b"SET":
|
||||
key, value = msg
|
||||
key = umsgpack.unpackb(msg[0])
|
||||
value = None
|
||||
|
||||
value = helpers.dump(key)
|
||||
|
||||
if value:
|
||||
logger.info(key,"dumped")
|
||||
# Send key-value pair on to server
|
||||
rcfmsg = message.RCFMessage(key=key, id=self.id, mtype="", body=value)
|
||||
|
||||
# Send key-value pair on to server
|
||||
rcfmsg = message.RCFMessage(key=umsgpack.unpackb(
|
||||
key), id=self.id, mtype="", body=umsgpack.unpackb(value))
|
||||
rcfmsg.store(self.property_map)
|
||||
|
||||
rcfmsg.send(self.publisher)
|
||||
rcfmsg.store(self.property_map)
|
||||
rcfmsg.send(self.publisher)
|
||||
else:
|
||||
logger.error("Fail to dump ")
|
||||
|
||||
elif command == b"GET":
|
||||
key = umsgpack.unpackb(msg[0])
|
||||
@ -223,6 +229,7 @@ def rcf_client_agent(ctx, pipe):
|
||||
rcfmsg.store(agent.property_map)
|
||||
elif agent.state == State.ACTIVE:
|
||||
if rcfmsg.id != agent.id:
|
||||
helpers.load(rcfmsg.key,rcfmsg.body)
|
||||
rcfmsg.store(agent.property_map)
|
||||
action = "update" if rcfmsg.body else "delete"
|
||||
logging.info("I: received from {}:{},{} {}".format(
|
||||
|
Reference in New Issue
Block a user