From ea9b66d1c3f02cbcdcfffb3e2eefb8385418a8dd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Nov 2021 09:33:48 -0400 Subject: [PATCH 1/2] Hotfix: open a tractor context to fsps... The prior PR for fixing fsp array misalignment also added `tractor.Context` usage which wasn't reflected in the graphics update loop (newer code added it but the prior PR was factored from path dependent history) and thus was broken. Further in newer work we don't have fsp actors actually stream value updates since the display loop can already pull from the source feed and update graphics at a preferred throttle rate. Re-enabled the fsp stream sending here by default until that newer only-throttle-pull-from-source code is landed in the display loop. --- piker/fsp/_engine.py | 2 +- piker/ui/_display.py | 12 ++++-------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 883f5853..c6e9299d 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -90,7 +90,7 @@ async def fsp_compute( func_name: str, func: Callable, - attach_stream: bool = False, + attach_stream: bool = True, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: diff --git a/piker/ui/_display.py b/piker/ui/_display.py index ab85e761..7859d23b 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -427,7 +427,7 @@ async def run_fsp( ) async with ( - portal.open_stream_from( + portal.open_context( # chaining entrypoint fsp.cascade, @@ -437,21 +437,17 @@ async def run_fsp( src_shm_token=src_shm.token, dst_shm_token=conf['shm'].token, symbol=sym, - fsp_func_name=fsp_func_name, + func_name=fsp_func_name, loglevel=loglevel, - ) as stream, - + ) as (ctx, last_index), + ctx.open_stream() as stream, open_sidepane( linkedsplits, display_name, ) as sidepane, ): - # receive last index for processed historical - # data-array as first msg - _ = await stream.receive() - shm = conf['shm'] if conf.get('overlay'): From 2b97f981512e8a233ba4c2ae27e27849b8b8d2f2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Nov 2021 10:04:10 -0400 Subject: [PATCH 2/2] Don't open stream before starting the fsp context.. --- piker/fsp/_engine.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index c6e9299d..dda27ccf 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -79,7 +79,7 @@ async def filter_quotes_by_sym( async def fsp_compute( - stream: tractor.MsgStream, + ctx: tractor.Context, symbol: str, feed: Feed, quote_stream: trio.abc.ReceiveChannel, @@ -147,6 +147,7 @@ async def fsp_compute( # setup a respawn handle with trio.CancelScope() as cs: tracker = TaskTracker(trio.Event(), cs) + await ctx.started(index) task_status.started((tracker, index)) profiler(f'{func_name} yield last index') @@ -155,23 +156,24 @@ async def fsp_compute( try: # rt stream - async for processed in out_stream: + async with ctx.open_stream() as stream: + async for processed in out_stream: - log.debug(f"{func_name}: {processed}") - index = src.index - dst.array[-1][func_name] = processed + log.debug(f"{func_name}: {processed}") + index = src.index + dst.array[-1][func_name] = processed - # NOTE: for now we aren't streaming this to the consumer - # stream latest array index entry which basically just acts - # as trigger msg to tell the consumer to read from shm - if attach_stream: - await stream.send(index) + # NOTE: for now we aren't streaming this to the consumer + # stream latest array index entry which basically just acts + # as trigger msg to tell the consumer to read from shm + if attach_stream: + await stream.send(index) - # period = time.time() - last - # hz = 1/period if period else float('nan') - # if hz > 60: - # log.info(f'FSP quote too fast: {hz}') - # last = time.time() + # period = time.time() - last + # hz = 1/period if period else float('nan') + # if hz > 60: + # log.info(f'FSP quote too fast: {hz}') + # last = time.time() finally: tracker.complete.set() @@ -229,14 +231,13 @@ async def cascade( # last_len = new_len = len(src.array) async with ( - ctx.open_stream() as stream, trio.open_nursery() as n, ): fsp_target = partial( fsp_compute, - stream=stream, + ctx=ctx, symbol=symbol, feed=feed, quote_stream=quote_stream, @@ -255,7 +256,6 @@ async def cascade( last = dst.array[-1:] zeroed = np.zeros(last.shape, dtype=last.dtype) - await ctx.started(index) profiler(f'{func_name}: fsp up') async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]: