diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 1b853c60..f1dd49d7 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -123,7 +123,6 @@ async def fsp_compute( # TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no? # if the output array is multi-field then push # each respective field. - # await tractor.breakpoint() fields = getattr(dst.array.dtype, 'fields', None).copy() fields.pop('index') # TODO: nptyping here! @@ -269,7 +268,7 @@ async def cascade( f'Registered FSP set:\n{lines}' ) - # update actor local flows table which registers + # update actorlocal flows table which registers # readonly "instances" of this fsp for symbol/source # so that consumer fsps can look it up by source + fsp. # TODO: ugh i hate this wind/unwind to list over the wire @@ -381,14 +380,19 @@ async def cascade( s, step, ld = is_synced(src, dst) + # detect sample period step for subscription to increment + # signal + times = src.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. - async with feed.index_stream() as stream: + async with feed.index_stream(int(delay_s)) as istream: profiler(f'{func_name}: sample stream up') profiler.finish() - async for msg in stream: + async for _ in istream: # respawn the compute task if the source # array has been updated such that we compute