feat(rcf): id support

This commit is contained in:
Swann
2019-02-14 15:30:35 +01:00
parent 886befeb14
commit b2e6d27ec7
3 changed files with 54 additions and 32 deletions

View File

@ -7,7 +7,7 @@ import random
import struct
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.INFO)
# TODO: Add message time and author stamp for reliabilty
@ -15,18 +15,21 @@ class RCFMessage(object):
"""
Message is formatted on wire as 2 frames:
frame 0: key (0MQ string) // property path
frame 1: mtype (0MQ string) // property path
frame 2: body (blob) // Could be any data
frame 1: id (0MQ string) // property path
frame 2: mtype (0MQ string) // property path
frame 3: body (blob) // Could be any data
"""
key = None # key (string)
id = None # User (string)
mtype = None # data mtype (string)
body = None # data blob
def __init__(self, key=None, mtype=None, body=None):
def __init__(self, key=None, id=None, mtype=None, body=None):
self.key = key
self.mtype = mtype
self.body = body
self.id = id
def store(self, dikt):
"""Store me in a dict if I have anything to store"""
@ -40,17 +43,18 @@ class RCFMessage(object):
key = '' if self.key is None else self.key.encode()
mtype = '' if self.mtype is None else self.mtype.encode()
body = '' if self.body is None else umsgpack.packb(self.body)
socket.send_multipart([key,mtype, body])
socket.send_multipart([key,self.id,mtype, body])
@classmethod
def recv(cls, socket):
"""Reads key-value message from socket, returns new kvmsg instance."""
key,mtype, body = socket.recv_multipart(zmq.NOBLOCK)
key,id,mtype, body = socket.recv_multipart(zmq.NOBLOCK)
key = key.decode() if key else None
id = id if id else None
mtype = mtype.decode() if body else None
body = umsgpack.unpackb(body) if body else None
return cls(key=key,mtype=mtype, body=body)
return cls(key=key,id=id,mtype=mtype, body=body)
def dump(self):
if self.body is None:
@ -123,18 +127,18 @@ class Client():
rcfmsg = RCFMessage.recv(self.pull_sock)
rcfmsg.store(self.property_map)
rcfmsg.dump()
# rcfmsg.dump()
for f in self.recv_callback:
f(rcfmsg)
def push_update(self, key,mtype,body):
rcfmsg = RCFMessage(key,mtype,body)
rcfmsg = RCFMessage(key,self.id,mtype,body)
rcfmsg.send(self.push_sock)
# self.push_sock.send_multipart()
def stop(self):
logger.info("Stopping client")
logger.debug("Stopping client")
self.poller.unregister(self.pull_sock)
self.req_sock.close()
self.push_sock.close()
@ -182,7 +186,7 @@ class Server():
while True:
# TODO: find a better way
await asyncio.sleep(0.016)
await asyncio.sleep(0.0001)
try:
socks = dict(self.poller.poll(1))
@ -202,7 +206,7 @@ class Server():
self.pub_sock.send_multipart(msg)
def stop(self):
logger.info("Stopping server")
logger.debug("Stopping server")
self.poller.unregister(self.request_sock)
self.pub_sock.close()
self.request_sock.close()