From 2a7bf7c933b89fe6bd5ebc66e537ec9cfd176b1f Mon Sep 17 00:00:00 2001 From: Swann Date: Wed, 13 Feb 2019 14:04:32 +0100 Subject: [PATCH] feat(rfc): ground work for property streaming Added new PUSH/PULL socket for client -> server update push --- net_components.py | 95 ++++++++++++++++++++++++++++++----------------- 1 file changed, 61 insertions(+), 34 deletions(-) diff --git a/net_components.py b/net_components.py index f672621..ca24c34 100644 --- a/net_components.py +++ b/net_components.py @@ -2,15 +2,16 @@ import zmq import asyncio import logging from .libs import umsgpack -from .libs import kvsimple import time import random -import strut +import struct logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) # TODO: Add message time and author stamp for reliabilty + + class RCFMessage(object): """ Message is formatted on wire as 2 frames: @@ -18,8 +19,8 @@ class RCFMessage(object): frame 2: body (blob) // Could be any data """ - key = None # key (string) - body = None # blob + key = None # key (string) + body = None # blob def __init__(self, key=None, body=None): self.key = key @@ -28,15 +29,15 @@ class RCFMessage(object): def store(self, dikt): """Store me in a dict if I have anything to store""" # this seems weird to check, but it's what the C example does - # this currently erasing old value + # this currently erasing old value if self.key is not None and self.body is not None: dikt[self.key] = self def send(self, socket): """Send key-value message to socket; any empty frames are sent as such.""" - key = '' if self.key is None else self.key) + key = '' if self.key is None else self.key body = '' if self.body is None else self.body - socket.send_multipart([ key, body ]) + socket.send_multipart([key, body]) @classmethod def recv(cls, socket): @@ -49,24 +50,25 @@ class RCFMessage(object): def dump(self): if self.body is None: size = 0 - data='NULL' + data = 'NULL' else: size = len(self.body) - data=repr(self.body) + data = repr(self.body) print("[key:{key}][size:{size}] {data}".format( key=self.key, size=size, data=data, )) + class Client(): def __init__(self, context=zmq.Context(), id="default", recv_callback=None): self.context = context self.pull_sock = None - self.push_sock = None + self.req_sock = None self.poller = None - self.id = id + self.id = id.encode() self.recv_callback = recv_callback self.bind_ports() # Main client loop registration @@ -83,11 +85,17 @@ class Client(): self.pull_sock.connect("tcp://localhost:5555") self.pull_sock.setsockopt_string(zmq.SUBSCRIBE, '') - # push socket: push update TO server - self.push_sock = self.context.socket(zmq.DEALER) - self.push_sock.setsockopt(zmq.IDENTITY, self.id.encode()) + # request socket: send request/message over all peers throught the server + self.req_sock = self.context.socket(zmq.DEALER) + self.req_sock.setsockopt(zmq.IDENTITY, self.id) + self.req_sock.linger = 0 + self.req_sock.connect("tcp://localhost:5556") + + # push update socket + self.push_sock = self.context.socket(zmq.PUSH) + self.push_sock.setsockopt(zmq.IDENTITY, self.id) self.push_sock.linger = 0 - self.push_sock.connect("tcp://localhost:5556") + self.push_sock.connect("tcp://localhost:5557") # Sockets aggregator, not really used for now self.poller = zmq.Poller() @@ -106,22 +114,32 @@ class Client(): break if self.pull_sock in socks: - message = self.pull_sock.recv_multipart(zmq.NOBLOCK) - logger.info("{}:{}".format(message[0].decode( - 'ascii'), umsgpack.unpackb(message[1]))) - # Store message - self.message_store.append( - [message[0].decode('ascii'), umsgpack.unpackb(message[1])]) + update = self.pull_sock.recv_multipart(zmq.NOBLOCK) + + print(update) + + # TODO: Proper routing throught different socket / sub ? + if update[1].decode() == 'chat': + # Store message + self.message_store.append( + [update[0].decode('ascii'), umsgpack.unpackb(update[2])]) + if update[1].decode() == 'chat': + pass if self.recv_callback: self.recv_callback() def send_msg(self, msg): - self.push_sock.send(umsgpack.packb(msg)) + self.req_sock.send_multipart([b"chat",umsgpack.packb(msg)]) + + def send_update(self, msg): + + self.push_sock.send_multipart() def stop(self): logger.info("Stopping client") self.poller.unregister(self.pull_sock) + self.req_sock.close() self.push_sock.close() self.pull_sock.close() self.task.cancel() @@ -130,8 +148,10 @@ class Client(): class Server(): def __init__(self, context=zmq.Context(), id="admin"): self.context = context + self.pub_sock = None - self.pull_sock = None + self.request_sock = None + self.collector_sock = None self.poller = None self.id = id @@ -147,17 +167,22 @@ class Server(): self.pub_sock.bind("tcp://*:5555") time.sleep(0.2) - # Update receiver - self.pull_sock = self.context.socket(zmq.ROUTER) - self.pull_sock.bind("tcp://*:5556") + # Update request + self.request_sock = self.context.socket(zmq.ROUTER) + self.request_sock.bind("tcp://*:5556") + + # Update collector + self.collector_sock = self.context.socket(zmq.PULL) + self.collector_sock.bind("tcp://*:5557") # poller for socket aggregation self.poller = zmq.Poller() - self.poller.register(self.pull_sock, zmq.POLLIN) + self.poller.register(self.request_sock, zmq.POLLIN) + self.poller.register(self.collector_sock, zmq.POLLIN) async def main(self): logger.info("{} server launched".format(id)) - # Prepare our context and publisher socket + while True: # TODO: find a better way await asyncio.sleep(0.016) @@ -167,17 +192,19 @@ class Server(): except KeyboardInterrupt: break - if self.pull_sock in socks: - msg = self.pull_sock.recv_multipart(zmq.NOBLOCK) - #print("{}:{}".format(msg[0].decode('ascii'), umsgpack.packb(msg[1]))) + if self.request_sock in socks: + msg = self.request_sock.recv_multipart(zmq.NOBLOCK) # Update all clients self.pub_sock.send_multipart(msg) + if self.collector_sock in socks: + pass + def stop(self): logger.info("Stopping server") - self.poller.unregister(self.pull_sock) + self.poller.unregister(self.request_sock) self.pub_sock.close() - self.pull_sock.close() - + self.request_sock.close() + self.collector_sock.close() self.task.cancel()