diff --git a/net_components.py b/net_components.py index 7233b24..d091733 100644 --- a/net_components.py +++ b/net_components.py @@ -1,11 +1,17 @@ import collections import logging +import threading from uuid import uuid4 +import binascii +import os +from random import randint import time from enum import Enum -from .libs import umsgpack, zmq - +try: + from .libs import umsgpack, zmq +except: + from libs import umsgpack, zmq logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) @@ -13,6 +19,7 @@ CONNECT_TIMEOUT = 2 WAITING_TIME = 0.001 SERVER_MAX = 1 + def zpipe(ctx): """build inproc pipe for talking to threads @@ -27,7 +34,7 @@ def zpipe(ctx): iface = "inproc://%s" % binascii.hexlify(os.urandom(8)) a.bind(iface) b.connect(iface) - return a,b + return a, b class State(Enum): @@ -36,52 +43,10 @@ class State(Enum): ACTIVE = 3 -class RCFFactory(object): - """ - Abstract layer used to bridge external and inter - """ - - def init(self, data): - """ - set the RCFMessage pointer to local data - """ - print("Default setter") - # Setup data accessor - data.get = self.load_getter(data) - data.set = self.load_setter(data) - - # TODO: Setup local pointer - - def load_getter(self, data): - """ - local program > rcf - - """ - print("Default getter") - return None - - def load_setter(self, data): - """ - rcf > local program - """ - print("Default setter") - return None - - def apply(self, data): - pass - - def diff(self, data): - """ - Verify data integrity - """ - pass - - class RCFStore(collections.MutableMapping, dict): - def __init__(self, custom_factory=RCFFactory()): + def __init__(self): super().__init__() - self.factory = custom_factory def __getitem__(self, key): return dict.__getitem__(self, key) @@ -117,22 +82,20 @@ class RCFMessage(object): body = None # data blob uuid = None - def __init__(self, key=None,uuid= None, id=None, mtype=None, body=None): + def __init__(self, key=None, uuid=None, id=None, mtype=None, body=None): if uuid is None: - uuid = uuid4() - + uuid = uuid4().bytes + self.key = key self.uuid = uuid self.mtype = mtype self.body = body self.id = id - - def store(self, dikt): """Store me in a dict if I have anything to store""" # this currently erasing old value - if self.key is not None : + if self.key is not None: dikt[self.key] = self # elif self.key in dikt: # del dikt[self.key] @@ -140,26 +103,25 @@ class RCFMessage(object): def send(self, socket): """Send key-value message to socket; any empty frames are sent as such.""" key = ''.encode() if self.key is None else self.key.encode() - print(self.mtype) mtype = ''.encode() if self.mtype is None else self.mtype.encode() body = ''.encode() if self.body is None else umsgpack.packb(self.body) id = ''.encode() if self.id is None else self.id try: - socket.send_multipart([key,self.uuid, id, mtype, body]) + socket.send_multipart([key, id, mtype, body]) except: - logger.info("Fail to send {}".format(key)) + logger.info("Fail to send {} {}".format(key,id)) @classmethod def recv(cls, socket): """Reads key-value message from socket, returns new kvmsg instance.""" - key,uuid, id, mtype, body = socket.recv_multipart(zmq.DONTWAIT) + key, id, mtype, body = socket.recv_multipart(zmq.DONTWAIT) key = key.decode() if key else None id = id if id else None mtype = mtype.decode() if body else None body = umsgpack.unpackb(body) if body else None - return cls(key=key,uuid=uuid, id=id, mtype=mtype, body=body) + return cls(key=key, id=id, mtype=mtype, body=body) def dump(self): if self.body is None: @@ -176,84 +138,29 @@ class RCFMessage(object): )) +class RCFClient(object): + ctx = None + pipe = None + agent = None -class RCFClient(): - def __init__( - self, - context=zmq.Context(), - id="default", - on_recv=None, - on_post_init=None, - is_admin=False, - factory=None, - address="localhost"): + def __init__(self): + self.ctx = zmq.Context() + self.pipe, peer = zpipe(self.ctx) + self.agent = threading.Thread( + target=rcf_client_agent, args=(self.ctx, peer)) + self.agent.daemon = True + self.agent.start() - # 0MQ vars - self.context = context - self.pull_sock = None - self.req_sock = None - self.poller = None + def connect(self, address, port): + self.pipe.send_multipart([b"CONNECT", (address.encode() if isinstance( + address, str) else address), b'%d' % port]) - # Client configuration - self.id = id.encode() - self.on_recv = on_recv - self.on_post_init = on_post_init - self.status = RCFStatus.IDLE - self.is_admin = is_admin - self.address = address + def set(self, key, value): + """Set new value in distributed hash table + Sends [SET][key][value][ttl] to the agent + """ + self.pipe.send_multipart([b"SET", umsgpack.packb(key), umsgpack.packb(value)]) - self.bind_ports() - - # client routine registration - self.load_task = asyncio.ensure_future(self.load()) - self.tick_task = None - - - - logger.info("{} client initialized".format(id)) - - def bind_ports(self): - # pull socket: get update FROM server - self.pull_sock = self.context.socket(zmq.SUB) - self.pull_sock.linger = 0 - self.pull_sock.connect("tcp://{}:5555".format(self.address)) - self.pull_sock.setsockopt_string(zmq.SUBSCRIBE, '') - - # request socket: send request/message over all peers throught the server - self.req_sock = self.context.socket(zmq.DEALER) - self.req_sock.setsockopt(zmq.IDENTITY, self.id) - # self.req_sock.setsockopt(zmq.SNDHWM, 60) - self.req_sock.linger = 0 - self.req_sock.connect("tcp://{}:5556".format(self.address)) - - # push update socket - self.push_sock = self.context.socket(zmq.PUSH) - self.push_sock.setsockopt(zmq.IDENTITY, self.id) - self.push_sock.linger = 0 - self.push_sock.connect("tcp://{}:5557".format(self.address)) - self.push_sock.setsockopt(zmq.SNDHWM, 60) - - # Sockets aggregator, not really used for now - self.poller = zmq.Poller() - self.poller.register(self.pull_sock, zmq.POLLIN) - - time.sleep(0.1) - - - def push_update(self, key, mtype, body): - rcfmsg = RCFMessage(key=key, id=self.id,mtype=mtype, body=body) - rcfmsg.send(self.push_sock) - - def stop(self): - logger.debug("Stopping client") - self.poller.unregister(self.pull_sock) - self.req_sock.close() - self.push_sock.close() - self.pull_sock.close() - self.load_task.cancel() - - if self.tick_task: - self.tick_task.cancel() class RCFServer(object): address = None # Server address @@ -261,19 +168,22 @@ class RCFServer(object): snapshot = None # Snapshot socket subscriber = None # Incoming updates - def __init__(self, ctx, address, port): + def __init__(self, ctx, address, port,id): self.address = address self.port = port self.snapshot = ctx.socket(zmq.DEALER) self.snapshot.linger = 0 - self.snapshot.connect("%s:%i".format(address.decode(),port)) + self.snapshot.connect("tcp://{}:{}".format(address.decode(), port)) + self.snapshot.setsockopt(zmq.IDENTITY, id) self.subscriber = ctx.socket(zmq.SUB) self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '') - self.subscriber.connect("%s:%i".format(address.decode(),port+1)) + self.subscriber.connect("tcp://{}:{}".format(address.decode(), port+1)) self.subscriber.linger = 0 + print("connected on tcp://{}:{}".format(address.decode(), port)) + class RCFClientAgent(object): - ctx = None + ctx = None pipe = None property_map = None publisher = None @@ -281,21 +191,19 @@ class RCFClientAgent(object): state = State.INITIAL server = None - def __init__(self, ctx, pipe, id): - self.ctx = None - self.pipe = None - self.property_map = None - self.publisher = None - self.id = None + def __init__(self, ctx, pipe): + self.ctx = ctx + self.pipe = pipe + self.property_map = RCFStore() + self.id = b"test" self.state = State.INITIAL self.server = None - self.publisher = self.context.socket(zmq.PUSH) # push update socket + self.publisher = self.ctx.socket(zmq.PUSH) # push update socket self.publisher.setsockopt(zmq.IDENTITY, self.id) self.publisher.setsockopt(zmq.SNDHWM, 60) self.publisher.linger = 0 - - def control_message (self): + def control_message(self): msg = self.pipe.recv_multipart() command = msg.pop(0) @@ -303,32 +211,43 @@ class RCFClientAgent(object): address = msg.pop(0) port = int(msg.pop(0)) - if len(self.servers) < SERVER_MAX: - self.server = RCFServer(self.ctx, address, port) - self.publisher.connect("tcp://{}:5557".format(address.decode())) - + if self.server is None: + self.server = RCFServer(self.ctx, address, port, self.id) + self.publisher.connect("tcp://{}:{}".format(address.decode(), port+2)) + else: logger.error("E: too many servers (max. %i)", SERVER_MAX) + + elif command == b"SET": + key,value = msg + # Send key-value pair on to server + rcfmsg = RCFMessage(key=umsgpack.unpackb(key),id=self.id ,mtype="",body=umsgpack.unpackb(value)) + rcfmsg.store(self.property_map) + + rcfmsg.send(self.publisher) -def rcf_client_agent(ctx,pipe,id): - agent = RCFClientAgent(ctx,pipe,id) +def rcf_client_agent(ctx, pipe): + agent = RCFClientAgent(ctx, pipe) server = None - + while True: + # logger.info("asdasd") + poller = zmq.Poller() poller.register(agent.pipe, zmq.POLLIN) server_socket = None if agent.state == State.INITIAL: server = agent.server - if agent.servers: - logger.info ("I: waiting for server at %s:%d...", - server.address, server.port) + if agent.server: + logger.info("I: waiting for server at %s:%d...", + server.address, server.port) server.snapshot.send(b"SNAPSHOT_REQUEST") agent.state = State.SYNCING + server_socket = server.snapshot elif agent.state == State.SYNCING: - sever_socket = server.snapshot + server_socket = server.snapshot elif agent.state == State.ACTIVE: server_socket = server.subscriber @@ -338,14 +257,12 @@ def rcf_client_agent(ctx,pipe,id): try: items = dict(poller.poll()) except: - raise - break + pass if agent.pipe in items: agent.control_message() elif server_socket in items: rcfmsg = RCFMessage.recv(server_socket) - if agent.state == State.SYNCING: # Store snapshot if rcfmsg.key == "SNAPSHOT_END": @@ -356,16 +273,15 @@ def rcf_client_agent(ctx,pipe,id): elif agent.state == State.ACTIVE: if rcfmsg.id != agent.id: rcfmsg.store(agent.property_map) - action = "update" if kvmsg.body else "delete" - logging.info ("I: received from %s:%d %s", - server.address, server.port, action) - else: - agent.state = State.INITIAL - - + action = "update" if rcfmsg.body else "delete" + logging.info("I: received from {}:{},{} {}".format(server.address,rcfmsg.body.id, server.port, action)) + else: + logger.info("IDLE") + # else: else + # agent.state = State.INITIAL class RCFServerAgent(): - def __init__(self, context=zmq.Context(), id="admin"): + def __init__(self, context=zmq.Context.instance(), id="admin"): self.context = context self.pub_sock = None @@ -373,11 +289,11 @@ class RCFServerAgent(): self.collector_sock = None self.poller = None - self.property_map = RCFStore() + self.property_map = {} self.id = id self.bind_ports() # Main client loop registration - tick() + self.tick() logger.info("{} client initialized".format(id)) @@ -385,14 +301,14 @@ class RCFServerAgent(): # Update all clients self.pub_sock = self.context.socket(zmq.PUB) self.pub_sock.setsockopt(zmq.SNDHWM, 60) - self.pub_sock.bind("tcp://*:5555") + self.pub_sock.bind("tcp://*:5556") time.sleep(0.2) # Update request self.request_sock = self.context.socket(zmq.ROUTER) self.request_sock.setsockopt(zmq.IDENTITY, b'SERVER') self.request_sock.setsockopt(zmq.RCVHWM, 60) - self.request_sock.bind("tcp://*:5556") + self.request_sock.bind("tcp://*:5555") # Update collector self.collector_sock = self.context.socket(zmq.PULL) @@ -409,7 +325,7 @@ class RCFServerAgent(): while True: # Non blocking poller - socks = dict(self.poller.poll()) + socks = dict(self.poller.poll(1000)) # Snapshot system for late join (Server - Client) if self.request_sock in socks: @@ -417,7 +333,7 @@ class RCFServerAgent(): identity = msg[0] request = msg[1] - + print("asdasd") if request == b"SNAPSHOT_REQUEST": pass else: @@ -437,16 +353,8 @@ class RCFServerAgent(): # Regular update routing (Clients / Client) elif self.collector_sock in socks: - msg = RCFMessage.recv(self.collector_sock) + msg = RCFMessage.recv(self.collector_sock) # Update all clients msg.store(self.property_map) msg.send(self.pub_sock) - - def stop(self): - logger.debug("Stopping server") - self.poller.unregister(self.request_sock) - self.pub_sock.close() - self.request_sock.close() - self.collector_sock.close() - - self.status = RCFStatus.IDLE + diff --git a/net_operators.py b/net_operators.py index 5727ed9..e0a60ae 100644 --- a/net_operators.py +++ b/net_operators.py @@ -672,8 +672,8 @@ class session_join(bpy.types.Operator): net_settings = context.scene.session_settings # Scene setup - if net_settings.session_mode == "CONNECT" and net_settings.clear_scene: - clean_scene() + # if net_settings.session_mode == "CONNECT" and net_settings.clear_scene: + # clean_scene() # Session setup if net_settings.username == "DefaultUser": @@ -682,23 +682,21 @@ class session_join(bpy.types.Operator): username = str(context.scene.session_settings.username) - client = net_components.RCFClient( - id=username, - on_recv=recv_callbacks, - on_post_init=post_init_callbacks, - address=net_settings.ip, - is_admin=net_settings.session_mode == "HOST") - bpy.ops.asyncio.loop() + client = net_components.RCFClient() + client.connect("127.0.0.1",5555) + client.set('key', 1) - net_settings.is_running = True - drawer = net_draw.HUD(client_instance=client) + # net_settings.is_running = True - register_ticks() + # drawer = net_draw.HUD(client_instance=client) + + # register_ticks() return {"FINISHED"} + class session_add_property(bpy.types.Operator): bl_idname = "session.add_prop" bl_label = "add" @@ -715,21 +713,22 @@ class session_add_property(bpy.types.Operator): def execute(self, context): global client - item = resolve_bpy_path(self.property_path) + client.set('key', 1) + # item = resolve_bpy_path(self.property_path) - print(item) + # print(item) - if item: - key = self.property_path + # if item: + # key = self.property_path - dumper = dump_anything.Dumper() - dumper.type_subset = dumper.match_subset_all - dumper.depth = self.depth + # dumper = dump_anything.Dumper() + # dumper.type_subset = dumper.match_subset_all + # dumper.depth = self.depth - data = dumper.dump(item) - data_type = item.__class__.__name__ + # data = dumper.dump(item) + # data_type = item.__class__.__name__ - client.push_update(key, data_type, data) + # client.push_update(key, data_type, data) return {"FINISHED"} @@ -771,7 +770,7 @@ class session_create(bpy.types.Operator): global server global client - server = net_components.RCFServer() + server = net_components.RCFServerAgent() time.sleep(0.1) bpy.ops.session.join() @@ -983,11 +982,11 @@ def unregister(): pass if server: - server.stop() + # server.stop() del server server = None if client: - client.stop() + # client.stop() del client client = None diff --git a/net_ui.py b/net_ui.py index 2add02a..6beb38c 100644 --- a/net_ui.py +++ b/net_ui.py @@ -55,17 +55,17 @@ class SessionSettingsPanel(bpy.types.Panel): row = layout.row() row.operator("session.join", text="CONNECT") - else: + # else: - if net_operators.client.status is net_components.RCFStatus.CONNECTED: - row.label(text="Net frequency:") - row.prop(net_settings, "update_frequency", text="") - row = layout.row() - row.operator("session.stop", icon='QUIT', text="Exit") - elif net_operators.client.status is net_components.RCFStatus.CONNECTING: - row.label(text="connecting...") - row = layout.row() - row.operator("session.stop", icon='QUIT', text="CANCEL") + # if net_operators.client.status is net_components.RCFStatus.CONNECTED: + # row.label(text="Net frequency:") + # row.prop(net_settings, "update_frequency", text="") + # row = layout.row() + # row.operator("session.stop", icon='QUIT', text="Exit") + # elif net_operators.client.status is net_components.RCFStatus.CONNECTING: + # row.label(text="connecting...") + # row = layout.row() + # row.operator("session.stop", icon='QUIT', text="CANCEL") row = layout.row() @@ -198,9 +198,9 @@ class SessionTaskPanel(bpy.types.Panel): classes = ( SessionSettingsPanel, - SessionUsersPanel, - SessionPropertiesPanel, - SessionTaskPanel, + # SessionUsersPanel, + # SessionPropertiesPanel, + # SessionTaskPanel, ) diff --git a/rcf_server.py b/rcf_server.py new file mode 100644 index 0000000..148e61d --- /dev/null +++ b/rcf_server.py @@ -0,0 +1,20 @@ +import collections +import logging +import threading +from uuid import uuid4 +import binascii +import os +from random import randint +import time +from enum import Enum + +from libs import umsgpack, zmq +from net_components import RCFMessage +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.DEBUG) + +CONNECT_TIMEOUT = 2 +WAITING_TIME = 0.001 +SERVER_MAX = 1 + + diff --git a/test_client.py b/test_client.py new file mode 100644 index 0000000..b9ab3d1 --- /dev/null +++ b/test_client.py @@ -0,0 +1,14 @@ +from net_components import RCFClient +import time +client = RCFClient() + +client.connect("127.0.0.1",5555) + + +try: + while True: + client.set('key', 1) + # Distribute as key-value message + time.sleep(1) +except KeyboardInterrupt: + pass diff --git a/test_server.py b/test_server.py index 196dfbd..0aaee34 100644 --- a/test_server.py +++ b/test_server.py @@ -1,3 +1,4 @@ from net_components import RCFServerAgent -server = RCFServerAgent() \ No newline at end of file +server = RCFServerAgent() +