feat: apply logic

feat: state draft
This commit is contained in:
Swann Martinez
2019-07-18 16:38:13 +02:00
parent aa73064eed
commit 51093b1307
4 changed files with 208 additions and 102 deletions

View File

@ -52,7 +52,6 @@ def zpipe(ctx):
class Client(object):
ctx = None
pipe = None
net_agent = None
@ -205,7 +204,7 @@ class Client(object):
# SAVING FUNCTIONS
def dump(self, filepath):
with open('dump.json',"w") as fp:
with open('dump.json', "w") as fp:
for key, value in self.store.items():
line = json.dumps(value.body)
fp.write(line)

View File

@ -8,41 +8,54 @@ except:
from libs import umsgpack
import zmq
import pickle
from enum import Enum
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
class RepState(Enum):
ADDED = 0
COMMITED = 1
STAGED = 2
class ReplicatedDataFactory(object):
"""
Manage the data types implamentations
Manage the data types implementations
"""
def __init__(self):
self.supported_types = []
def register_type(self,dtype, implementation):
# Default registered types
self.register_type(str,RepCommand)
def register_type(self, dtype, implementation):
"""
Register a new replicated datatype implementation
"""
self.supported_types.append((dtype, implementation))
def match_type_by_instance(self,data):
def match_type_by_instance(self, data):
"""
Find corresponding type to the given datablock
"""
for stypes, implementation in self.supported_types:
for stypes, implementation in self.supported_types:
if isinstance(data, stypes):
return implementation
print("type not supported for replication")
raise NotImplementedError
def match_type_by_name(self,type_name):
for stypes, implementation in self.supported_types:
if type_name == stypes.__name__:
def match_type_by_name(self, type_name):
for stypes, implementation in self.supported_types:
if type_name == implementation.__name__:
return implementation
return None
print("type not supported for replication")
raise NotImplementedError
def construct_from_dcc(self,data):
def construct_from_dcc(self, data):
implementation = self.match_type_by_instance(data)
return implementation
@ -52,24 +65,26 @@ class ReplicatedDataFactory(object):
"""
return self.match_type_by_name(type_name)
class ReplicatedDatablock(object):
"""
Datablock used for replication
"""
uuid = None # key (string)
pointer = None # dcc data reference
data = None # data blob (json)
str_type = None # data type name (for deserialization)
deps = None # dependencies references
owner = None
uuid = None # uuid used as key (string)
pointer = None # dcc data ref (DCC type)
buffer = None # data blob (json)
str_type = None # data type name (string)
deps = [None] # dependencies array (string)
owner = None # Data owner (string)
state = None # Data state (RepState)
def __init__(self, owner=None, data=None, uuid=None):
def __init__(self, owner=None, data=None, uuid=None, buffer=None):
self.uuid = uuid if uuid else str(uuid4())
assert(owner)
self.owner = owner
self.pointer = data
self.str_type = type(data).__name__
self.buffer = buffer if buffer else None
self.str_type = type(self).__name__
def push(self, socket):
"""
@ -83,8 +98,8 @@ class ReplicatedDatablock(object):
key = self.uuid.encode()
type = self.str_type.encode()
socket.send_multipart([key,owner,type,data])
socket.send_multipart([key, owner, type, data])
@classmethod
def pull(cls, socket, factory):
"""
@ -92,59 +107,86 @@ class ReplicatedDatablock(object):
- read data from the socket
- reconstruct an instance
"""
uuid, owner,str_type, data = socket.recv_multipart(zmq.NOBLOCK)
uuid, owner, str_type, data = socket.recv_multipart(zmq.NOBLOCK)
str_type = str_type.decode()
owner=owner.decode()
uuid=uuid.decode()
owner = owner.decode()
uuid = uuid.decode()
data = self.deserialize(data)
instance = factory.construct_from_net(str_type)(owner=owner, uuid=uuid)
instance = factory.construct_from_net(str_type)(owner=owner, uuid=uuid, buffer=data)
# instance.data = instance.deserialize(data)
return instance
def store(self, dict, persistent=False):
def store(self, dict, persistent=False):
"""
I want to store my replicated data. Persistent means into the disk
If uuid is none we delete the key from the volume
"""
if self.uuid is not None:
if self.data == 'None':
logger.info("erasing key {}".format(self.uuid))
if self.buffer == 'None':
logger.debug("erasing key {}".format(self.uuid))
del dict[self.uuid]
else:
dict[self.uuid] = self
return self.uuid
def deserialize(self,data):
def deserialize(self, data):
"""
I want to apply changes into the DCC
MUST RETURN AN OBJECT INSTANCE
BUFFER -> JSON
"""
raise NotImplementedError
def serialize(self,data):
def serialize(self, data):
"""
I want to load data from DCC
DCC -> JSON
MUST RETURN A BYTE ARRAY
"""
raise NotImplementedError
def apply(self,data,target):
"""
JSON -> DCC
"""
raise NotImplementedError
def resolve(self):
"""
I want to resolve my orphan data to an existing one
= Assing my pointer
"""
raise NotImplementedError
def dump(self):
return self.deserialize(self.buffer)
class RepCommand(ReplicatedDatablock):
def serialize(self,data):
return pickle.dumps(data)
def deserialize(self,data):
return pickle.load(data)
def apply(self,data,target):
target = data
# class RepObject(ReplicatedDatablock):
# def deserialize(self):
# try:
# if self.pointer is None:
# pointer = None
# # Object specific constructor...
# if self.data["data"] in bpy.data.meshes.keys():
# pointer = bpy.data.meshes[self.data["data"]]
@ -173,11 +215,9 @@ class ReplicatedDatablock(object):
# self.pointer.hide_select = False
# else:
# self.pointer.hide_select = True
# except Exception as e:
# logger.error("Object {} loading error: {} ".format(self.data["name"], e))
# def deserialize(self):
# self.data = dump_datablock(self.pointer, 1)

View File

@ -2,10 +2,10 @@ import threading
import logging
import zmq
import time
from replication import ReplicatedDatablock
from replication import ReplicatedDatablock, RepCommand
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
STATE_INITIAL = 0
STATE_SYNCING = 1
@ -14,16 +14,22 @@ STATE_ACTIVE = 2
class Client(object):
def __init__(self,factory=None, config=None):
self._rep_store = {}
self._net = ClientNetService(self._rep_store)
assert(factory)
self._rep_store = {}
self._net_client = ClientNetService(
store_reference=self._rep_store,
factory=factory)
self._factory = factory
def connect(self):
self._net.start()
self._net_client.start()
def disconnect(self):
self._net.stop()
self._net_client.stop()
def state(self):
return self._net_client.state
def register(self, object):
"""
@ -34,12 +40,13 @@ class Client(object):
new_item = self._factory.construct_from_dcc(object)(owner="client", data=object)
if new_item:
log.info("Registering {} on {}".format(object,new_item.uuid))
logger.info("Registering {} on {}".format(object,new_item.uuid))
new_item.store(self._rep_store)
log.info("Pushing changes...")
new_item.push(self._net.publish)
logger.info("Pushing changes...")
new_item.push(self._net_client.publish)
return new_item.uuid
else:
raise TypeError("Type not supported")
@ -49,21 +56,26 @@ class Client(object):
def unregister(self,object):
pass
class ClientNetService(threading.Thread):
def __init__(self,store_reference=None):
def __init__(self,store_reference=None, factory=None):
# Threading
threading.Thread.__init__(self)
self.name = "ClientNetLink"
self.daemon = True
self.exit_event = threading.Event()
self._exit_event = threading.Event()
self._factory = factory
self._store_reference = store_reference
assert(self._factory)
# Networking
self.context = zmq.Context.instance()
self.snapshot = self.context.socket(zmq.DEALER)
self.snapshot.setsockopt(zmq.IDENTITY, b'SERVER')
self.snapshot.connect("tcp://127.0.0.1:5560")
self.subscriber = self.context.socket(zmq.SUB)
@ -78,25 +90,55 @@ class ClientNetService(threading.Thread):
def run(self):
log.info("Client is listening")
logger.info("Client is online")
poller = zmq.Poller()
poller.register(self.snapshot, zmq.POLLIN)
poller.register(self.subscriber, zmq.POLLIN)
poller.register(self.publish, zmq.POLLOUT)
self.state = 1
while not self._exit_event.is_set():
"""NET OUT
Given the net state we do something:
SYNCING : Ask for snapshots
ACTIVE : Do nothing
"""
if self.state == STATE_INITIAL:
self.snapshot.send(b"SNAPSHOT_REQUEST")
self.state = STATE_SYNCING
while not self.exit_event.is_set():
items = dict(poller.poll(10))
"""NET IN
Given the net state we do something:
SYNCING : Ask for snapshots
ACTIVE : Do nothing
"""
items = dict(poller.poll(1))
if self.snapshot in items:
if self.state == STATE_SYNCING:
datablock = ReplicatedDatablock.pull(self.snapshot, self._factory)
if isinstance(datablock, RepCommand):
# We receive updates from the server !
if self.subscriber in items:
if self.state == STATE_ACTIVE:
logger.debug("Receiving changes from server")
datablock = ReplicatedDatablock.pull(self.subscriber, self._factory)
datablock.store(self._store_reference)
if not items:
log.error("No request ")
logger.error("No request ")
time.sleep(1)
def setup(self,id="Client"):
pass
def stop(self):
self.exit_event.set()
self._exit_event.set()
self.snapshot.close()
self.subscriber.close()
@ -110,7 +152,6 @@ class Server():
def __init__(self,config=None, factory=None):
self._rep_store = {}
self._net = ServerNetService(store_reference=self._rep_store, factory=factory)
# self.serve()
def serve(self):
self._net.start()
@ -128,7 +169,9 @@ class ServerNetService(threading.Thread):
threading.Thread.__init__(self)
self.name = "ServerNetLink"
self.daemon = True
self.exit_event = threading.Event()
self._exit_event = threading.Event()
# Networking
self._rep_store = store_reference
self.context = zmq.Context.instance()
@ -160,33 +203,41 @@ class ServerNetService(threading.Thread):
self.pull.bind("tcp://*:5562")
except zmq.error.ZMQError:
log.error("Address already in use, change net config")
logger.error("Address already in use, change net config")
def run(self):
log.info("Server is listening")
logger.debug("Server is online")
poller = zmq.Poller()
poller.register(self.snapshot, zmq.POLLIN)
poller.register(self.pull, zmq.POLLIN)
self.state = STATE_ACTIVE
while not self.exit_event.is_set():
while not self._exit_event.is_set():
# Non blocking poller
socks = dict(poller.poll(1000))
socks = dict(poller.poll())
# Snapshot system for late join (Server - Client)
# if self.snapshot in socks:
# msg = self.snapshot.recv_multipart(zmq.DONTWAIT)
if self.snapshot in socks:
msg = self.snapshot.recv_multipart(zmq.DONTWAIT)
# identity = msg[0]
# request = msg[1]
identity = msg[0]
request = msg[1]
# if request == b"SNAPSHOT_REQUEST":
# pass
# else:
# logger.info("Bad snapshot request")
# break
if request == b"SNAPSHOT_REQUEST":
pass
else:
logger.info("Bad snapshot request")
break
for key, item in self._rep_store:
self.snapshot.send(identity, zmq.SNDMORE)
item.push(self.snapshot)
self.snapshot.send(identity, zmq.SNDMORE)
RepCommand(owner='server',data='SNAPSHOT_END').push(self.snapshot)
# ordered_props = [(SUPPORTED_TYPES.index(k.split('/')[0]),k,v) for k, v in self.property_map.items()]
# ordered_props.sort(key=itemgetter(0))
@ -204,19 +255,17 @@ class ServerNetService(threading.Thread):
# Regular update routing (Clients / Client)
if self.pull in socks:
log.info("Receiving changes from client")
msg = ReplicatedDatablock.pull(self.pull, self.factory)
logger.debug("Receiving changes from client")
datablock = ReplicatedDatablock.pull(self.pull, self.factory)
msg.store(self._rep_store)
# msg = message.Message.recv(self.collector_sock)
# # logger.info("received object")
# # Update all clients
# msg.store(self.store)
# msg.send(self.pub_sock)
datablock.store(self._rep_store)
# Update all clients
datablock.push(self.publisher)
def stop(self):
self.exit_event.set()
self._exit_event.set()
self.snapshot.close()
self.pull.close()

View File

@ -30,7 +30,7 @@ class RepSampleData(ReplicatedDatablock):
class TestDataReplication(unittest.TestCase):
def test_setup_data_factory(self):
def test_data_factory(self):
factory = ReplicatedDataFactory()
factory.register_type(SampleData, RepSampleData)
data_sample = SampleData()
@ -38,23 +38,41 @@ class TestDataReplication(unittest.TestCase):
self.assertEqual(isinstance(rep_sample,RepSampleData), True)
def test_replicate_client_data(self):
def test_basic_client_start(self):
factory = ReplicatedDataFactory()
factory.register_type(SampleData, RepSampleData)
server_api = Server(factory=factory)
server_api.serve()
client_api = Client(factory=factory)
client_api.connect()
server = Server(factory=factory)
server.serve()
data_sample = SampleData()
data_sample_key = client_api.register(data_sample)
client = Client(factory=factory)
client.connect()
#Waiting for server to receive the datas
time.sleep(.1)
time.sleep(1)
#Check if if receive them
self.assertNotEqual(server_api._rep_store[data_sample_key],None)
self.assertEqual(client.state(), 2)
# def test_register_client_data(self):
# # Setup data factory
# factory = ReplicatedDataFactory()
# factory.register_type(SampleData, RepSampleData)
# server = Server(factory=factory)
# server.serve()
# client = Client(factory=factory)
# client.connect()
# client2 = Client(factory=factory)
# client2.connect()
# data_sample_key = client.register(SampleData())
# #Waiting for server to receive the datas
# time.sleep(1)
# #Check if the server receive them
# self.assertNotEqual(client2._rep_store[data_sample_key],None)