From f79c3617d6c88dd918876335c834f235900e81f3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 30 Aug 2022 10:55:11 -0400 Subject: [PATCH] Always load FSPs with the default (fast) sampling period --- piker/fsp/_engine.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index d9f3af26..5ba3d376 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -37,6 +37,7 @@ from .. import data from ..data import attach_shm_array from ..data.feed import Feed from ..data._sharedmem import ShmArray +from ..data._sampling import _default_delay_s from ..data._source import Symbol from ._api import ( Fsp, @@ -105,7 +106,7 @@ async def fsp_compute( filter_quotes_by_sym(fqsn, quote_stream), # XXX: currently the ``ohlcv`` arg - feed.shm, + feed.rt_shm, ) # Conduct a single iteration of fsp with historical bars input @@ -313,7 +314,7 @@ async def cascade( profiler(f'{func}: feed up') - assert src.token == feed.shm.token + assert src.token == feed.rt_shm.token # last_len = new_len = len(src.array) func_name = func.__name__ @@ -420,7 +421,11 @@ async def cascade( # detect sample period step for subscription to increment # signal times = src.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] + if len(times) > 1: + delay_s = times[-1] - times[times != times[-1]][-1] + else: + # our default "HFT" sample rate. + delay_s = _default_delay_s # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. @@ -431,7 +436,8 @@ async def cascade( profiler(f'{func_name}: sample stream up') profiler.finish() - async for _ in istream: + async for i in istream: + # log.runtime(f'FSP incrementing {i}') # respawn the compute task if the source # array has been updated such that we compute