feat(rcf): code cleanup
This commit is contained in:
@ -11,23 +11,25 @@ logging.basicConfig(level=logging.INFO)
|
||||
|
||||
|
||||
class RCFTranslation():
|
||||
def get(self,data):
|
||||
def get(self, data):
|
||||
"""
|
||||
local program > rcf
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def set(self,data):
|
||||
def set(self, data):
|
||||
"""
|
||||
rcf > local program
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class RCFMsgFactory():
|
||||
"""
|
||||
Abstract basic data bridge
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
raise NotImplementedError
|
||||
@ -39,7 +41,7 @@ class RCFMsgFactory():
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def unload(self,data):
|
||||
def unload(self, data):
|
||||
"""
|
||||
rcf > local program
|
||||
"""
|
||||
@ -66,22 +68,22 @@ class RCFMessage(object):
|
||||
self.mtype = mtype
|
||||
self.body = body
|
||||
self.id = id
|
||||
|
||||
self.factory = factory
|
||||
|
||||
|
||||
def store(self, dikt):
|
||||
"""Store me in a dict if I have anything to store"""
|
||||
# this seems weird to check, but it's what the C example does
|
||||
# this currently erasing old value
|
||||
if self.key is not None and self.body is not None:
|
||||
dikt[self.key] = self
|
||||
|
||||
|
||||
def send(self, socket):
|
||||
"""Send key-value message to socket; any empty frames are sent as such."""
|
||||
key = ''.encode() if self.key is None else self.key.encode()
|
||||
mtype = ''.encode() if self.mtype is None else self.mtype.encode()
|
||||
body = ''.encode() if self.body is None else umsgpack.packb(self.body)
|
||||
id = ''.encode() if self.id is None else self.id
|
||||
id = ''.encode() if self.id is None else self.id
|
||||
|
||||
try:
|
||||
socket.send_multipart([key, id, mtype, body])
|
||||
@ -115,10 +117,10 @@ class RCFMessage(object):
|
||||
|
||||
|
||||
class Client():
|
||||
def __init__(self, context=zmq.Context(), id="default", on_recv=None,on_post_init=None,is_admin=False):
|
||||
def __init__(self, context=zmq.Context(), id="default", on_recv=None, on_post_init=None, is_admin=False):
|
||||
self.is_admin = is_admin
|
||||
|
||||
#0MQ vars
|
||||
# 0MQ vars
|
||||
self.context = context
|
||||
self.pull_sock = None
|
||||
self.req_sock = None
|
||||
@ -161,7 +163,7 @@ class Client():
|
||||
self.poller = zmq.Poller()
|
||||
self.poller.register(self.pull_sock, zmq.POLLIN)
|
||||
|
||||
time.sleep(0.5)
|
||||
time.sleep(0.1)
|
||||
|
||||
async def main(self):
|
||||
logger.info("{} client syncing".format(id))
|
||||
@ -175,15 +177,15 @@ class Client():
|
||||
|
||||
if rcfmsg_snapshot.key == "SNAPSHOT_END":
|
||||
logger.info("snapshot complete")
|
||||
break
|
||||
break
|
||||
else:
|
||||
logger.info("received : {}".format(rcfmsg_snapshot.key))
|
||||
rcfmsg_snapshot.store(self.property_map)
|
||||
except:
|
||||
await asyncio.sleep(0.001)
|
||||
|
||||
|
||||
for f in self.on_post_init:
|
||||
f()
|
||||
f()
|
||||
logger.info("{} client running".format(id))
|
||||
|
||||
# Main loop
|
||||
@ -229,7 +231,6 @@ class Server():
|
||||
self.bind_ports()
|
||||
# Main client loop registration
|
||||
self.task = asyncio.ensure_future(self.main())
|
||||
|
||||
|
||||
logger.info("{} client initialized".format(id))
|
||||
|
||||
@ -266,7 +267,7 @@ class Server():
|
||||
# Snapshot system for late join
|
||||
if self.request_sock in socks:
|
||||
msg = self.request_sock.recv_multipart(zmq.DONTWAIT)
|
||||
|
||||
|
||||
identity = msg[0]
|
||||
request = msg[1]
|
||||
print("reveived snapshot request from {}".format(identity.decode()))
|
||||
@ -275,14 +276,15 @@ class Server():
|
||||
else:
|
||||
logger.info("Bad snapshot request")
|
||||
break
|
||||
|
||||
for k,v in self.property_map.items():
|
||||
logger.info("Sending {} snapshot to {}".format(k,identity))
|
||||
self.request_sock.send(identity,zmq.SNDMORE)
|
||||
|
||||
for k, v in self.property_map.items():
|
||||
logger.info(
|
||||
"Sending {} snapshot to {}".format(k, identity))
|
||||
self.request_sock.send(identity, zmq.SNDMORE)
|
||||
v.send(self.request_sock)
|
||||
|
||||
msg_end_snapshot = RCFMessage(key="SNAPSHOT_END",id=identity)
|
||||
self.request_sock.send(identity,zmq.SNDMORE)
|
||||
|
||||
msg_end_snapshot = RCFMessage(key="SNAPSHOT_END", id=identity)
|
||||
self.request_sock.send(identity, zmq.SNDMORE)
|
||||
msg_end_snapshot.send(self.request_sock)
|
||||
logger.info("done")
|
||||
elif self.collector_sock in socks:
|
||||
|
Reference in New Issue
Block a user