diff --git a/net_components.py b/net_components.py index 3ce8c90..7233b24 100644 --- a/net_components.py +++ b/net_components.py @@ -1,4 +1,3 @@ -import asyncio import collections import logging from uuid import uuid4 @@ -12,12 +11,29 @@ logging.basicConfig(level=logging.DEBUG) CONNECT_TIMEOUT = 2 WAITING_TIME = 0.001 +SERVER_MAX = 1 + +def zpipe(ctx): + """build inproc pipe for talking to threads + + mimic pipe used in czmq zthread_fork. + + Returns a pair of PAIRs connected via inproc + """ + a = ctx.socket(zmq.PAIR) + b = ctx.socket(zmq.PAIR) + a.linger = b.linger = 0 + a.hwm = b.hwm = 1 + iface = "inproc://%s" % binascii.hexlify(os.urandom(8)) + a.bind(iface) + b.connect(iface) + return a,b -class RCFStatus(Enum): - IDLE = 1 - CONNECTING = 2 - CONNECTED = 3 +class State(Enum): + INITIAL = 1 + SYNCING = 2 + ACTIVE = 3 class RCFFactory(object): @@ -160,6 +176,7 @@ class RCFMessage(object): )) + class RCFClient(): def __init__( self, @@ -191,7 +208,7 @@ class RCFClient(): self.load_task = asyncio.ensure_future(self.load()) self.tick_task = None - self.property_map = RCFStore(custom_factory=factory) + logger.info("{} client initialized".format(id)) @@ -222,60 +239,6 @@ class RCFClient(): time.sleep(0.1) - async def load(self): - self.status = RCFStatus.CONNECTING - logger.info("{} client syncing".format(id)) - - # Late join mecanism - logger.info("{} send snapshot request".format(id)) - self.req_sock.send(b"SNAPSHOT_REQUEST") - - while True: - try: - rcfmsg_snapshot = RCFMessage.recv(self.req_sock) - - if rcfmsg_snapshot.key == "SNAPSHOT_END": - logger.info("snapshot complete") - break - else: - logger.info("received : {}".format(rcfmsg_snapshot.key)) - - rcfmsg_snapshot.store(self.property_map) - for f in self.on_recv: - f(rcfmsg_snapshot) - except: - await asyncio.sleep(0.001) - - for f in self.on_post_init: - f() - - logger.info("{} client running".format(id)) - - self.push_update( - "net/clients/{}".format(self.id.decode()), "client", None) - self.push_update( - "net/objects/{}".format(self.id.decode()), "clientObject", None) - - self.tick_task = asyncio.ensure_future(self.tick()) - - self.status = RCFStatus.CONNECTED - - async def tick(self): - # Main loop - while True: - # TODO: find a better way - socks = dict(self.poller.poll(1)) - - if self.pull_sock in socks: - rcfmsg = RCFMessage.recv(self.pull_sock) - - if rcfmsg.id != self.id: - rcfmsg.store(self.property_map) - - for f in self.on_recv: - f(rcfmsg) - else: - await asyncio.sleep(0.0001) def push_update(self, key, mtype, body): rcfmsg = RCFMessage(key=key, id=self.id,mtype=mtype, body=body) @@ -292,8 +255,116 @@ class RCFClient(): if self.tick_task: self.tick_task.cancel() +class RCFServer(object): + address = None # Server address + port = None # Server port + snapshot = None # Snapshot socket + subscriber = None # Incoming updates -class RCFServer(): + def __init__(self, ctx, address, port): + self.address = address + self.port = port + self.snapshot = ctx.socket(zmq.DEALER) + self.snapshot.linger = 0 + self.snapshot.connect("%s:%i".format(address.decode(),port)) + self.subscriber = ctx.socket(zmq.SUB) + self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '') + self.subscriber.connect("%s:%i".format(address.decode(),port+1)) + self.subscriber.linger = 0 + +class RCFClientAgent(object): + ctx = None + pipe = None + property_map = None + publisher = None + id = None + state = State.INITIAL + server = None + + def __init__(self, ctx, pipe, id): + self.ctx = None + self.pipe = None + self.property_map = None + self.publisher = None + self.id = None + self.state = State.INITIAL + self.server = None + self.publisher = self.context.socket(zmq.PUSH) # push update socket + self.publisher.setsockopt(zmq.IDENTITY, self.id) + self.publisher.setsockopt(zmq.SNDHWM, 60) + self.publisher.linger = 0 + + + def control_message (self): + msg = self.pipe.recv_multipart() + command = msg.pop(0) + + if command == b"CONNECT": + address = msg.pop(0) + port = int(msg.pop(0)) + + if len(self.servers) < SERVER_MAX: + self.server = RCFServer(self.ctx, address, port) + self.publisher.connect("tcp://{}:5557".format(address.decode())) + + else: + logger.error("E: too many servers (max. %i)", SERVER_MAX) + + +def rcf_client_agent(ctx,pipe,id): + agent = RCFClientAgent(ctx,pipe,id) + server = None + + while True: + poller = zmq.Poller() + poller.register(agent.pipe, zmq.POLLIN) + server_socket = None + + if agent.state == State.INITIAL: + server = agent.server + if agent.servers: + logger.info ("I: waiting for server at %s:%d...", + server.address, server.port) + server.snapshot.send(b"SNAPSHOT_REQUEST") + agent.state = State.SYNCING + elif agent.state == State.SYNCING: + sever_socket = server.snapshot + elif agent.state == State.ACTIVE: + server_socket = server.subscriber + + if server_socket: + poller.register(server_socket, zmq.POLLIN) + + try: + items = dict(poller.poll()) + except: + raise + break + + if agent.pipe in items: + agent.control_message() + elif server_socket in items: + rcfmsg = RCFMessage.recv(server_socket) + + if agent.state == State.SYNCING: + # Store snapshot + if rcfmsg.key == "SNAPSHOT_END": + logger.info("snapshot complete") + agent.state = State.ACTIVE + else: + rcfmsg.store(agent.property_map) + elif agent.state == State.ACTIVE: + if rcfmsg.id != agent.id: + rcfmsg.store(agent.property_map) + action = "update" if kvmsg.body else "delete" + logging.info ("I: received from %s:%d %s", + server.address, server.port, action) + else: + agent.state = State.INITIAL + + + +class RCFServerAgent(): def __init__(self, context=zmq.Context(), id="admin"): self.context = context @@ -306,7 +377,7 @@ class RCFServer(): self.id = id self.bind_ports() # Main client loop registration - self.task = asyncio.ensure_future(self.tick()) + tick() logger.info("{} client initialized".format(id)) @@ -333,12 +404,12 @@ class RCFServer(): self.poller.register(self.request_sock, zmq.POLLIN) self.poller.register(self.collector_sock, zmq.POLLIN) - async def tick(self): + def tick(self): logger.info("{} server launched".format(id)) while True: # Non blocking poller - socks = dict(self.poller.poll(1)) + socks = dict(self.poller.poll()) # Snapshot system for late join (Server - Client) if self.request_sock in socks: @@ -370,8 +441,6 @@ class RCFServer(): # Update all clients msg.store(self.property_map) msg.send(self.pub_sock) - else: - await asyncio.sleep(WAITING_TIME) def stop(self): logger.debug("Stopping server") @@ -379,6 +448,5 @@ class RCFServer(): self.pub_sock.close() self.request_sock.close() self.collector_sock.close() - self.task.cancel() self.status = RCFStatus.IDLE diff --git a/test_server.py b/test_server.py new file mode 100644 index 0000000..196dfbd --- /dev/null +++ b/test_server.py @@ -0,0 +1,3 @@ +from net_components import RCFServerAgent + +server = RCFServerAgent() \ No newline at end of file