feat(client): append load to serial agent
This commit is contained in:
57
client.py
57
client.py
@ -13,19 +13,14 @@ import queue
|
||||
# import zmq
|
||||
lock = threading.Lock()
|
||||
|
||||
try:
|
||||
from .libs import umsgpack
|
||||
from .libs import zmq
|
||||
from .libs import dump_anything
|
||||
from . import helpers
|
||||
from . import message
|
||||
except:
|
||||
# Server import
|
||||
from libs import umsgpack
|
||||
from libs import zmq
|
||||
from libs import dump_anything
|
||||
import helpers
|
||||
import message
|
||||
|
||||
from .libs import umsgpack
|
||||
from .libs import zmq
|
||||
from .libs import dump_anything
|
||||
from . import helpers
|
||||
from . import message
|
||||
from . import draw
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
@ -76,7 +71,7 @@ class RCFClient(object):
|
||||
|
||||
# Database and connexion agent
|
||||
self.net_agent = threading.Thread(
|
||||
target=rcf_client_agent, args=(self.ctx, self.store, peer, self.serial_product), name="net-agent")
|
||||
target=rcf_client_agent, args=(self.ctx, self.store, peer, self.serial_product,self.serial_feed), name="net-agent")
|
||||
self.net_agent.daemon = True
|
||||
self.net_agent.start()
|
||||
|
||||
@ -122,7 +117,7 @@ class RCFClient(object):
|
||||
self.pipe.send_multipart(
|
||||
[b"SET", umsgpack.packb(key), (umsgpack.packb(value) if value else umsgpack.packb('None')),umsgpack.packb(override)])
|
||||
else:
|
||||
self.serial_feed.put(key)
|
||||
self.serial_feed.put(('DUMP',key,None))
|
||||
# self.serial_pipe.send_multipart(
|
||||
# [b"DUMP", umsgpack.packb(key)])
|
||||
# self.pipe.send_multipart(
|
||||
@ -156,7 +151,7 @@ class RCFClient(object):
|
||||
global stop
|
||||
stop = True
|
||||
for a in range(0,DUMP_AGENTS_NUMBER):
|
||||
self.serial_feed.put('exit')
|
||||
self.serial_feed.put(('exit',None,None))
|
||||
|
||||
|
||||
# READ-ONLY FUNCTIONS
|
||||
@ -377,10 +372,12 @@ class RCFClientAgent(object):
|
||||
elif command == b"STATE":
|
||||
self.pipe.send(umsgpack.packb(self.state.value))
|
||||
|
||||
def rcf_client_agent(ctx,store, pipe, queue):
|
||||
|
||||
def rcf_client_agent(ctx,store, pipe, serial_product, serial_feed):
|
||||
agent = RCFClientAgent(ctx,store, pipe)
|
||||
server = None
|
||||
net_feed = queue
|
||||
net_feed = serial_product
|
||||
net_product = serial_feed
|
||||
global stop
|
||||
while True:
|
||||
if stop:
|
||||
@ -434,15 +431,17 @@ def rcf_client_agent(ctx,store, pipe, queue):
|
||||
logger.info("snapshot complete")
|
||||
agent.state = State.ACTIVE
|
||||
else:
|
||||
helpers.load(rcfmsg.key, rcfmsg.body)
|
||||
net_product.put(('LOAD',rcfmsg.key, rcfmsg.body))
|
||||
# helpers.load(rcfmsg.key, rcfmsg.body)
|
||||
rcfmsg.store(agent.property_map)
|
||||
logger.info("snapshot from {} stored".format(rcfmsg.id))
|
||||
elif agent.state == State.ACTIVE:
|
||||
# IN
|
||||
if rcfmsg.id != agent.id:
|
||||
|
||||
with lock:
|
||||
helpers.load(rcfmsg.key, rcfmsg.body)
|
||||
# with lock:
|
||||
# helpers.load(rcfmsg.key, rcfmsg.body)
|
||||
net_product.put(('LOAD',rcfmsg.key, rcfmsg.body))
|
||||
rcfmsg.store(agent.property_map)
|
||||
|
||||
else:
|
||||
@ -473,15 +472,19 @@ def rcf_client_agent(ctx,store, pipe, queue):
|
||||
# else: else
|
||||
# agent.state = State.INITIAL
|
||||
|
||||
|
||||
def dumper_agent(product, feed):
|
||||
while True:
|
||||
key = feed.get()
|
||||
command,key,value = feed.get()
|
||||
|
||||
if key == 'exit':
|
||||
if command == 'exit':
|
||||
break
|
||||
elif command == 'DUMP':
|
||||
value = helpers.dump(key)
|
||||
|
||||
value = helpers.dump(key)
|
||||
|
||||
if value:
|
||||
product.put((key,value))
|
||||
if value:
|
||||
product.put((key,value))
|
||||
elif command == 'LOAD':
|
||||
if value:
|
||||
helpers.load(key, value)
|
||||
|
Reference in New Issue
Block a user