refactor(rcf): Cleanup operators, cleanup client

This commit is contained in:
Swann Martinez
2019-03-25 13:24:56 +01:00
parent ef752af8ce
commit 704ea35129
2 changed files with 19 additions and 29 deletions

View File

@ -96,14 +96,11 @@ class RCFMessage(object):
mtype = None # data mtype (string)
body = None # data blob
def __init__(self, key=None, id=None, mtype=None, body=None, pointer=None):
def __init__(self, key=None, id=None, mtype=None, body=None):
self.key = key
self.mtype = mtype
self.body = body
self.id = id
self.pointer = pointer
self.get = None
self.set = None
def store(self, dikt):
"""Store me in a dict if I have anything to store"""
@ -162,24 +159,25 @@ class Client():
factory=None,
address="localhost"):
self.status = RCFStatus.IDLE
self.is_admin = is_admin
# 0MQ vars
self.context = context
self.pull_sock = None
self.req_sock = None
self.poller = None
# Client configuration
self.id = id.encode()
self.on_recv = on_recv
self.on_post_init = on_post_init
self.status = RCFStatus.IDLE
self.is_admin = is_admin
self.address = address
self.bind_ports()
# Main client loop registration
self.task = asyncio.ensure_future(self.main())
# client routine registration
self.load_task = asyncio.ensure_future(self.load())
self.tick_task = None
self.property_map = RCFStore(custom_factory=factory)
@ -212,7 +210,7 @@ class Client():
time.sleep(0.1)
async def main(self):
async def load(self):
self.status = RCFStatus.CONNECTING
logger.info("{} client syncing".format(id))
@ -238,11 +236,14 @@ class Client():
logger.info("{} client running".format(id))
self.push_update("net/clients/{}".format(self.id.decode()),"client",None)
self.push_update("net/objects/{}".format(self.id.decode()),"client_object",None)
self.push_update("net/clients/{}".format(self.id.decode()),"client",self.id)
self.push_update("net/objects/{}".format(self.id.decode()),"client_object","None")
self.tick_task = asyncio.ensure_future(self.tick())
self.status = RCFStatus.CONNECTED
async def tick(self):
# Main loop
while True:
# TODO: find a better way
@ -250,10 +251,8 @@ class Client():
if self.pull_sock in socks:
rcfmsg = RCFMessage.recv(self.pull_sock)
# if rcfmsg.pointer:
rcfmsg.store(self.property_map)
rcfmsg.store(self.property_map)
for f in self.on_recv:
f(rcfmsg)
@ -263,7 +262,6 @@ class Client():
def push_update(self, key, mtype, body):
rcfmsg = RCFMessage(key, self.id, mtype, body)
rcfmsg.send(self.push_sock)
# self.push_sock.send_multipart()
def stop(self):
logger.debug("Stopping client")
@ -271,7 +269,8 @@ class Client():
self.req_sock.close()
self.push_sock.close()
self.pull_sock.close()
self.task.cancel()
self.load_task.cancel()
self.tick_task.cancel()
class Server():
def __init__(self, context=zmq.Context(), id="admin"):