feat(rfc): ground work for property streaming

Added new PUSH/PULL socket for client -> server update push
This commit is contained in:
Swann
2019-02-13 14:04:32 +01:00
parent a77bce9b41
commit 2a7bf7c933

View File

@ -2,15 +2,16 @@ import zmq
import asyncio import asyncio
import logging import logging
from .libs import umsgpack from .libs import umsgpack
from .libs import kvsimple
import time import time
import random import random
import strut import struct
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
# TODO: Add message time and author stamp for reliabilty # TODO: Add message time and author stamp for reliabilty
class RCFMessage(object): class RCFMessage(object):
""" """
Message is formatted on wire as 2 frames: Message is formatted on wire as 2 frames:
@ -34,9 +35,9 @@ class RCFMessage(object):
def send(self, socket): def send(self, socket):
"""Send key-value message to socket; any empty frames are sent as such.""" """Send key-value message to socket; any empty frames are sent as such."""
key = '' if self.key is None else self.key) key = '' if self.key is None else self.key
body = '' if self.body is None else self.body body = '' if self.body is None else self.body
socket.send_multipart([ key, body ]) socket.send_multipart([key, body])
@classmethod @classmethod
def recv(cls, socket): def recv(cls, socket):
@ -49,24 +50,25 @@ class RCFMessage(object):
def dump(self): def dump(self):
if self.body is None: if self.body is None:
size = 0 size = 0
data='NULL' data = 'NULL'
else: else:
size = len(self.body) size = len(self.body)
data=repr(self.body) data = repr(self.body)
print("[key:{key}][size:{size}] {data}".format( print("[key:{key}][size:{size}] {data}".format(
key=self.key, key=self.key,
size=size, size=size,
data=data, data=data,
)) ))
class Client(): class Client():
def __init__(self, context=zmq.Context(), id="default", recv_callback=None): def __init__(self, context=zmq.Context(), id="default", recv_callback=None):
self.context = context self.context = context
self.pull_sock = None self.pull_sock = None
self.push_sock = None self.req_sock = None
self.poller = None self.poller = None
self.id = id self.id = id.encode()
self.recv_callback = recv_callback self.recv_callback = recv_callback
self.bind_ports() self.bind_ports()
# Main client loop registration # Main client loop registration
@ -83,11 +85,17 @@ class Client():
self.pull_sock.connect("tcp://localhost:5555") self.pull_sock.connect("tcp://localhost:5555")
self.pull_sock.setsockopt_string(zmq.SUBSCRIBE, '') self.pull_sock.setsockopt_string(zmq.SUBSCRIBE, '')
# push socket: push update TO server # request socket: send request/message over all peers throught the server
self.push_sock = self.context.socket(zmq.DEALER) self.req_sock = self.context.socket(zmq.DEALER)
self.push_sock.setsockopt(zmq.IDENTITY, self.id.encode()) self.req_sock.setsockopt(zmq.IDENTITY, self.id)
self.req_sock.linger = 0
self.req_sock.connect("tcp://localhost:5556")
# push update socket
self.push_sock = self.context.socket(zmq.PUSH)
self.push_sock.setsockopt(zmq.IDENTITY, self.id)
self.push_sock.linger = 0 self.push_sock.linger = 0
self.push_sock.connect("tcp://localhost:5556") self.push_sock.connect("tcp://localhost:5557")
# Sockets aggregator, not really used for now # Sockets aggregator, not really used for now
self.poller = zmq.Poller() self.poller = zmq.Poller()
@ -106,22 +114,32 @@ class Client():
break break
if self.pull_sock in socks: if self.pull_sock in socks:
message = self.pull_sock.recv_multipart(zmq.NOBLOCK) update = self.pull_sock.recv_multipart(zmq.NOBLOCK)
logger.info("{}:{}".format(message[0].decode(
'ascii'), umsgpack.unpackb(message[1]))) print(update)
# TODO: Proper routing throught different socket / sub ?
if update[1].decode() == 'chat':
# Store message # Store message
self.message_store.append( self.message_store.append(
[message[0].decode('ascii'), umsgpack.unpackb(message[1])]) [update[0].decode('ascii'), umsgpack.unpackb(update[2])])
if update[1].decode() == 'chat':
pass
if self.recv_callback: if self.recv_callback:
self.recv_callback() self.recv_callback()
def send_msg(self, msg): def send_msg(self, msg):
self.push_sock.send(umsgpack.packb(msg)) self.req_sock.send_multipart([b"chat",umsgpack.packb(msg)])
def send_update(self, msg):
self.push_sock.send_multipart()
def stop(self): def stop(self):
logger.info("Stopping client") logger.info("Stopping client")
self.poller.unregister(self.pull_sock) self.poller.unregister(self.pull_sock)
self.req_sock.close()
self.push_sock.close() self.push_sock.close()
self.pull_sock.close() self.pull_sock.close()
self.task.cancel() self.task.cancel()
@ -130,8 +148,10 @@ class Client():
class Server(): class Server():
def __init__(self, context=zmq.Context(), id="admin"): def __init__(self, context=zmq.Context(), id="admin"):
self.context = context self.context = context
self.pub_sock = None self.pub_sock = None
self.pull_sock = None self.request_sock = None
self.collector_sock = None
self.poller = None self.poller = None
self.id = id self.id = id
@ -147,17 +167,22 @@ class Server():
self.pub_sock.bind("tcp://*:5555") self.pub_sock.bind("tcp://*:5555")
time.sleep(0.2) time.sleep(0.2)
# Update receiver # Update request
self.pull_sock = self.context.socket(zmq.ROUTER) self.request_sock = self.context.socket(zmq.ROUTER)
self.pull_sock.bind("tcp://*:5556") self.request_sock.bind("tcp://*:5556")
# Update collector
self.collector_sock = self.context.socket(zmq.PULL)
self.collector_sock.bind("tcp://*:5557")
# poller for socket aggregation # poller for socket aggregation
self.poller = zmq.Poller() self.poller = zmq.Poller()
self.poller.register(self.pull_sock, zmq.POLLIN) self.poller.register(self.request_sock, zmq.POLLIN)
self.poller.register(self.collector_sock, zmq.POLLIN)
async def main(self): async def main(self):
logger.info("{} server launched".format(id)) logger.info("{} server launched".format(id))
# Prepare our context and publisher socket
while True: while True:
# TODO: find a better way # TODO: find a better way
await asyncio.sleep(0.016) await asyncio.sleep(0.016)
@ -167,17 +192,19 @@ class Server():
except KeyboardInterrupt: except KeyboardInterrupt:
break break
if self.pull_sock in socks: if self.request_sock in socks:
msg = self.pull_sock.recv_multipart(zmq.NOBLOCK) msg = self.request_sock.recv_multipart(zmq.NOBLOCK)
#print("{}:{}".format(msg[0].decode('ascii'), umsgpack.packb(msg[1])))
# Update all clients # Update all clients
self.pub_sock.send_multipart(msg) self.pub_sock.send_multipart(msg)
if self.collector_sock in socks:
pass
def stop(self): def stop(self):
logger.info("Stopping server") logger.info("Stopping server")
self.poller.unregister(self.pull_sock) self.poller.unregister(self.request_sock)
self.pub_sock.close() self.pub_sock.close()
self.pull_sock.close() self.request_sock.close()
self.collector_sock.close()
self.task.cancel() self.task.cancel()