feat: sync test
This commit is contained in:
49
client.py
49
client.py
@ -178,6 +178,7 @@ class RCFClientAgent(object):
|
||||
self.publisher.setsockopt(zmq.SNDHWM, 60)
|
||||
self.publisher.linger = 0
|
||||
self.serial, peer = zpipe(self.ctx)
|
||||
self.updates = queue.Queue()
|
||||
# self.serial_agent = threading.Thread(
|
||||
# target=serialization_agent, args=(self.ctx, peer), name="serial-agent")
|
||||
# self.serial_agent.daemon = True
|
||||
@ -337,6 +338,7 @@ def rcf_client_agent(ctx, pipe, queue):
|
||||
rcfmsg.store(agent.property_map)
|
||||
logger.info("snapshot from {} stored".format(rcfmsg.id))
|
||||
elif agent.state == State.ACTIVE:
|
||||
# IN
|
||||
if rcfmsg.id != agent.id:
|
||||
# update_queue.put((rcfmsg.key,rcfmsg.body))
|
||||
|
||||
@ -363,6 +365,24 @@ def rcf_client_agent(ctx, pipe, queue):
|
||||
else:
|
||||
logger.debug("{} nothing to do".format(agent.id))
|
||||
|
||||
# LOCAL SYNC
|
||||
if not update_queue.empty():
|
||||
key = update_queue.get()
|
||||
|
||||
value = helpers.dump(key)
|
||||
value['id'] = agent.id.decode()
|
||||
if value:
|
||||
rcfmsg = message.RCFMessage(
|
||||
key=key, id=agent.id, body=value)
|
||||
|
||||
rcfmsg.store(agent.property_map)
|
||||
rcfmsg.send(agent.publisher)
|
||||
else:
|
||||
logger.error("Fail to dump ")
|
||||
|
||||
|
||||
|
||||
|
||||
logger.info("exit thread")
|
||||
stop = False
|
||||
# else: else
|
||||
@ -424,37 +444,20 @@ class RCFSyncAgent(object):
|
||||
ctx = None
|
||||
pipe = None
|
||||
|
||||
def __init__(self, ctx, pipe):
|
||||
self.ctx = ctx
|
||||
self.pipe = pipe
|
||||
logger.info("serialisation service launched")
|
||||
def __init__(self, feed):
|
||||
self.feed = feed
|
||||
logger.info("sync service launched")
|
||||
|
||||
def control_message(self):
|
||||
msg = self.pipe.recv_multipart()
|
||||
command = msg.pop(0)
|
||||
|
||||
pass
|
||||
|
||||
|
||||
def rcf_sync_agent(ctx, pipe):
|
||||
agent = RCFSyncAgent(ctx, pipe)
|
||||
|
||||
def rcf_sync_agent(ctx, feed):
|
||||
agent = RCFSyncAgent(feed)
|
||||
|
||||
global stop
|
||||
while True:
|
||||
if stop:
|
||||
break
|
||||
|
||||
poller = zmq.Poller()
|
||||
poller.register(agent.pipe, zmq.POLLIN)
|
||||
|
||||
try:
|
||||
items = dict(poller.poll(1))
|
||||
except:
|
||||
raise
|
||||
break
|
||||
|
||||
if agent.pipe in items:
|
||||
agent.control_message()
|
||||
|
||||
# Synchronisation
|
||||
|
||||
|
15
operators.py
15
operators.py
@ -194,11 +194,24 @@ def draw_tick():
|
||||
|
||||
return .2
|
||||
|
||||
def sync():
|
||||
global client_instance
|
||||
|
||||
if client_instance:
|
||||
for datatype in SUPPORTED_TYPES:
|
||||
for item in getattr(bpy.data, helpers.CORRESPONDANCE[datatype]):
|
||||
if item.id == 'None':
|
||||
item.id= bpy.context.scene.session_settings.username
|
||||
key = "{}/{}".format(datatype, item.name)
|
||||
client_instance.queue.put(key)
|
||||
|
||||
|
||||
return .2
|
||||
|
||||
def register_ticks():
|
||||
# REGISTER Updaters
|
||||
bpy.app.timers.register(draw_tick)
|
||||
|
||||
bpy.app.timers.register(sync)
|
||||
bpy.app.timers.register(default_tick)
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user