diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 93bf0388..78448d01 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -39,7 +39,10 @@ from ..data.feed import ( Feed, ) from ..data._sharedmem import ShmArray -from ..data._sampling import _default_delay_s +from ..data._sampling import ( + _default_delay_s, + open_sample_stream, +) from ..data._source import Symbol from ._api import ( Fsp, @@ -336,7 +339,6 @@ async def cascade( symbol = flume.symbol assert src.token == flume.rt_shm.token profiler(f'{func}: feed up') - # last_len = new_len = len(src.array) func_name = func.__name__ async with ( @@ -442,22 +444,21 @@ async def cascade( # signal times = src.array['time'] if len(times) > 1: - delay_s = times[-1] - times[times != times[-1]][-1] + delay_s = float(times[-1] - times[times != times[-1]][-1]) else: # our default "HFT" sample rate. delay_s = _default_delay_s - # Increment the underlying shared memory buffer on every - # "increment" msg received from the underlying data feed. - async with flume.index_stream( - int(delay_s) - ) as istream: + # sub and increment the underlying shared memory buffer + # on every step msg received from the global `samplerd` + # service. + async with open_sample_stream(float(delay_s)) as istream: profiler(f'{func_name}: sample stream up') profiler.finish() async for i in istream: - # print(f'FSP incrementing {i}') + print(f'FSP incrementing {i}') # respawn the compute task if the source # array has been updated such that we compute