diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index d9f3af26..5ba3d376 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -37,6 +37,7 @@ from .. import data from ..data import attach_shm_array from ..data.feed import Feed from ..data._sharedmem import ShmArray +from ..data._sampling import _default_delay_s from ..data._source import Symbol from ._api import ( Fsp, @@ -105,7 +106,7 @@ async def fsp_compute( filter_quotes_by_sym(fqsn, quote_stream), # XXX: currently the ``ohlcv`` arg - feed.shm, + feed.rt_shm, ) # Conduct a single iteration of fsp with historical bars input @@ -313,7 +314,7 @@ async def cascade( profiler(f'{func}: feed up') - assert src.token == feed.shm.token + assert src.token == feed.rt_shm.token # last_len = new_len = len(src.array) func_name = func.__name__ @@ -420,7 +421,11 @@ async def cascade( # detect sample period step for subscription to increment # signal times = src.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] + if len(times) > 1: + delay_s = times[-1] - times[times != times[-1]][-1] + else: + # our default "HFT" sample rate. + delay_s = _default_delay_s # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. @@ -431,7 +436,8 @@ async def cascade( profiler(f'{func_name}: sample stream up') profiler.finish() - async for _ in istream: + async for i in istream: + # log.runtime(f'FSP incrementing {i}') # respawn the compute task if the source # array has been updated such that we compute