refactor: moved replication code to another repo
This commit is contained in:
@ -1,27 +0,0 @@
|
|||||||
# This file is a template, and might need editing before it works on your project.
|
|
||||||
# Official language image. Look for the different tagged releases at:
|
|
||||||
# https://hub.docker.com/r/library/python/tags/
|
|
||||||
image: python:latest
|
|
||||||
|
|
||||||
# Change pip's cache directory to be inside the project directory since we can
|
|
||||||
# only cache local items.
|
|
||||||
variables:
|
|
||||||
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
|
|
||||||
|
|
||||||
# Pip's cache doesn't store the python packages
|
|
||||||
# https://pip.pypa.io/en/stable/reference/pip_install/#caching
|
|
||||||
#
|
|
||||||
# If you want to also cache the installed packages, you have to install
|
|
||||||
# them in a virtualenv and cache it as well.
|
|
||||||
cache:
|
|
||||||
paths:
|
|
||||||
- .cache/pip
|
|
||||||
|
|
||||||
|
|
||||||
before_script:
|
|
||||||
- python -V # Print out python version for debugging
|
|
||||||
- pip install zmq umsgpack
|
|
||||||
|
|
||||||
test:
|
|
||||||
script:
|
|
||||||
- python -m unittest discover
|
|
208
replication.py
208
replication.py
@ -1,208 +0,0 @@
|
|||||||
import json
|
|
||||||
import logging
|
|
||||||
import pickle
|
|
||||||
from enum import Enum
|
|
||||||
from uuid import uuid4
|
|
||||||
|
|
||||||
import zmq
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class RepState(Enum):
|
|
||||||
ADDED = 0
|
|
||||||
COMMITED = 1
|
|
||||||
STAGED = 2
|
|
||||||
|
|
||||||
|
|
||||||
class ReplicatedDataFactory(object):
|
|
||||||
"""
|
|
||||||
Manage the data types implementations.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self.supported_types = []
|
|
||||||
|
|
||||||
# Default registered types
|
|
||||||
self.register_type(str, RepCommand)
|
|
||||||
self.register_type(RepDeleteCommand, RepDeleteCommand)
|
|
||||||
|
|
||||||
def register_type(self, dtype, implementation):
|
|
||||||
"""
|
|
||||||
Register a new replicated datatype implementation
|
|
||||||
"""
|
|
||||||
self.supported_types.append((dtype, implementation))
|
|
||||||
|
|
||||||
def match_type_by_instance(self, data):
|
|
||||||
"""
|
|
||||||
Find corresponding type to the given datablock
|
|
||||||
"""
|
|
||||||
for stypes, implementation in self.supported_types:
|
|
||||||
if isinstance(data, stypes):
|
|
||||||
return implementation
|
|
||||||
|
|
||||||
print("type not supported for replication")
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
def match_type_by_name(self, type_name):
|
|
||||||
for stypes, implementation in self.supported_types:
|
|
||||||
if type_name == implementation.__name__:
|
|
||||||
return implementation
|
|
||||||
print("type not supported for replication")
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
def construct_from_dcc(self, data):
|
|
||||||
implementation = self.match_type_by_instance(data)
|
|
||||||
return implementation
|
|
||||||
|
|
||||||
def construct_from_net(self, type_name):
|
|
||||||
"""
|
|
||||||
Reconstruct a new replicated value from serialized data
|
|
||||||
"""
|
|
||||||
return self.match_type_by_name(type_name)
|
|
||||||
|
|
||||||
|
|
||||||
class ReplicatedDatablock(object):
|
|
||||||
"""
|
|
||||||
Datablock definition that handle object replication logic.
|
|
||||||
PUSH: send the object over the wire
|
|
||||||
STORE: register the object on the given replication graph
|
|
||||||
LOAD: apply loaded changes by reference on the local copy
|
|
||||||
DUMP: get local changes
|
|
||||||
|
|
||||||
"""
|
|
||||||
uuid = None # uuid used as key (string)
|
|
||||||
pointer = None # dcc data ref (DCC type)
|
|
||||||
buffer = None # raw data (json)
|
|
||||||
str_type = None # data type name (string)
|
|
||||||
deps = [None] # dependencies array (string)
|
|
||||||
owner = None # Data owner (string)
|
|
||||||
state = None # Data state (RepState)
|
|
||||||
|
|
||||||
def __init__(self, owner=None, pointer=None, uuid=None, buffer=None):
|
|
||||||
self.uuid = uuid if uuid else str(uuid4())
|
|
||||||
assert(owner)
|
|
||||||
self.owner = owner
|
|
||||||
|
|
||||||
if pointer:
|
|
||||||
self.pointer = pointer
|
|
||||||
self.buffer = self.dump()
|
|
||||||
elif buffer:
|
|
||||||
self.buffer = buffer
|
|
||||||
|
|
||||||
self.str_type = type(self).__name__
|
|
||||||
|
|
||||||
def push(self, socket):
|
|
||||||
"""
|
|
||||||
Here send data over the wire:
|
|
||||||
- serialize the data
|
|
||||||
- send them as a multipart frame thought the given socket
|
|
||||||
"""
|
|
||||||
assert(self.buffer)
|
|
||||||
|
|
||||||
data = self.serialize(self.buffer)
|
|
||||||
assert(isinstance(data, bytes))
|
|
||||||
owner = self.owner.encode()
|
|
||||||
key = self.uuid.encode()
|
|
||||||
type = self.str_type.encode()
|
|
||||||
|
|
||||||
socket.send_multipart([key, owner, type, data])
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def pull(cls, socket, factory):
|
|
||||||
"""
|
|
||||||
Here we reeceive data from the wire:
|
|
||||||
- read data from the socket
|
|
||||||
- reconstruct an instance
|
|
||||||
"""
|
|
||||||
uuid, owner, str_type, data = socket.recv_multipart(zmq.NOBLOCK)
|
|
||||||
|
|
||||||
str_type = str_type.decode()
|
|
||||||
owner = owner.decode()
|
|
||||||
uuid = uuid.decode()
|
|
||||||
|
|
||||||
instance = factory.construct_from_net(str_type)(owner=owner, uuid=uuid)
|
|
||||||
instance.buffer = instance.deserialize(data)
|
|
||||||
|
|
||||||
return instance
|
|
||||||
|
|
||||||
def store(self, dict, persistent=False):
|
|
||||||
"""
|
|
||||||
I want to store my replicated data. Persistent means into the disk
|
|
||||||
If uuid is none we delete the key from the volume
|
|
||||||
"""
|
|
||||||
if self.uuid is not None:
|
|
||||||
if self.buffer == 'None':
|
|
||||||
logger.debug("erasing key {}".format(self.uuid))
|
|
||||||
del dict[self.uuid]
|
|
||||||
else:
|
|
||||||
dict[self.uuid] = self
|
|
||||||
|
|
||||||
return self.uuid
|
|
||||||
|
|
||||||
def deserialize(self, data):
|
|
||||||
"""
|
|
||||||
BUFFER -> JSON
|
|
||||||
"""
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
def serialize(self, data):
|
|
||||||
"""
|
|
||||||
JSON -> BUFFER
|
|
||||||
"""
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
def dump(self):
|
|
||||||
"""
|
|
||||||
DCC -> JSON
|
|
||||||
"""
|
|
||||||
assert(self.pointer)
|
|
||||||
|
|
||||||
return json.dumps(self.pointer)
|
|
||||||
|
|
||||||
def load(self, target=None):
|
|
||||||
"""
|
|
||||||
JSON -> DCC
|
|
||||||
"""
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
def resolve(self):
|
|
||||||
"""
|
|
||||||
I want to resolve my orphan data to an existing one
|
|
||||||
= Assing my pointer
|
|
||||||
|
|
||||||
"""
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return "{uuid} - owner: {owner} - type: {type}".format(
|
|
||||||
uuid=self.uuid,
|
|
||||||
owner=self.owner,
|
|
||||||
type=self.str_type
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class RepCommand(ReplicatedDatablock):
|
|
||||||
def serialize(self, data):
|
|
||||||
return pickle.dumps(data)
|
|
||||||
|
|
||||||
def deserialize(self, data):
|
|
||||||
return pickle.loads(data)
|
|
||||||
|
|
||||||
def load(self, target):
|
|
||||||
target = self.pointer
|
|
||||||
|
|
||||||
|
|
||||||
class RepDeleteCommand(ReplicatedDatablock):
|
|
||||||
def serialize(self, data):
|
|
||||||
return pickle.dumps(data)
|
|
||||||
|
|
||||||
def deserialize(self, data):
|
|
||||||
return pickle.loads(data)
|
|
||||||
|
|
||||||
def store(self, rep_store):
|
|
||||||
assert(self.buffer)
|
|
||||||
|
|
||||||
if rep_store and self.buffer in rep_store.keys():
|
|
||||||
del rep_store[self.buffer]
|
|
@ -1,341 +0,0 @@
|
|||||||
import logging
|
|
||||||
import threading
|
|
||||||
import time
|
|
||||||
|
|
||||||
import zmq
|
|
||||||
|
|
||||||
from replication import RepCommand, RepDeleteCommand, ReplicatedDatablock
|
|
||||||
from replication_graph import ReplicationGraph
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
STATE_INITIAL = 0
|
|
||||||
STATE_SYNCING = 1
|
|
||||||
STATE_ACTIVE = 2
|
|
||||||
|
|
||||||
|
|
||||||
class Client(object):
|
|
||||||
def __init__(self, factory=None, supervisor=False):
|
|
||||||
assert(factory)
|
|
||||||
|
|
||||||
self._rep_store = ReplicationGraph()
|
|
||||||
self._net_client = ClientNetService(
|
|
||||||
store_reference=self._rep_store,
|
|
||||||
factory=factory)
|
|
||||||
self._factory = factory
|
|
||||||
self._is_supervisor = supervisor
|
|
||||||
self._id = None
|
|
||||||
|
|
||||||
def connect(self, id="Default", address="127.0.0.1", port=5560):
|
|
||||||
"""
|
|
||||||
Connect to the server
|
|
||||||
"""
|
|
||||||
self._id = id
|
|
||||||
self._net_client.connect(id=id, address=address, port=port)
|
|
||||||
|
|
||||||
def disconnect(self):
|
|
||||||
"""
|
|
||||||
Disconnect from server, reset the client
|
|
||||||
"""
|
|
||||||
self._net_client.stop()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def state(self):
|
|
||||||
"""
|
|
||||||
Return the client state
|
|
||||||
0: STATE_INITIAL
|
|
||||||
1: STATE_SYNCING
|
|
||||||
2: STATE_ACTIVE
|
|
||||||
"""
|
|
||||||
return self._net_client.state
|
|
||||||
|
|
||||||
def register(self, object):
|
|
||||||
"""
|
|
||||||
Register a new item for replication
|
|
||||||
TODO: Dig in the replication comportement,
|
|
||||||
find a better way to handle replication behavior
|
|
||||||
"""
|
|
||||||
assert(object)
|
|
||||||
|
|
||||||
# Construct the coresponding replication type
|
|
||||||
new_item = self._factory.construct_from_dcc(
|
|
||||||
object)(owner=self._id, pointer=object)
|
|
||||||
|
|
||||||
if new_item:
|
|
||||||
logger.debug("Registering {} on {}".format(object, new_item.uuid))
|
|
||||||
new_item.store(self._rep_store)
|
|
||||||
|
|
||||||
logger.debug("Pushing new registered value")
|
|
||||||
new_item.push(self._net_client.publish)
|
|
||||||
return new_item.uuid
|
|
||||||
|
|
||||||
else:
|
|
||||||
raise TypeError("Type not supported")
|
|
||||||
|
|
||||||
def unregister(self, object_uuid, clean=False):
|
|
||||||
"""
|
|
||||||
Unregister for replication the given
|
|
||||||
object.
|
|
||||||
The clean option purpose is to remove
|
|
||||||
the pointer data's
|
|
||||||
"""
|
|
||||||
|
|
||||||
if object_uuid in self._rep_store.keys():
|
|
||||||
delete_command = RepDeleteCommand(
|
|
||||||
owner='client', buffer=object_uuid)
|
|
||||||
# remove the key from our store
|
|
||||||
delete_command.store(self._rep_store)
|
|
||||||
delete_command.push(self._net_client.publish)
|
|
||||||
else:
|
|
||||||
raise KeyError("Cannot unregister key")
|
|
||||||
|
|
||||||
def pull(self, object=None):
|
|
||||||
"""
|
|
||||||
Asynchonous pull
|
|
||||||
Here we want to pull all waiting changes and apply them
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
def get(self, object_uuid):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class ClientNetService(threading.Thread):
|
|
||||||
def __init__(self, store_reference=None, factory=None):
|
|
||||||
|
|
||||||
# Threading
|
|
||||||
threading.Thread.__init__(self)
|
|
||||||
self.name = "ClientNetLink"
|
|
||||||
self.daemon = True
|
|
||||||
|
|
||||||
self._exit_event = threading.Event()
|
|
||||||
self._factory = factory
|
|
||||||
self._store_reference = store_reference
|
|
||||||
self._id = "None"
|
|
||||||
|
|
||||||
assert(self._factory)
|
|
||||||
|
|
||||||
# Networking
|
|
||||||
self.context = zmq.Context.instance()
|
|
||||||
self.state = STATE_INITIAL
|
|
||||||
|
|
||||||
def connect(self, id=None, address='127.0.0.1', port=5560):
|
|
||||||
"""
|
|
||||||
Network socket setup
|
|
||||||
"""
|
|
||||||
assert(id)
|
|
||||||
if self.state == STATE_INITIAL:
|
|
||||||
self._id = id
|
|
||||||
logger.debug("connecting on {}:{}".format(address, port))
|
|
||||||
self.command = self.context.socket(zmq.DEALER)
|
|
||||||
self.command.setsockopt(zmq.IDENTITY, self._id.encode())
|
|
||||||
self.command.connect("tcp://{}:{}".format(address, port))
|
|
||||||
|
|
||||||
self.subscriber = self.context.socket(zmq.DEALER)
|
|
||||||
self.subscriber.setsockopt(zmq.IDENTITY, self._id.encode())
|
|
||||||
|
|
||||||
# self.subscriber = self.context.socket(zmq.SUB)
|
|
||||||
# self.subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
|
|
||||||
|
|
||||||
self.subscriber.connect("tcp://{}:{}".format(address, port+1))
|
|
||||||
# self.subscriber.linger = 0
|
|
||||||
time.sleep(.5)
|
|
||||||
|
|
||||||
self.publish = self.context.socket(zmq.PUSH)
|
|
||||||
self.publish.connect("tcp://{}:{}".format(address, port+2))
|
|
||||||
|
|
||||||
self.start()
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
logger.debug("{} online".format(self._id))
|
|
||||||
poller = zmq.Poller()
|
|
||||||
poller.register(self.command, zmq.POLLIN)
|
|
||||||
poller.register(self.subscriber, zmq.POLLIN)
|
|
||||||
poller.register(self.publish, zmq.POLLOUT)
|
|
||||||
|
|
||||||
while not self._exit_event.is_set():
|
|
||||||
"""NET OUT
|
|
||||||
Given the net state we do something:
|
|
||||||
INITIAL : Ask for snapshots
|
|
||||||
"""
|
|
||||||
if self.state == STATE_INITIAL:
|
|
||||||
logger.debug('{} : request snapshot'.format(self._id))
|
|
||||||
self.command.send(b"SNAPSHOT_REQUEST")
|
|
||||||
self.state = STATE_SYNCING
|
|
||||||
|
|
||||||
"""NET IN
|
|
||||||
Given the net state we do something:
|
|
||||||
SYNCING : load snapshots
|
|
||||||
ACTIVE : listen for updates
|
|
||||||
"""
|
|
||||||
items = dict(poller.poll(1))
|
|
||||||
|
|
||||||
# COMMANDS
|
|
||||||
if self.command in items:
|
|
||||||
datablock = ReplicatedDatablock.pull(
|
|
||||||
self.command, self._factory)
|
|
||||||
|
|
||||||
if self.state == STATE_SYNCING:
|
|
||||||
if 'SNAPSHOT_END' in datablock.buffer:
|
|
||||||
self.state = STATE_ACTIVE
|
|
||||||
logger.debug('{} : snapshot done'.format(self._id))
|
|
||||||
else:
|
|
||||||
datablock.store(self._store_reference)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# DATA
|
|
||||||
if self.subscriber in items:
|
|
||||||
if self.state == STATE_ACTIVE:
|
|
||||||
logger.debug(
|
|
||||||
"{} : Receiving changes from server".format(self._id))
|
|
||||||
datablock = ReplicatedDatablock.pull(
|
|
||||||
self.subscriber, self._factory)
|
|
||||||
datablock.store(self._store_reference)
|
|
||||||
|
|
||||||
if not items:
|
|
||||||
logger.error("No request ")
|
|
||||||
|
|
||||||
self.command.close()
|
|
||||||
self.subscriber.close()
|
|
||||||
self.publish.close()
|
|
||||||
|
|
||||||
self._exit_event.clear()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self._exit_event.set()
|
|
||||||
|
|
||||||
# Wait the end of the run
|
|
||||||
while self._exit_event.is_set():
|
|
||||||
time.sleep(.1)
|
|
||||||
|
|
||||||
self.state = 0
|
|
||||||
|
|
||||||
|
|
||||||
class Server():
|
|
||||||
def __init__(self, config=None, factory=None):
|
|
||||||
self._rep_store = {}
|
|
||||||
self._net = ServerNetService(
|
|
||||||
store_reference=self._rep_store, factory=factory)
|
|
||||||
|
|
||||||
def serve(self, port=5560):
|
|
||||||
self._net.listen(port=port)
|
|
||||||
|
|
||||||
def state(self):
|
|
||||||
return self._net.state
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self._net.stop()
|
|
||||||
|
|
||||||
|
|
||||||
class ServerNetService(threading.Thread):
|
|
||||||
def __init__(self, store_reference=None, factory=None):
|
|
||||||
# Threading
|
|
||||||
threading.Thread.__init__(self)
|
|
||||||
self.name = "ServerNetLink"
|
|
||||||
self.daemon = True
|
|
||||||
self._exit_event = threading.Event()
|
|
||||||
|
|
||||||
# Networking
|
|
||||||
self._rep_store = store_reference
|
|
||||||
|
|
||||||
self.context = zmq.Context.instance()
|
|
||||||
self.command = None
|
|
||||||
self.publisher = None
|
|
||||||
self.pull = None
|
|
||||||
self.state = 0
|
|
||||||
self.factory = factory
|
|
||||||
self.clients = {}
|
|
||||||
|
|
||||||
def listen(self, port=5560):
|
|
||||||
try:
|
|
||||||
# Update request
|
|
||||||
self.command = self.context.socket(zmq.ROUTER)
|
|
||||||
self.command.setsockopt(zmq.IDENTITY, b'SERVER')
|
|
||||||
self.command.setsockopt(zmq.RCVHWM, 60)
|
|
||||||
self.command.bind("tcp://*:{}".format(port))
|
|
||||||
|
|
||||||
# Update all clients
|
|
||||||
self.publisher = self.context.socket(zmq.ROUTER)
|
|
||||||
self.publisher.setsockopt(zmq.IDENTITY,b'SERVER_DATA')
|
|
||||||
self.publisher.bind("tcp://*:{}".format(port+1))
|
|
||||||
self.publisher.setsockopt(zmq.SNDHWM, 60)
|
|
||||||
self.publisher.linger = 0
|
|
||||||
|
|
||||||
# Update collector
|
|
||||||
self.pull = self.context.socket(zmq.PULL)
|
|
||||||
self.pull.setsockopt(zmq.RCVHWM, 60)
|
|
||||||
self.pull.bind("tcp://*:{}".format(port+2))
|
|
||||||
|
|
||||||
self.start()
|
|
||||||
except zmq.error.ZMQError:
|
|
||||||
logger.error("Address already in use, change net config")
|
|
||||||
|
|
||||||
def add_client(self, identity):
|
|
||||||
if identity in self.clients.keys():
|
|
||||||
logger.debug("client already added")
|
|
||||||
else:
|
|
||||||
self.clients[identity.decode()] = identity
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
logger.debug("Server is online")
|
|
||||||
poller = zmq.Poller()
|
|
||||||
poller.register(self.command, zmq.POLLIN)
|
|
||||||
poller.register(self.pull, zmq.POLLIN)
|
|
||||||
|
|
||||||
self.state = STATE_ACTIVE
|
|
||||||
|
|
||||||
while not self._exit_event.is_set():
|
|
||||||
# Non blocking poller
|
|
||||||
socks = dict(poller.poll(1))
|
|
||||||
|
|
||||||
# Snapshot system for late join (Server - Client)
|
|
||||||
if self.command in socks:
|
|
||||||
msg = self.command.recv_multipart(zmq.DONTWAIT)
|
|
||||||
|
|
||||||
identity = msg[0]
|
|
||||||
request = msg[1]
|
|
||||||
|
|
||||||
self.add_client(identity)
|
|
||||||
|
|
||||||
if request == b"SNAPSHOT_REQUEST":
|
|
||||||
# Sending snapshots
|
|
||||||
for key, item in self._rep_store.items():
|
|
||||||
self.command.send(identity, zmq.SNDMORE)
|
|
||||||
item.push(self.command)
|
|
||||||
|
|
||||||
# Snapshot end
|
|
||||||
self.command.send(identity, zmq.SNDMORE)
|
|
||||||
RepCommand(owner='server', pointer='SNAPSHOT_END').push(
|
|
||||||
self.command)
|
|
||||||
|
|
||||||
# Regular update routing (Clients / Server / Clients)
|
|
||||||
if self.pull in socks:
|
|
||||||
|
|
||||||
datablock = ReplicatedDatablock.pull(self.pull, self.factory)
|
|
||||||
logger.debug("SERVER: Receiving changes from {}".format(datablock.owner))
|
|
||||||
datablock.store(self._rep_store)
|
|
||||||
|
|
||||||
# Update all clients
|
|
||||||
for cli_name,cli_id in self.clients.items():
|
|
||||||
if cli_name != datablock.owner:
|
|
||||||
logger.debug("SERVER: Broadcast changes to {}".format(cli_name))
|
|
||||||
self.publisher.send(cli_id, zmq.SNDMORE)
|
|
||||||
datablock.push(self.publisher)
|
|
||||||
|
|
||||||
# datablock.push(self.publisher)
|
|
||||||
|
|
||||||
self.command.close()
|
|
||||||
self.pull.close()
|
|
||||||
self.publisher.close()
|
|
||||||
|
|
||||||
self._exit_event.clear()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self._exit_event.set()
|
|
||||||
|
|
||||||
# Wait the end of the run
|
|
||||||
while self._exit_event.is_set():
|
|
||||||
time.sleep(.1)
|
|
||||||
|
|
||||||
self.state = 0
|
|
@ -1,36 +0,0 @@
|
|||||||
import collections
|
|
||||||
from replication import ReplicatedDatablock
|
|
||||||
|
|
||||||
class ReplicationGraph(collections.MutableMapping):
|
|
||||||
"""
|
|
||||||
Structure to hold replicated data relation graph
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
self.store = dict()
|
|
||||||
self.update(dict(*args, **kwargs)) # use the free update to set keys
|
|
||||||
|
|
||||||
def __getitem__(self, key):
|
|
||||||
return self.store[key]
|
|
||||||
|
|
||||||
def __setitem__(self, key, value):
|
|
||||||
self.store[key] = value
|
|
||||||
|
|
||||||
def __delitem__(self, key):
|
|
||||||
del self.store[key]
|
|
||||||
|
|
||||||
def __iter__(self):
|
|
||||||
return iter(self.store)
|
|
||||||
|
|
||||||
def __len__(self):
|
|
||||||
return len(self.store)
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
str = "\n"
|
|
||||||
for key,item in self.store.items():
|
|
||||||
str+=repr(item)
|
|
||||||
return str
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -1,263 +0,0 @@
|
|||||||
import cProfile
|
|
||||||
import logging
|
|
||||||
import re
|
|
||||||
import time
|
|
||||||
import unittest
|
|
||||||
import umsgpack
|
|
||||||
|
|
||||||
logging.basicConfig()
|
|
||||||
logging.getLogger().setLevel(logging.INFO)
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
from replication import ReplicatedDatablock, ReplicatedDataFactory
|
|
||||||
from replication_client import Client, Server
|
|
||||||
|
|
||||||
|
|
||||||
class SampleData():
|
|
||||||
def __init__(self, map={"sample": bytearray(50000)}):
|
|
||||||
self.map = map
|
|
||||||
|
|
||||||
|
|
||||||
class RepSampleData(ReplicatedDatablock):
|
|
||||||
def serialize(self, data):
|
|
||||||
import pickle
|
|
||||||
|
|
||||||
return pickle.dumps(data)
|
|
||||||
|
|
||||||
def deserialize(self, data):
|
|
||||||
import pickle
|
|
||||||
|
|
||||||
return pickle.loads(data)
|
|
||||||
|
|
||||||
def dump(self):
|
|
||||||
import json
|
|
||||||
output = {}
|
|
||||||
output['map'] = umsgpack.packb(self.pointer.map)
|
|
||||||
return output
|
|
||||||
|
|
||||||
def load(self, target=None):
|
|
||||||
import json
|
|
||||||
if target is None:
|
|
||||||
target = SampleData()
|
|
||||||
|
|
||||||
target.map = umsgpack.unpackb(self.buffer['map'])
|
|
||||||
|
|
||||||
|
|
||||||
class TestDataFactory(unittest.TestCase):
|
|
||||||
def test_data_factory(self):
|
|
||||||
factory = ReplicatedDataFactory()
|
|
||||||
factory.register_type(SampleData, RepSampleData)
|
|
||||||
data_sample = SampleData()
|
|
||||||
rep_sample = factory.construct_from_dcc(
|
|
||||||
data_sample)(owner="toto", pointer=data_sample)
|
|
||||||
|
|
||||||
self.assertEqual(isinstance(rep_sample, RepSampleData), True)
|
|
||||||
|
|
||||||
|
|
||||||
class TestClient(unittest.TestCase):
|
|
||||||
def __init__(self, methodName='runTest'):
|
|
||||||
unittest.TestCase.__init__(self, methodName)
|
|
||||||
|
|
||||||
def test_empty_snapshot(self):
|
|
||||||
# Setup
|
|
||||||
factory = ReplicatedDataFactory()
|
|
||||||
factory.register_type(SampleData, RepSampleData)
|
|
||||||
|
|
||||||
server = Server(factory=factory)
|
|
||||||
client = Client(factory=factory)
|
|
||||||
|
|
||||||
server.serve(port=5570)
|
|
||||||
client.connect(port=5570, id="client_test_callback")
|
|
||||||
|
|
||||||
test_state = client.state
|
|
||||||
|
|
||||||
server.stop()
|
|
||||||
client.disconnect()
|
|
||||||
|
|
||||||
self.assertNotEqual(test_state, 2)
|
|
||||||
|
|
||||||
def test_filled_snapshot(self):
|
|
||||||
# Setup
|
|
||||||
factory = ReplicatedDataFactory()
|
|
||||||
factory.register_type(SampleData, RepSampleData)
|
|
||||||
|
|
||||||
server = Server(factory=factory)
|
|
||||||
client = Client(factory=factory)
|
|
||||||
client2 = Client(factory=factory)
|
|
||||||
|
|
||||||
server.serve(port=5575)
|
|
||||||
client.connect(port=5575,id="cli_test_filled_snapshot")
|
|
||||||
|
|
||||||
# Test the key registering
|
|
||||||
data_sample_key = client.register(SampleData())
|
|
||||||
|
|
||||||
client2.connect(port=5575, id="client_2")
|
|
||||||
time.sleep(0.2)
|
|
||||||
rep_test_key = client2._rep_store[data_sample_key].uuid
|
|
||||||
|
|
||||||
server.stop()
|
|
||||||
client.disconnect()
|
|
||||||
client2.disconnect()
|
|
||||||
|
|
||||||
self.assertEqual(data_sample_key, rep_test_key)
|
|
||||||
|
|
||||||
def test_register_client_data(self):
|
|
||||||
# Setup environment
|
|
||||||
|
|
||||||
factory = ReplicatedDataFactory()
|
|
||||||
factory.register_type(SampleData, RepSampleData)
|
|
||||||
|
|
||||||
server = Server(factory=factory)
|
|
||||||
server.serve(port=5580)
|
|
||||||
|
|
||||||
client = Client(factory=factory)
|
|
||||||
client.connect(port=5580,id="cli_test_register_client_data")
|
|
||||||
|
|
||||||
client2 = Client(factory=factory)
|
|
||||||
client2.connect(port=5580, id="cli2_test_register_client_data")
|
|
||||||
|
|
||||||
# Test the key registering
|
|
||||||
data_sample_key = client.register(SampleData())
|
|
||||||
|
|
||||||
time.sleep(0.3)
|
|
||||||
# Waiting for server to receive the datas
|
|
||||||
rep_test_key = client2._rep_store[data_sample_key].uuid
|
|
||||||
|
|
||||||
client.disconnect()
|
|
||||||
client2.disconnect()
|
|
||||||
server.stop()
|
|
||||||
|
|
||||||
self.assertEqual(rep_test_key, data_sample_key)
|
|
||||||
|
|
||||||
def test_client_data_intergity(self):
|
|
||||||
# Setup environment
|
|
||||||
factory = ReplicatedDataFactory()
|
|
||||||
factory.register_type(SampleData, RepSampleData)
|
|
||||||
|
|
||||||
server = Server(factory=factory)
|
|
||||||
server.serve(port=5585)
|
|
||||||
|
|
||||||
client = Client(factory=factory)
|
|
||||||
client.connect(port=5585, id="cli_test_client_data_intergity")
|
|
||||||
|
|
||||||
client2 = Client(factory=factory)
|
|
||||||
client2.connect(port=5585, id="cli2_test_client_data_intergity")
|
|
||||||
|
|
||||||
test_map = {"toto": "test"}
|
|
||||||
# Test the key registering
|
|
||||||
data_sample_key = client.register(SampleData(map=test_map))
|
|
||||||
|
|
||||||
test_map_result = SampleData()
|
|
||||||
# Waiting for server to receive the datas
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
client2._rep_store[data_sample_key].load(target=test_map_result)
|
|
||||||
|
|
||||||
client.disconnect()
|
|
||||||
client2.disconnect()
|
|
||||||
server.stop()
|
|
||||||
|
|
||||||
self.assertEqual(test_map_result.map["toto"], test_map["toto"])
|
|
||||||
|
|
||||||
def test_client_unregister_key(self):
|
|
||||||
# Setup environment
|
|
||||||
factory = ReplicatedDataFactory()
|
|
||||||
factory.register_type(SampleData, RepSampleData)
|
|
||||||
|
|
||||||
server = Server(factory=factory)
|
|
||||||
server.serve(port=5590)
|
|
||||||
|
|
||||||
client = Client(factory=factory)
|
|
||||||
client.connect(port=5590, id="cli_test_client_data_intergity")
|
|
||||||
|
|
||||||
client2 = Client(factory=factory)
|
|
||||||
client2.connect(port=5590, id="cli2_test_client_data_intergity")
|
|
||||||
|
|
||||||
test_map = {"toto": "test"}
|
|
||||||
# Test the key registering
|
|
||||||
data_sample_key = client.register(SampleData(map=test_map))
|
|
||||||
|
|
||||||
test_map_result = SampleData()
|
|
||||||
|
|
||||||
# Waiting for server to receive the datas
|
|
||||||
time.sleep(.1)
|
|
||||||
|
|
||||||
client2._rep_store[data_sample_key].load(target=test_map_result)
|
|
||||||
|
|
||||||
client.unregister(data_sample_key)
|
|
||||||
time.sleep(.1)
|
|
||||||
|
|
||||||
logger.debug("client store:")
|
|
||||||
logger.debug(client._rep_store)
|
|
||||||
logger.debug("client2 store:")
|
|
||||||
logger.debug(client2._rep_store)
|
|
||||||
logger.debug("server store:")
|
|
||||||
logger.debug(server._rep_store)
|
|
||||||
|
|
||||||
client.disconnect()
|
|
||||||
client2.disconnect()
|
|
||||||
server.stop()
|
|
||||||
|
|
||||||
self.assertFalse(data_sample_key in client._rep_store)
|
|
||||||
|
|
||||||
def test_client_disconnect(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def test_client_change_rights(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class TestStressClient(unittest.TestCase):
|
|
||||||
def test_stress_register(self):
|
|
||||||
total_time = 0
|
|
||||||
# Setup
|
|
||||||
factory = ReplicatedDataFactory()
|
|
||||||
factory.register_type(SampleData, RepSampleData)
|
|
||||||
|
|
||||||
server = Server(factory=factory)
|
|
||||||
client = Client(factory=factory)
|
|
||||||
client2 = Client(factory=factory)
|
|
||||||
|
|
||||||
server.serve(port=5595)
|
|
||||||
client.connect(port=5595,id="cli_test_filled_snapshot")
|
|
||||||
client2.connect(port=5595,id="client_2")
|
|
||||||
|
|
||||||
# Test the key registering
|
|
||||||
for i in range(10000):
|
|
||||||
client.register(SampleData())
|
|
||||||
|
|
||||||
while len(client2._rep_store.keys()) < 10000:
|
|
||||||
time.sleep(0.00001)
|
|
||||||
total_time += 0.00001
|
|
||||||
|
|
||||||
# test_num_items = len(client2._rep_store.keys())
|
|
||||||
server.stop()
|
|
||||||
client.disconnect()
|
|
||||||
client2.disconnect()
|
|
||||||
logger.info("{} s for 10000 values".format(total_time))
|
|
||||||
|
|
||||||
self.assertLess(total_time, 1)
|
|
||||||
|
|
||||||
|
|
||||||
def suite():
|
|
||||||
suite = unittest.TestSuite()
|
|
||||||
|
|
||||||
# Data factory
|
|
||||||
suite.addTest(TestDataFactory('test_data_factory'))
|
|
||||||
|
|
||||||
# Client
|
|
||||||
suite.addTest(TestClient('test_empty_snapshot'))
|
|
||||||
suite.addTest(TestClient('test_filled_snapshot'))
|
|
||||||
suite.addTest(TestClient('test_register_client_data'))
|
|
||||||
suite.addTest(TestClient('test_client_data_intergity'))
|
|
||||||
|
|
||||||
# Stress test
|
|
||||||
suite.addTest(TestStressClient('test_stress_register'))
|
|
||||||
|
|
||||||
return suite
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
runner = unittest.TextTestRunner(verbosity=2)
|
|
||||||
runner.run(suite())
|
|
Reference in New Issue
Block a user