From e0ca5d5200293324e94efac3b221efdb2b398cfe Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 4 Jan 2023 23:12:42 -0500 Subject: [PATCH] Use `open_sample_stream()` to increment fsp buffers --- piker/fsp/_engine.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 93bf0388..78448d01 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -39,7 +39,10 @@ from ..data.feed import ( Feed, ) from ..data._sharedmem import ShmArray -from ..data._sampling import _default_delay_s +from ..data._sampling import ( + _default_delay_s, + open_sample_stream, +) from ..data._source import Symbol from ._api import ( Fsp, @@ -336,7 +339,6 @@ async def cascade( symbol = flume.symbol assert src.token == flume.rt_shm.token profiler(f'{func}: feed up') - # last_len = new_len = len(src.array) func_name = func.__name__ async with ( @@ -442,22 +444,21 @@ async def cascade( # signal times = src.array['time'] if len(times) > 1: - delay_s = times[-1] - times[times != times[-1]][-1] + delay_s = float(times[-1] - times[times != times[-1]][-1]) else: # our default "HFT" sample rate. delay_s = _default_delay_s - # Increment the underlying shared memory buffer on every - # "increment" msg received from the underlying data feed. - async with flume.index_stream( - int(delay_s) - ) as istream: + # sub and increment the underlying shared memory buffer + # on every step msg received from the global `samplerd` + # service. + async with open_sample_stream(float(delay_s)) as istream: profiler(f'{func_name}: sample stream up') profiler.finish() async for i in istream: - # print(f'FSP incrementing {i}') + print(f'FSP incrementing {i}') # respawn the compute task if the source # array has been updated such that we compute