From cda6e2fab4583410d3ab25b016513f63dc240771 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 3 Apr 2022 23:28:30 -0400 Subject: [PATCH] Establish stream before `fsp_compute` so that backfill updates work again.. --- piker/fsp/_engine.py | 218 ++++++++++++++++++++++++------------------- 1 file changed, 120 insertions(+), 98 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index e678ada0..0776c7a2 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -76,7 +76,6 @@ async def filter_quotes_by_sym( async def fsp_compute( - ctx: tractor.Context, symbol: Symbol, feed: Feed, quote_stream: trio.abc.ReceiveChannel, @@ -86,7 +85,7 @@ async def fsp_compute( func: Callable, - attach_stream: bool = False, + # attach_stream: bool = False, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -193,46 +192,47 @@ async def fsp_compute( profiler(f'{func_name} pushed history') profiler.finish() - # TODO: UGH, what is the right way to do something like this? - if not ctx._started_called: - await ctx.started(index) - # setup a respawn handle with trio.CancelScope() as cs: + + # TODO: might be better to just make a "restart" method where + # the target task is spawned implicitly and then the event is + # set via some higher level api? At that poing we might as well + # be writing a one-cancels-one nursery though right? tracker = TaskTracker(trio.Event(), cs) task_status.started((tracker, index)) + profiler(f'{func_name} yield last index') # import time # last = time.time() try: - # rt stream - async with ctx.open_stream() as stream: - # always trigger UI refresh after history update, - # see ``piker.ui._fsp.FspAdmin.open_chain()`` and - # ``piker.ui._display.trigger_update()``. - await stream.send('update') + async for processed in out_stream: - async for processed in out_stream: + log.debug(f"{func_name}: {processed}") + key, output = processed + index = src.index + dst.array[-1][key] = output - log.debug(f"{func_name}: {processed}") - key, output = processed - index = src.index - dst.array[-1][key] = output + # NOTE: for now we aren't streaming this to the consumer + # stream latest array index entry which basically just acts + # as trigger msg to tell the consumer to read from shm + # TODO: further this should likely be implemented much + # like our `Feed` api where there is one background + # "service" task which computes output and then sends to + # N-consumers who subscribe for the real-time output, + # which we'll likely want to implement using local-mem + # chans for the fan out? + # if attach_stream: + # await client_stream.send(index) - # NOTE: for now we aren't streaming this to the consumer - # stream latest array index entry which basically just acts - # as trigger msg to tell the consumer to read from shm - if attach_stream: - await stream.send(index) - - # period = time.time() - last - # hz = 1/period if period else float('nan') - # if hz > 60: - # log.info(f'FSP quote too fast: {hz}') - # last = time.time() + # period = time.time() - last + # hz = 1/period if period else float('nan') + # if hz > 60: + # log.info(f'FSP quote too fast: {hz}') + # last = time.time() finally: tracker.complete.set() @@ -323,7 +323,6 @@ async def cascade( fsp_target = partial( fsp_compute, - ctx=ctx, symbol=symbol, feed=feed, quote_stream=quote_stream, @@ -332,7 +331,7 @@ async def cascade( src=src, dst=dst, - # func_name=func_name, + # target func=func ) @@ -344,90 +343,113 @@ async def cascade( profiler(f'{func_name}: fsp up') - async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]: - # TODO: adopt an incremental update engine/approach - # where possible here eventually! - log.warning(f're-syncing fsp {func_name} to source') - tracker.cs.cancel() - await tracker.complete.wait() - return await n.start(fsp_target) + # sync client + await ctx.started(index) - def is_synced( - src: ShmArray, - dst: ShmArray - ) -> tuple[bool, int, int]: - '''Predicate to dertmine if a destination FSP - output array is aligned to its source array. + # XXX: rt stream with client which we MUST + # open here (and keep it open) in order to make + # incremental "updates" as history prepends take + # place. + async with ctx.open_stream() as client_stream: - ''' - step_diff = src.index - dst.index - len_diff = abs(len(src.array) - len(dst.array)) - return not ( - # the source is likely backfilling and we must - # sync history calculations - len_diff > 2 or + # TODO: these likely should all become + # methods of this ``TaskLifetime`` or wtv + # abstraction.. + async def resync( + tracker: TaskTracker, - # we aren't step synced to the source and may be - # leading/lagging by a step - step_diff > 1 or - step_diff < 0 - ), step_diff, len_diff + ) -> tuple[TaskTracker, int]: + # TODO: adopt an incremental update engine/approach + # where possible here eventually! + log.warning(f're-syncing fsp {func_name} to source') + tracker.cs.cancel() + await tracker.complete.wait() + tracker, index = await n.start(fsp_target) - async def poll_and_sync_to_step( + # always trigger UI refresh after history update, + # see ``piker.ui._fsp.FspAdmin.open_chain()`` and + # ``piker.ui._display.trigger_update()``. + await client_stream.send('update') + return tracker, index - tracker: TaskTracker, - src: ShmArray, - dst: ShmArray, + def is_synced( + src: ShmArray, + dst: ShmArray + ) -> tuple[bool, int, int]: + '''Predicate to dertmine if a destination FSP + output array is aligned to its source array. - ) -> tuple[TaskTracker, int]: + ''' + step_diff = src.index - dst.index + len_diff = abs(len(src.array) - len(dst.array)) + return not ( + # the source is likely backfilling and we must + # sync history calculations + len_diff > 2 or + + # we aren't step synced to the source and may be + # leading/lagging by a step + step_diff > 1 or + step_diff < 0 + ), step_diff, len_diff + + async def poll_and_sync_to_step( + + tracker: TaskTracker, + src: ShmArray, + dst: ShmArray, + + ) -> tuple[TaskTracker, int]: - synced, step_diff, _ = is_synced(src, dst) - while not synced: - tracker, index = await resync(tracker) synced, step_diff, _ = is_synced(src, dst) + while not synced: + tracker, index = await resync(tracker) + synced, step_diff, _ = is_synced(src, dst) - return tracker, step_diff + return tracker, step_diff - s, step, ld = is_synced(src, dst) + s, step, ld = is_synced(src, dst) - # detect sample period step for subscription to increment - # signal - times = src.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] + # detect sample period step for subscription to increment + # signal + times = src.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] - # Increment the underlying shared memory buffer on every - # "increment" msg received from the underlying data feed. - async with feed.index_stream(int(delay_s)) as istream: + # Increment the underlying shared memory buffer on every + # "increment" msg received from the underlying data feed. + async with feed.index_stream( + int(delay_s) + ) as istream: - profiler(f'{func_name}: sample stream up') - profiler.finish() + profiler(f'{func_name}: sample stream up') + profiler.finish() - async for _ in istream: + async for _ in istream: - # respawn the compute task if the source - # array has been updated such that we compute - # new history from the (prepended) source. - synced, step_diff, _ = is_synced(src, dst) - if not synced: - tracker, step_diff = await poll_and_sync_to_step( - tracker, - src, - dst, - ) + # respawn the compute task if the source + # array has been updated such that we compute + # new history from the (prepended) source. + synced, step_diff, _ = is_synced(src, dst) + if not synced: + tracker, step_diff = await poll_and_sync_to_step( + tracker, + src, + dst, + ) - # skip adding a last bar since we should already - # be step alinged - if step_diff == 0: - continue + # skip adding a last bar since we should already + # be step alinged + if step_diff == 0: + continue - # read out last shm row, copy and write new row - array = dst.array + # read out last shm row, copy and write new row + array = dst.array - # some metrics like vlm should be reset - # to zero every step. - if zero_on_step: - last = zeroed - else: - last = array[-1:].copy() + # some metrics like vlm should be reset + # to zero every step. + if zero_on_step: + last = zeroed + else: + last = array[-1:].copy() - dst.push(last) + dst.push(last)