feat(rcf): Listen event optimization, await only when its necessary
This commit is contained in:
@ -21,8 +21,8 @@ class RCFMessage(object):
|
||||
|
||||
"""
|
||||
key = None # key (string)
|
||||
id = None # User (string)
|
||||
mtype = None # data mtype (string)
|
||||
id = None # User (string)
|
||||
mtype = None # data mtype (string)
|
||||
body = None # data blob
|
||||
|
||||
def __init__(self, key=None, id=None, mtype=None, body=None):
|
||||
@ -43,18 +43,18 @@ class RCFMessage(object):
|
||||
key = '' if self.key is None else self.key.encode()
|
||||
mtype = '' if self.mtype is None else self.mtype.encode()
|
||||
body = '' if self.body is None else umsgpack.packb(self.body)
|
||||
socket.send_multipart([key,self.id,mtype, body])
|
||||
socket.send_multipart([key, self.id, mtype, body])
|
||||
|
||||
@classmethod
|
||||
def recv(cls, socket):
|
||||
"""Reads key-value message from socket, returns new kvmsg instance."""
|
||||
key,id,mtype, body = socket.recv_multipart(zmq.DONTWAIT)
|
||||
key = key.decode() if key else None
|
||||
id = id if id else None
|
||||
mtype = mtype.decode() if body else None
|
||||
key, id, mtype, body = socket.recv_multipart(zmq.DONTWAIT)
|
||||
key = key.decode() if key else None
|
||||
id = id if id else None
|
||||
mtype = mtype.decode() if body else None
|
||||
body = umsgpack.unpackb(body) if body else None
|
||||
|
||||
return cls(key=key,id=id,mtype=mtype, body=body)
|
||||
|
||||
return cls(key=key, id=id, mtype=mtype, body=body)
|
||||
|
||||
def dump(self):
|
||||
if self.body is None:
|
||||
@ -85,7 +85,7 @@ class Client():
|
||||
self.task = asyncio.ensure_future(self.main())
|
||||
|
||||
self.property_map = {}
|
||||
|
||||
|
||||
logger.info("{} client initialized".format(id))
|
||||
|
||||
def bind_ports(self):
|
||||
@ -98,7 +98,7 @@ class Client():
|
||||
# request socket: send request/message over all peers throught the server
|
||||
self.req_sock = self.context.socket(zmq.DEALER)
|
||||
self.req_sock.setsockopt(zmq.IDENTITY, self.id)
|
||||
self.req_sock.setsockopt(zmq.SNDHWM , 60)
|
||||
self.req_sock.setsockopt(zmq.SNDHWM, 60)
|
||||
self.req_sock.linger = 0
|
||||
self.req_sock.connect("tcp://localhost:5556")
|
||||
|
||||
@ -107,7 +107,7 @@ class Client():
|
||||
self.push_sock.setsockopt(zmq.IDENTITY, self.id)
|
||||
self.push_sock.linger = 0
|
||||
self.push_sock.connect("tcp://localhost:5557")
|
||||
self.push_sock.setsockopt(zmq.SNDHWM , 60)
|
||||
self.push_sock.setsockopt(zmq.SNDHWM, 60)
|
||||
|
||||
# Sockets aggregator, not really used for now
|
||||
self.poller = zmq.Poller()
|
||||
@ -119,23 +119,21 @@ class Client():
|
||||
# Prepare our context and publisher socket
|
||||
while True:
|
||||
# TODO: find a better way
|
||||
await asyncio.sleep(0.016)
|
||||
try:
|
||||
socks = dict(self.poller.poll(1))
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
socks = dict(self.poller.poll(1))
|
||||
|
||||
if self.pull_sock in socks:
|
||||
rcfmsg = RCFMessage.recv(self.pull_sock)
|
||||
|
||||
rcfmsg.store(self.property_map)
|
||||
# rcfmsg.dump()
|
||||
|
||||
|
||||
for f in self.recv_callback:
|
||||
f(rcfmsg)
|
||||
else:
|
||||
await asyncio.sleep(0.016)
|
||||
|
||||
def push_update(self, key,mtype,body):
|
||||
rcfmsg = RCFMessage(key,self.id,mtype,body)
|
||||
def push_update(self, key, mtype, body):
|
||||
rcfmsg = RCFMessage(key, self.id, mtype, body)
|
||||
rcfmsg.send(self.push_sock)
|
||||
# self.push_sock.send_multipart()
|
||||
|
||||
@ -167,7 +165,7 @@ class Server():
|
||||
def bind_ports(self):
|
||||
# Update all clients
|
||||
self.pub_sock = self.context.socket(zmq.PUB)
|
||||
self.pub_sock.setsockopt(zmq.SNDHWM , 60)
|
||||
self.pub_sock.setsockopt(zmq.SNDHWM, 60)
|
||||
self.pub_sock.bind("tcp://*:5555")
|
||||
time.sleep(0.2)
|
||||
|
||||
@ -190,25 +188,19 @@ class Server():
|
||||
logger.info("{} server launched".format(id))
|
||||
|
||||
while True:
|
||||
# TODO: find a better way
|
||||
await asyncio.sleep(0.0001)
|
||||
|
||||
try:
|
||||
socks = dict(self.poller.poll(1))
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
# TODO: Listener on anoter process linked with PAIR/PAIR ?
|
||||
socks = dict(self.poller.poll(1))
|
||||
|
||||
if self.request_sock in socks:
|
||||
msg = self.request_sock.recv_multipart(zmq.DONTWAIT)
|
||||
|
||||
# Update all clients
|
||||
self.pub_sock.send_multipart(msg)
|
||||
|
||||
if self.collector_sock in socks:
|
||||
elif self.collector_sock in socks:
|
||||
msg = self.collector_sock.recv_multipart(zmq.DONTWAIT)
|
||||
|
||||
# Update all clients
|
||||
self.pub_sock.send_multipart(msg)
|
||||
else:
|
||||
await asyncio.sleep(0.016)
|
||||
|
||||
def stop(self):
|
||||
logger.debug("Stopping server")
|
||||
|
@ -83,7 +83,6 @@ def observer():
|
||||
client.push_update(key,value_type,value)
|
||||
except:
|
||||
pass
|
||||
|
||||
return 0.16
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user