feat: added a push timer to control the push frequency

This commit is contained in:
Swann
2021-01-11 19:54:57 +01:00
parent 3c31fb5118
commit 12bd4a603b
3 changed files with 44 additions and 47 deletions

View File

@ -44,7 +44,7 @@ from . import environment
DEPENDENCIES = { DEPENDENCIES = {
("replication", '0.1.20'), ("replication", '0.1.21'),
} }

View File

@ -53,6 +53,8 @@ from .presence import SessionStatusWidget, renderer, view3d_find
from .timers import registry from .timers import registry
background_execution_queue = Queue() background_execution_queue = Queue()
stagging = list()
deleyables = [] deleyables = []
stop_modal_executor = False stop_modal_executor = False
@ -79,13 +81,13 @@ def initialize_session():
# Step 1: Constrect nodes # Step 1: Constrect nodes
for node in session._graph.list_ordered(): for node in session._graph.list_ordered():
node_ref = session.get(node) node_ref = session.get(uuid=node)
if node_ref.state == FETCHED: if node_ref.state == FETCHED:
node_ref.resolve() node_ref.resolve()
# Step 2: Load nodes # Step 2: Load nodes
for node in session._graph.list_ordered(): for node in session._graph.list_ordered():
node_ref = session.get(node) node_ref = session.get(uuid=node)
if node_ref.state == FETCHED: if node_ref.state == FETCHED:
node_ref.apply() node_ref.apply()
@ -118,6 +120,7 @@ def on_connection_end(reason="none"):
stop_modal_executor = True stop_modal_executor = True
if depsgraph_evaluation in bpy.app.handlers.depsgraph_update_post:
bpy.app.handlers.depsgraph_update_post.remove(depsgraph_evaluation) bpy.app.handlers.depsgraph_update_post.remove(depsgraph_evaluation)
# Step 3: remove file handled # Step 3: remove file handled
@ -266,7 +269,7 @@ class SessionStartOperator(bpy.types.Operator):
# Background client updates service # Background client updates service
deleyables.append(timers.ClientUpdate()) deleyables.append(timers.ClientUpdate())
deleyables.append(timers.DynamicRightSelectTimer()) deleyables.append(timers.DynamicRightSelectTimer())
deleyables.append(timers.PushTimer(queue=stagging))
session_update = timers.SessionStatusUpdate() session_update = timers.SessionStatusUpdate()
session_user_sync = timers.SessionUserSync() session_user_sync = timers.SessionUserSync()
session_background_executor = timers.MainThreadExecutor( session_background_executor = timers.MainThreadExecutor(
@ -921,14 +924,11 @@ def sanitize_deps_graph(dummy):
""" """
if session and session.state['STATE'] == STATE_ACTIVE: if session and session.state['STATE'] == STATE_ACTIVE:
# pass
# session.lock_operations()
start = utils.current_milli_time() start = utils.current_milli_time()
for node_key in session.list(): for node_key in session.list():
node = session.get(node_key) node = session.get(node_key)
node.resolve(construct=False) node.resolve(construct=False)
logging.debug(f"Sanitize took { utils.current_milli_time()-start}ms") logging.debug(f"Sanitize took { utils.current_milli_time()-start}ms")
# session.unlock_operations()
@ -960,31 +960,21 @@ def depsgraph_evaluation(scene):
# Is the object tracked ? # Is the object tracked ?
if update.id.uuid: if update.id.uuid:
# Retrieve local version # Retrieve local version
node = session.get(update.id.uuid) node = session.get(uuid=update.id.uuid)
# Check our right on this update: # Check our right on this update:
# - if its ours or ( under common and diff), launch the # - if its ours or ( under common and diff), launch the
# update process # update process
# - if its to someone else, ignore the update (go deeper ?) # - if its to someone else, ignore the update (go deeper ?)
if node and node.owner in [session.id, RP_COMMON] and node.state == UP: if node and node.owner in [session.id, RP_COMMON]:
if node.state == UP:
# Avoid slow geometry update # Avoid slow geometry update
if 'EDIT' in context.mode and \ if 'EDIT' in context.mode and \
not settings.sync_flags.sync_during_editmode: not settings.sync_flags.sync_during_editmode:
break break
# session.stash(node.uuid) if node.uuid not in stagging:
if node.has_changed(): stagging.append(node.uuid)
try:
session.commit(node.uuid)
session.push(node.uuid)
except ReferenceError:
logging.debug(f"Reference error {node.uuid}")
if not node.is_valid():
session.remove(node.uuid)
except ContextError as e:
logging.debug(e)
except Exception as e:
logging.error(e)
else: else:
# Distant update # Distant update
continue continue
@ -992,16 +982,6 @@ def depsgraph_evaluation(scene):
# # New items ! # # New items !
# logger.error("UPDATE: ADD") # logger.error("UPDATE: ADD")
@persistent
def unlock(dummy):
if session and session.state['STATE'] == STATE_ACTIVE:
session.unlock_operations()
@persistent
def lock(dummy):
if session and session.state['STATE'] == STATE_ACTIVE:
session.lock_operations()
def register(): def register():
from bpy.utils import register_class from bpy.utils import register_class
@ -1009,11 +989,6 @@ def register():
for cls in classes: for cls in classes:
register_class(cls) register_class(cls)
bpy.app.handlers.undo_post.append(unlock)
bpy.app.handlers.undo_pre.append(lock)
bpy.app.handlers.redo_pre.append(unlock)
bpy.app.handlers.redo_post.append(lock)
bpy.app.handlers.undo_post.append(sanitize_deps_graph) bpy.app.handlers.undo_post.append(sanitize_deps_graph)
bpy.app.handlers.redo_post.append(sanitize_deps_graph) bpy.app.handlers.redo_post.append(sanitize_deps_graph)
@ -1029,10 +1004,6 @@ def unregister():
for cls in reversed(classes): for cls in reversed(classes):
unregister_class(cls) unregister_class(cls)
bpy.app.handlers.undo_post.remove(unlock)
bpy.app.handlers.undo_pre.remove(lock)
bpy.app.handlers.redo_pre.remove(unlock)
bpy.app.handlers.redo_post.remove(lock)
bpy.app.handlers.undo_post.remove(sanitize_deps_graph) bpy.app.handlers.undo_post.remove(sanitize_deps_graph)
bpy.app.handlers.redo_post.remove(sanitize_deps_graph) bpy.app.handlers.redo_post.remove(sanitize_deps_graph)

View File

@ -22,7 +22,7 @@ import bpy
from replication.constants import (FETCHED, RP_COMMON, STATE_ACTIVE, from replication.constants import (FETCHED, RP_COMMON, STATE_ACTIVE,
STATE_INITIAL, STATE_LOBBY, STATE_QUITTING, STATE_INITIAL, STATE_LOBBY, STATE_QUITTING,
STATE_SRV_SYNC, STATE_SYNCING, UP) STATE_SRV_SYNC, STATE_SYNCING, UP)
from replication.exception import NonAuthorizedOperationError from replication.exception import NonAuthorizedOperationError, ContextError
from replication.interface import session from replication.interface import session
from . import operators, utils from . import operators, utils
@ -130,6 +130,32 @@ class ApplyTimer(Timer):
if deps and node in deps: if deps and node in deps:
session.apply(n, force=True) session.apply(n, force=True)
class PushTimer(Timer):
def __init__(self, timeout=1, queue=None):
super().__init__(timeout)
self.id = "PushTimer"
self.q_push = queue
def execute(self):
while self.q_push:
node_id = self.q_push.pop()
node = session.get(uuid=node_id)
if node.has_changed():
try:
session.commit(node.uuid)
session.push(node.uuid)
except ReferenceError:
logging.debug(f"Reference error {node.uuid}")
if not node.is_valid():
session.remove(node.uuid)
except ContextError as e:
logging.debug(e)
except Exception as e:
logging.error(e)
else:
logging.info("Skipping updatem no changes")
class DynamicRightSelectTimer(Timer): class DynamicRightSelectTimer(Timer):
def __init__(self, timeout=.1): def __init__(self, timeout=.1):
super().__init__(timeout) super().__init__(timeout)