feat(rcf): update ui, append data load

This commit is contained in:
Swann Martinez
2019-03-25 14:56:09 +01:00
parent 704ea35129
commit a7f712e824
3 changed files with 128 additions and 155 deletions

View File

@ -1,41 +1,39 @@
# import zmq
import asyncio
import logging
from .libs.esper import esper
from .libs import zmq
from .libs import umsgpack
import time
import random
import struct
import collections
import logging
import time
from enum import Enum
from .libs import umsgpack, zmq
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
CONNECT_TIMEOUT = 2
CONNECT_TIMEOUT = 2
WAITING_TIME = 0.001
class RCFStatus(Enum):
IDLE = 1
CONNECTING = 2
CONNECTED = 3
class RCFFactory(object):
"""
Abstract layer used to bridge external and inter
"""
def init(self, data):
"""
set the RCFMessage pointer to local data
"""
print("Default setter")
#Setup data accessor
# Setup data accessor
data.get = self.load_getter(data)
data.set = self.load_setter(data)
# TODO: Setup local pointer
def load_getter(self, data):
"""
@ -52,7 +50,7 @@ class RCFFactory(object):
print("Default setter")
return None
def apply(self,data):
def apply(self, data):
pass
def diff(self, data):
@ -61,26 +59,31 @@ class RCFFactory(object):
"""
pass
class RCFStore(collections.MutableMapping,dict):
class RCFStore(collections.MutableMapping, dict):
def __init__(self, custom_factory=RCFFactory()):
super().__init__()
super().__init__()
self.factory = custom_factory
def __getitem__(self,key):
return dict.__getitem__(self,key)
def __getitem__(self, key):
return dict.__getitem__(self, key)
def __setitem__(self, key, value):
dict.__setitem__(self,key,value)
dict.__setitem__(self, key, value)
def __delitem__(self, key):
dict.__delitem__(self,key)
dict.__delitem__(self, key)
def __iter__(self):
return dict.__iter__(self)
def __len__(self):
return dict.__len__(self)
def __contains__(self, x):
return dict.__contains__(self,x)
return dict.__contains__(self, x)
class RCFMessage(object):
"""
@ -101,7 +104,7 @@ class RCFMessage(object):
self.mtype = mtype
self.body = body
self.id = id
def store(self, dikt):
"""Store me in a dict if I have anything to store"""
# this currently erasing old value
@ -109,7 +112,6 @@ class RCFMessage(object):
dikt[self.key] = self
elif self.key in dikt:
del dikt[self.key]
def send(self, socket):
"""Send key-value message to socket; any empty frames are sent as such."""
@ -148,16 +150,17 @@ class RCFMessage(object):
data=data,
))
class Client():
class RCFClient():
def __init__(
self,
context=zmq.Context(),
id="default",
on_recv=None,
on_post_init=None,
is_admin=False,
factory=None,
address="localhost"):
self,
context=zmq.Context(),
id="default",
on_recv=None,
on_post_init=None,
is_admin=False,
factory=None,
address="localhost"):
# 0MQ vars
self.context = context
@ -165,7 +168,7 @@ class Client():
self.req_sock = None
self.poller = None
# Client configuration
# Client configuration
self.id = id.encode()
self.on_recv = on_recv
self.on_post_init = on_post_init
@ -233,16 +236,18 @@ class Client():
for f in self.on_post_init:
f()
logger.info("{} client running".format(id))
self.push_update("net/clients/{}".format(self.id.decode()),"client",self.id)
self.push_update("net/objects/{}".format(self.id.decode()),"client_object","None")
self.push_update(
"net/clients/{}".format(self.id.decode()), "client", self.id)
self.push_update(
"net/objects/{}".format(self.id.decode()), "client_object", "None")
self.tick_task = asyncio.ensure_future(self.tick())
self.status = RCFStatus.CONNECTED
async def tick(self):
# Main loop
while True:
@ -270,9 +275,12 @@ class Client():
self.push_sock.close()
self.pull_sock.close()
self.load_task.cancel()
self.tick_task.cancel()
class Server():
if self.tick_task:
self.tick_task.cancel()
class RCFServer():
def __init__(self, context=zmq.Context(), id="admin"):
self.context = context
@ -281,11 +289,11 @@ class Server():
self.collector_sock = None
self.poller = None
self.property_map = RCFStore()
self.property_map = RCFStore()
self.id = id
self.bind_ports()
# Main client loop registration
self.task = asyncio.ensure_future(self.main())
self.task = asyncio.ensure_future(self.tick())
logger.info("{} client initialized".format(id))
@ -312,11 +320,11 @@ class Server():
self.poller.register(self.request_sock, zmq.POLLIN)
self.poller.register(self.collector_sock, zmq.POLLIN)
async def main(self):
async def tick(self):
logger.info("{} server launched".format(id))
while True:
# Non blocking poller
# Non blocking poller
socks = dict(self.poller.poll(1))
# Snapshot system for late join (Server - Client)
@ -350,7 +358,7 @@ class Server():
msg.store(self.property_map)
msg.send(self.pub_sock)
else:
await asyncio.sleep(0.0001)
await asyncio.sleep(WAITING_TIME)
def stop(self):
logger.debug("Stopping server")
@ -360,4 +368,4 @@ class Server():
self.collector_sock.close()
self.task.cancel()
self.status= RCFStatus.IDLE
self.status = RCFStatus.IDLE