diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py
index afd6aa2c..cb51a86f 100644
--- a/piker/fsp/_engine.py
+++ b/piker/fsp/_engine.py
@@ -44,6 +44,11 @@ _fsp_builtins = {
     'vwap': _tina_vwap,
 }
 
+# TODO: things to figure the heck out:
+# - how to handle non-plottable values (pyqtgraph has facility for this
+#   now in `arrayToQPath()`)
+# - composition of fsps / implicit chaining syntax (we need an issue)
+
 
 @dataclass
 class TaskTracker:
@@ -54,10 +59,11 @@ class TaskTracker:
 
 async def filter_quotes_by_sym(
     sym: str,
-    quote_stream,
+    quote_stream: tractor.MsgStream,
 
 ) -> AsyncIterator[dict]:
-    '''Filter quote stream by target symbol.
+    '''
+    Filter quote stream by target symbol.
 
     '''
     # TODO: make this the actual first quote from feed
@@ -187,8 +193,9 @@ async def cascade(
     loglevel: Optional[str] = None,
 
 ) -> None:
-    '''Chain streaming signal processors and deliver output to
-    destination mem buf.
+    '''
+    Chain streaming signal processors and deliver output to
+    destination shm array buffer.
 
     '''
     profiler = pg.debug.Profiler(delayed=False, disabled=False)
@@ -260,6 +267,46 @@ async def cascade(
         await tracker.complete.wait()
         return await n.start(fsp_target)
 
+    def is_synced(
+        src: ShmArray,
+        dst: ShmArray
+    ) -> tuple[bool, int, int]:
+        '''Predicate to determine if a destination FSP
+        output array is aligned to its source array.
+
+        '''
+        step_diff = src.index - dst.index
+        len_diff = abs(len(src.array) - len(dst.array))
+        return not (
+            # the source is likely backfilling and we must
+            # sync history calculations
+            len_diff > 2 or
+
+            # we aren't step synced to the source and may be
+            # leading/lagging by a step
+            step_diff > 1 or
+            step_diff < 0
+        ), step_diff, len_diff
+
+    async def poll_and_sync_to_step(
+
+        tracker: TaskTracker,
+        src: ShmArray,
+        dst: ShmArray,
+
+    ) -> tuple[TaskTracker, int]:
+
+        synced, step_diff, _ = is_synced(src, dst)
+        while not synced:
+            tracker, index = await resync(tracker)
+            synced, step_diff, _ = is_synced(src, dst)
+
+        return tracker, step_diff
+
+    s, step, ld = is_synced(src, dst)
+    if step or ld:
+        await tractor.breakpoint()
+
     # Increment the underlying shared memory buffer on every
     # "increment" msg received from the underlying data feed.
     async with feed.index_stream() as stream:
@@ -269,41 +316,20 @@ async def cascade(
 
         async for msg in stream:
 
-            def desynced(src: ShmArray, dst: ShmArray) -> bool:
-                diff = src.index - dst.index
-                len_diff = abs(len(src.array) - len(dst.array))
-                return (
-                    # the source is likely backfilling and we must
-                    # sync history calculations
-                    len_diff > 2 or
-
-                    # we aren't step synced to the source and may be
-                    # leading/lagging by a step
-                    diff > 1 or
-                    diff < 0
-                )
-
-            async def poll_and_sync_to_step(tracker):
-                while desynced(src, dst):
-                    tracker, index = await resync(tracker)
-                    # log.info(
-                    #     '\n'.join((
-                    #         f'history index after sync: {index}',
-                    #         f'diff after sync: {diff}',
-                    #     ))
-                    # )
-
-                return tracker, diff
-
             # respawn the compute task if the source
             # array has been updated such that we compute
             # new history from the (prepended) source.
-            if desynced(src, dst):
-                tracker, diff = await poll_and_sync_to_step(tracker)
+            synced, step_diff, _ = is_synced(src, dst)
+            if not synced:
+                tracker, step_diff = await poll_and_sync_to_step(
+                    tracker,
+                    src,
+                    dst,
+                )
 
-                # skip adding a last bar since we should be
-                # source alinged
-                if diff == 0:
+                # skip adding a last bar since we should already
+                # be step aligned
+                if step_diff == 0:
                     continue
 
             # read out last shm row, copy and write new row
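
The core of this patch is the new `is_synced()` predicate that replaces the per-message `desynced()` closure. Below is a minimal standalone sketch (not part of the patch) of the same step/length comparison; `FakeShm` is a hypothetical stand-in for piker's `ShmArray`, mocking only the `.index` and `.array` attributes the predicate reads.

# Standalone sketch of the sync predicate introduced above. NOT part of
# the patch: `FakeShm` is a hypothetical stand-in for piker's `ShmArray`.
from dataclasses import dataclass, field


@dataclass
class FakeShm:
    index: int
    array: list = field(default_factory=list)


def is_synced(src: FakeShm, dst: FakeShm) -> tuple[bool, int, int]:
    step_diff = src.index - dst.index
    len_diff = abs(len(src.array) - len(dst.array))
    return not (
        len_diff > 2        # source is likely backfilling history
        or step_diff > 1    # destination lags by more than one step
        or step_diff < 0    # destination is somehow ahead of the source
    ), step_diff, len_diff


# aligned: same index, same history length
assert is_synced(FakeShm(100, [0] * 50), FakeShm(100, [0] * 50))[0]

# source was backfilled (prepended history) -> a resync is required
assert not is_synced(FakeShm(100, [0] * 500), FakeShm(100, [0] * 50))[0]

# destination trailing by exactly one step is tolerated
assert is_synced(FakeShm(101, [0] * 51), FakeShm(100, [0] * 50))[0]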