From 51093b1307f53ab6ebc8b0ef9ef4af3fd86cde24 Mon Sep 17 00:00:00 2001 From: Swann Martinez Date: Thu, 18 Jul 2019 16:38:13 +0200 Subject: [PATCH] feat: apply logic feat: state draft --- client.py | 3 +- replication.py | 128 +++++++++++++++++++++++++-------------- replication_client.py | 137 ++++++++++++++++++++++++++++-------------- test_replication.py | 42 +++++++++---- 4 files changed, 208 insertions(+), 102 deletions(-) diff --git a/client.py b/client.py index eeb2410..10093bf 100644 --- a/client.py +++ b/client.py @@ -52,7 +52,6 @@ def zpipe(ctx): class Client(object): - ctx = None pipe = None net_agent = None @@ -205,7 +204,7 @@ class Client(object): # SAVING FUNCTIONS def dump(self, filepath): - with open('dump.json',"w") as fp: + with open('dump.json', "w") as fp: for key, value in self.store.items(): line = json.dumps(value.body) fp.write(line) diff --git a/replication.py b/replication.py index d63adec..6979724 100644 --- a/replication.py +++ b/replication.py @@ -8,41 +8,54 @@ except: from libs import umsgpack import zmq +import pickle +from enum import Enum logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) + +class RepState(Enum): + ADDED = 0 + COMMITED = 1 + STAGED = 2 + + class ReplicatedDataFactory(object): """ - Manage the data types implamentations + Manage the data types implementations """ + def __init__(self): self.supported_types = [] - - def register_type(self,dtype, implementation): + + # Default registered types + self.register_type(str,RepCommand) + + def register_type(self, dtype, implementation): """ Register a new replicated datatype implementation """ self.supported_types.append((dtype, implementation)) - def match_type_by_instance(self,data): + def match_type_by_instance(self, data): """ Find corresponding type to the given datablock """ - for stypes, implementation in self.supported_types: + for stypes, implementation in self.supported_types: if isinstance(data, stypes): return implementation - + print("type not supported for replication") raise NotImplementedError - def match_type_by_name(self,type_name): - for stypes, implementation in self.supported_types: - if type_name == stypes.__name__: + def match_type_by_name(self, type_name): + for stypes, implementation in self.supported_types: + if type_name == implementation.__name__: return implementation - - return None + print("type not supported for replication") + raise NotImplementedError - def construct_from_dcc(self,data): + def construct_from_dcc(self, data): implementation = self.match_type_by_instance(data) return implementation @@ -52,24 +65,26 @@ class ReplicatedDataFactory(object): """ return self.match_type_by_name(type_name) + class ReplicatedDatablock(object): """ Datablock used for replication """ - uuid = None # key (string) - pointer = None # dcc data reference - data = None # data blob (json) - str_type = None # data type name (for deserialization) - deps = None # dependencies references - owner = None + uuid = None # uuid used as key (string) + pointer = None # dcc data ref (DCC type) + buffer = None # data blob (json) + str_type = None # data type name (string) + deps = [None] # dependencies array (string) + owner = None # Data owner (string) + state = None # Data state (RepState) - def __init__(self, owner=None, data=None, uuid=None): + def __init__(self, owner=None, data=None, uuid=None, buffer=None): self.uuid = uuid if uuid else str(uuid4()) assert(owner) self.owner = owner self.pointer = data - self.str_type = type(data).__name__ - + self.buffer = buffer if buffer else None + self.str_type = type(self).__name__ def push(self, socket): """ @@ -83,8 +98,8 @@ class ReplicatedDatablock(object): key = self.uuid.encode() type = self.str_type.encode() - socket.send_multipart([key,owner,type,data]) - + socket.send_multipart([key, owner, type, data]) + @classmethod def pull(cls, socket, factory): """ @@ -92,59 +107,86 @@ class ReplicatedDatablock(object): - read data from the socket - reconstruct an instance """ - uuid, owner,str_type, data = socket.recv_multipart(zmq.NOBLOCK) - + uuid, owner, str_type, data = socket.recv_multipart(zmq.NOBLOCK) str_type = str_type.decode() - owner=owner.decode() - uuid=uuid.decode() + owner = owner.decode() + uuid = uuid.decode() + data = self.deserialize(data) - instance = factory.construct_from_net(str_type)(owner=owner, uuid=uuid) + instance = factory.construct_from_net(str_type)(owner=owner, uuid=uuid, buffer=data) # instance.data = instance.deserialize(data) return instance - - def store(self, dict, persistent=False): + def store(self, dict, persistent=False): """ I want to store my replicated data. Persistent means into the disk If uuid is none we delete the key from the volume """ if self.uuid is not None: - if self.data == 'None': - logger.info("erasing key {}".format(self.uuid)) + if self.buffer == 'None': + logger.debug("erasing key {}".format(self.uuid)) del dict[self.uuid] else: dict[self.uuid] = self - + return self.uuid - def deserialize(self,data): + def deserialize(self, data): """ - I want to apply changes into the DCC - - MUST RETURN AN OBJECT INSTANCE + BUFFER -> JSON """ raise NotImplementedError - - - def serialize(self,data): + + + def serialize(self, data): """ I want to load data from DCC + DCC -> JSON + MUST RETURN A BYTE ARRAY """ raise NotImplementedError + + + def apply(self,data,target): + """ + JSON -> DCC + """ + raise NotImplementedError + + + def resolve(self): + """ + I want to resolve my orphan data to an existing one + = Assing my pointer + + """ + raise NotImplementedError + def dump(self): + return self.deserialize(self.buffer) +class RepCommand(ReplicatedDatablock): + + def serialize(self,data): + return pickle.dumps(data) + + def deserialize(self,data): + return pickle.load(data) + + def apply(self,data,target): + target = data # class RepObject(ReplicatedDatablock): # def deserialize(self): # try: # if self.pointer is None: # pointer = None - + # # Object specific constructor... # if self.data["data"] in bpy.data.meshes.keys(): # pointer = bpy.data.meshes[self.data["data"]] @@ -173,11 +215,9 @@ class ReplicatedDatablock(object): # self.pointer.hide_select = False # else: # self.pointer.hide_select = True - + # except Exception as e: # logger.error("Object {} loading error: {} ".format(self.data["name"], e)) # def deserialize(self): # self.data = dump_datablock(self.pointer, 1) - - diff --git a/replication_client.py b/replication_client.py index 7c1a992..b158771 100644 --- a/replication_client.py +++ b/replication_client.py @@ -2,10 +2,10 @@ import threading import logging import zmq import time -from replication import ReplicatedDatablock +from replication import ReplicatedDatablock, RepCommand logging.basicConfig(level=logging.DEBUG) -log = logging.getLogger(__name__) +logger = logging.getLogger(__name__) STATE_INITIAL = 0 STATE_SYNCING = 1 @@ -14,16 +14,22 @@ STATE_ACTIVE = 2 class Client(object): def __init__(self,factory=None, config=None): - self._rep_store = {} - self._net = ClientNetService(self._rep_store) assert(factory) + + self._rep_store = {} + self._net_client = ClientNetService( + store_reference=self._rep_store, + factory=factory) self._factory = factory def connect(self): - self._net.start() + self._net_client.start() def disconnect(self): - self._net.stop() + self._net_client.stop() + + def state(self): + return self._net_client.state def register(self, object): """ @@ -34,12 +40,13 @@ class Client(object): new_item = self._factory.construct_from_dcc(object)(owner="client", data=object) if new_item: - log.info("Registering {} on {}".format(object,new_item.uuid)) + logger.info("Registering {} on {}".format(object,new_item.uuid)) new_item.store(self._rep_store) - log.info("Pushing changes...") - new_item.push(self._net.publish) + logger.info("Pushing changes...") + new_item.push(self._net_client.publish) return new_item.uuid + else: raise TypeError("Type not supported") @@ -49,21 +56,26 @@ class Client(object): def unregister(self,object): pass - - - + class ClientNetService(threading.Thread): - def __init__(self,store_reference=None): + def __init__(self,store_reference=None, factory=None): + # Threading threading.Thread.__init__(self) self.name = "ClientNetLink" self.daemon = True - self.exit_event = threading.Event() + + self._exit_event = threading.Event() + self._factory = factory + self._store_reference = store_reference + + assert(self._factory) # Networking self.context = zmq.Context.instance() self.snapshot = self.context.socket(zmq.DEALER) + self.snapshot.setsockopt(zmq.IDENTITY, b'SERVER') self.snapshot.connect("tcp://127.0.0.1:5560") self.subscriber = self.context.socket(zmq.SUB) @@ -78,25 +90,55 @@ class ClientNetService(threading.Thread): def run(self): - log.info("Client is listening") + logger.info("Client is online") poller = zmq.Poller() poller.register(self.snapshot, zmq.POLLIN) poller.register(self.subscriber, zmq.POLLIN) poller.register(self.publish, zmq.POLLOUT) - self.state = 1 + while not self._exit_event.is_set(): + """NET OUT + Given the net state we do something: + SYNCING : Ask for snapshots + ACTIVE : Do nothing + """ + if self.state == STATE_INITIAL: + self.snapshot.send(b"SNAPSHOT_REQUEST") + self.state = STATE_SYNCING + - while not self.exit_event.is_set(): - items = dict(poller.poll(10)) + """NET IN + Given the net state we do something: + SYNCING : Ask for snapshots + ACTIVE : Do nothing + """ + items = dict(poller.poll(1)) + + if self.snapshot in items: + if self.state == STATE_SYNCING: + datablock = ReplicatedDatablock.pull(self.snapshot, self._factory) + + if isinstance(datablock, RepCommand): + + + # We receive updates from the server ! + if self.subscriber in items: + if self.state == STATE_ACTIVE: + logger.debug("Receiving changes from server") + datablock = ReplicatedDatablock.pull(self.subscriber, self._factory) + datablock.store(self._store_reference) if not items: - log.error("No request ") + logger.error("No request ") + time.sleep(1) + def setup(self,id="Client"): + pass def stop(self): - self.exit_event.set() + self._exit_event.set() self.snapshot.close() self.subscriber.close() @@ -110,7 +152,6 @@ class Server(): def __init__(self,config=None, factory=None): self._rep_store = {} self._net = ServerNetService(store_reference=self._rep_store, factory=factory) - # self.serve() def serve(self): self._net.start() @@ -128,7 +169,9 @@ class ServerNetService(threading.Thread): threading.Thread.__init__(self) self.name = "ServerNetLink" self.daemon = True - self.exit_event = threading.Event() + self._exit_event = threading.Event() + + # Networking self._rep_store = store_reference self.context = zmq.Context.instance() @@ -160,33 +203,41 @@ class ServerNetService(threading.Thread): self.pull.bind("tcp://*:5562") except zmq.error.ZMQError: - log.error("Address already in use, change net config") + logger.error("Address already in use, change net config") def run(self): - log.info("Server is listening") + logger.debug("Server is online") poller = zmq.Poller() poller.register(self.snapshot, zmq.POLLIN) poller.register(self.pull, zmq.POLLIN) self.state = STATE_ACTIVE - while not self.exit_event.is_set(): + while not self._exit_event.is_set(): # Non blocking poller - socks = dict(poller.poll(1000)) + socks = dict(poller.poll()) # Snapshot system for late join (Server - Client) - # if self.snapshot in socks: - # msg = self.snapshot.recv_multipart(zmq.DONTWAIT) + if self.snapshot in socks: + msg = self.snapshot.recv_multipart(zmq.DONTWAIT) - # identity = msg[0] - # request = msg[1] + identity = msg[0] + request = msg[1] - # if request == b"SNAPSHOT_REQUEST": - # pass - # else: - # logger.info("Bad snapshot request") - # break + if request == b"SNAPSHOT_REQUEST": + pass + else: + logger.info("Bad snapshot request") + break + + for key, item in self._rep_store: + self.snapshot.send(identity, zmq.SNDMORE) + item.push(self.snapshot) + + self.snapshot.send(identity, zmq.SNDMORE) + RepCommand(owner='server',data='SNAPSHOT_END').push(self.snapshot) + # ordered_props = [(SUPPORTED_TYPES.index(k.split('/')[0]),k,v) for k, v in self.property_map.items()] # ordered_props.sort(key=itemgetter(0)) @@ -204,19 +255,17 @@ class ServerNetService(threading.Thread): # Regular update routing (Clients / Client) if self.pull in socks: - log.info("Receiving changes from client") - msg = ReplicatedDatablock.pull(self.pull, self.factory) + logger.debug("Receiving changes from client") + datablock = ReplicatedDatablock.pull(self.pull, self.factory) - msg.store(self._rep_store) - # msg = message.Message.recv(self.collector_sock) - # # logger.info("received object") - # # Update all clients - # msg.store(self.store) - # msg.send(self.pub_sock) + datablock.store(self._rep_store) + + # Update all clients + datablock.push(self.publisher) def stop(self): - self.exit_event.set() + self._exit_event.set() self.snapshot.close() self.pull.close() diff --git a/test_replication.py b/test_replication.py index 306fe22..144a99f 100644 --- a/test_replication.py +++ b/test_replication.py @@ -30,7 +30,7 @@ class RepSampleData(ReplicatedDatablock): class TestDataReplication(unittest.TestCase): - def test_setup_data_factory(self): + def test_data_factory(self): factory = ReplicatedDataFactory() factory.register_type(SampleData, RepSampleData) data_sample = SampleData() @@ -38,23 +38,41 @@ class TestDataReplication(unittest.TestCase): self.assertEqual(isinstance(rep_sample,RepSampleData), True) - def test_replicate_client_data(self): + def test_basic_client_start(self): factory = ReplicatedDataFactory() factory.register_type(SampleData, RepSampleData) - server_api = Server(factory=factory) - server_api.serve() - client_api = Client(factory=factory) - client_api.connect() + server = Server(factory=factory) + server.serve() - data_sample = SampleData() - data_sample_key = client_api.register(data_sample) + client = Client(factory=factory) + client.connect() - #Waiting for server to receive the datas - time.sleep(.1) + time.sleep(1) - #Check if if receive them - self.assertNotEqual(server_api._rep_store[data_sample_key],None) + self.assertEqual(client.state(), 2) + + # def test_register_client_data(self): + # # Setup data factory + # factory = ReplicatedDataFactory() + # factory.register_type(SampleData, RepSampleData) + + # server = Server(factory=factory) + # server.serve() + + # client = Client(factory=factory) + # client.connect() + + # client2 = Client(factory=factory) + # client2.connect() + + # data_sample_key = client.register(SampleData()) + + # #Waiting for server to receive the datas + # time.sleep(1) + + # #Check if the server receive them + # self.assertNotEqual(client2._rep_store[data_sample_key],None)