feat: serialization progress on data transition
This commit is contained in:
@ -24,7 +24,10 @@ class ReplicatedDataFactory(object):
|
|||||||
"""
|
"""
|
||||||
self.supported_types.append((dtype, implementation))
|
self.supported_types.append((dtype, implementation))
|
||||||
|
|
||||||
def match_type(self,data):
|
def match_type_by_instance(self,data):
|
||||||
|
"""
|
||||||
|
Find corresponding type to the given datablock
|
||||||
|
"""
|
||||||
for stypes, implementation in self.supported_types:
|
for stypes, implementation in self.supported_types:
|
||||||
if isinstance(data, stypes):
|
if isinstance(data, stypes):
|
||||||
return implementation
|
return implementation
|
||||||
@ -32,10 +35,16 @@ class ReplicatedDataFactory(object):
|
|||||||
print("type not supported for replication")
|
print("type not supported for replication")
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def construct(self,data):
|
def construct_from_dcc(self,data):
|
||||||
implementation = self.match_type(data)
|
implementation = self.match_type_by_instance(data)
|
||||||
return implementation
|
return implementation
|
||||||
|
|
||||||
|
def construct_from_net(self, type_name):
|
||||||
|
"""
|
||||||
|
Reconstruct a new replicated value from serialized data
|
||||||
|
"""
|
||||||
|
return self.match_type_by_name(data)
|
||||||
|
|
||||||
class ReplicatedDatablock(object):
|
class ReplicatedDatablock(object):
|
||||||
"""
|
"""
|
||||||
Datablock used for replication
|
Datablock used for replication
|
||||||
@ -43,13 +52,19 @@ class ReplicatedDatablock(object):
|
|||||||
uuid = None # key (string)
|
uuid = None # key (string)
|
||||||
pointer = None # dcc data reference
|
pointer = None # dcc data reference
|
||||||
data = None # data blob (json)
|
data = None # data blob (json)
|
||||||
|
str_type = None # data type name (for deserialization)
|
||||||
deps = None # dependencies references
|
deps = None # dependencies references
|
||||||
|
owner = None
|
||||||
|
|
||||||
def __init__(self, owner=None, data=None):
|
def __init__(self, owner=None, data=None, uuid=None):
|
||||||
self.uuid = str(uuid4())
|
self.uuid = uuid if uuid else str(uuid4())
|
||||||
assert(owner)
|
assert(owner)
|
||||||
|
self.owner = owner
|
||||||
self.pointer = data
|
self.pointer = data
|
||||||
|
|
||||||
|
if data:
|
||||||
|
self.str_type = self.data.__class__.__name__
|
||||||
|
|
||||||
|
|
||||||
def push(self, socket):
|
def push(self, socket):
|
||||||
"""
|
"""
|
||||||
@ -59,19 +74,25 @@ class ReplicatedDatablock(object):
|
|||||||
"""
|
"""
|
||||||
data = self.serialize(self.pointer)
|
data = self.serialize(self.pointer)
|
||||||
assert(isinstance(data, bytes))
|
assert(isinstance(data, bytes))
|
||||||
|
owner = self.owner.encode()
|
||||||
key = self.uuid.encode()
|
key = self.uuid.encode()
|
||||||
|
|
||||||
socket.send_multipart([key,data])
|
socket.send_multipart([key,owner,data])
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def pull(cls, socket):
|
def pull(cls, socket, factory):
|
||||||
"""
|
"""
|
||||||
Here we reeceive data from the wire:
|
Here we reeceive data from the wire:
|
||||||
- read data from the socket
|
- read data from the socket
|
||||||
- reconstruct an instance
|
- reconstruct an instance
|
||||||
"""
|
"""
|
||||||
uuid, data = socket.recv_multipart(zmq.NOBLOCK)
|
uuid, owner, data = socket.recv_multipart(zmq.NOBLOCK)
|
||||||
|
|
||||||
|
instance = factory.construct_from_net(data)(owner=owner.decode(), uuid=uuid.decode())
|
||||||
|
|
||||||
|
instance.data = instance.deserialize(data)
|
||||||
|
return instance
|
||||||
|
|
||||||
|
|
||||||
def store(self, dict, persistent=False):
|
def store(self, dict, persistent=False):
|
||||||
"""
|
"""
|
||||||
|
@ -25,18 +25,23 @@ class Client(object):
|
|||||||
def disconnect(self):
|
def disconnect(self):
|
||||||
self._net.stop()
|
self._net.stop()
|
||||||
|
|
||||||
def add(self, object):
|
def register(self, object):
|
||||||
"""
|
"""
|
||||||
Register a new item for replication
|
Register a new item for replication
|
||||||
"""
|
"""
|
||||||
assert(object)
|
assert(object)
|
||||||
|
|
||||||
new_item = self._factory.construct(object)(owner="client")
|
new_item = self._factory.construct_from_dcc(object)(owner="client")
|
||||||
new_item.store(self._rep_store)
|
|
||||||
log.info("Registering {} on {}".format(object,new_item.uuid))
|
|
||||||
|
|
||||||
|
if new_item:
|
||||||
|
log.info("Registering {} on {}".format(object,new_item.uuid))
|
||||||
|
new_item.store(self._rep_store)
|
||||||
|
|
||||||
|
log.info("Pushing changes...")
|
||||||
new_item.push(self._net.publish)
|
new_item.push(self._net.publish)
|
||||||
return new_item.uuid
|
return new_item.uuid
|
||||||
|
else:
|
||||||
|
raise TypeError("Type not supported")
|
||||||
|
|
||||||
def pull(self,object=None):
|
def pull(self,object=None):
|
||||||
pass
|
pass
|
||||||
@ -118,7 +123,7 @@ class Server():
|
|||||||
|
|
||||||
|
|
||||||
class ServerNetService(threading.Thread):
|
class ServerNetService(threading.Thread):
|
||||||
def __init__(self,store_reference=None):
|
def __init__(self,store_reference=None, factory=None):
|
||||||
# Threading
|
# Threading
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
self.name = "ServerNetLink"
|
self.name = "ServerNetLink"
|
||||||
@ -131,6 +136,7 @@ class ServerNetService(threading.Thread):
|
|||||||
self.publisher = None
|
self.publisher = None
|
||||||
self.pull = None
|
self.pull = None
|
||||||
self.state = 0
|
self.state = 0
|
||||||
|
self.factory = factory
|
||||||
|
|
||||||
self.bind_ports()
|
self.bind_ports()
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ class TestDataReplication(unittest.TestCase):
|
|||||||
factory = ReplicatedDataFactory()
|
factory = ReplicatedDataFactory()
|
||||||
factory.register_type(SampleData, RepSampleData)
|
factory.register_type(SampleData, RepSampleData)
|
||||||
data_sample = SampleData()
|
data_sample = SampleData()
|
||||||
rep_sample = factory.construct(data_sample)(owner="toto")
|
rep_sample = factory.construct_from_dcc(data_sample)(owner="toto")
|
||||||
|
|
||||||
self.assertEqual(isinstance(rep_sample,RepSampleData), True)
|
self.assertEqual(isinstance(rep_sample,RepSampleData), True)
|
||||||
|
|
||||||
@ -78,7 +78,7 @@ class TestDataReplication(unittest.TestCase):
|
|||||||
client_api.connect()
|
client_api.connect()
|
||||||
|
|
||||||
data_sample = SampleData()
|
data_sample = SampleData()
|
||||||
data_sample_key = client_api.add(data_sample)
|
data_sample_key = client_api.register(data_sample)
|
||||||
|
|
||||||
|
|
||||||
self.assertNotEqual(client_api._rep_store[data_sample_key],None)
|
self.assertNotEqual(client_api._rep_store[data_sample_key],None)
|
||||||
|
Reference in New Issue
Block a user