From d1b07c625fc5ac474964e25eb988b4f829280cee Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 26 Nov 2022 15:08:36 -0500 Subject: [PATCH] Copy timestamps from source to FSP dest buffer Slice up to history's length worth of (latest) time stamps from source series read at the start of the history init phase. --- piker/fsp/_engine.py | 25 ++++++++++++++++--------- piker/fsp/_volume.py | 3 +-- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index edef7219..357934d8 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -112,8 +112,9 @@ async def fsp_compute( flume.rt_shm, ) - # Conduct a single iteration of fsp with historical bars input - # and get historical output + # HISTORY COMPUTE PHASE + # conduct a single iteration of fsp with historical bars input + # and get historical output, pack into history_output: Union[ dict[str, np.ndarray], # multi-output case np.ndarray, # single output case @@ -130,7 +131,8 @@ async def fsp_compute( # each respective field. fields = getattr(dst.array.dtype, 'fields', None).copy() fields.pop('index') - history: Optional[np.ndarray] = None # TODO: nptyping here! + history_by_field: Optional[np.ndarray] = None + src_time = src.array['time'] if ( fields and @@ -146,7 +148,7 @@ async def fsp_compute( if key in history_output: output = history_output[key] - if history is None: + if history_by_field is None: if output is None: length = len(src.array) @@ -156,7 +158,7 @@ async def fsp_compute( # using the first output, determine # the length of the struct-array that # will be pushed to shm. - history = np.zeros( + history_by_field = np.zeros( length, dtype=dst.array.dtype ) @@ -164,7 +166,7 @@ async def fsp_compute( if output is None: continue - history[key] = output + history_by_field[key] = output # single-key output stream else: @@ -173,11 +175,13 @@ async def fsp_compute( f'`{func_name}` is a single output FSP and should yield an ' '`np.ndarray` for history' ) - history = np.zeros( + history_by_field = np.zeros( len(history_output), dtype=dst.array.dtype ) - history[func_name] = history_output + history_by_field[func_name] = history_output + + history_by_field['time'] = src_time[-len(history_by_field):] # TODO: XXX: # THERE'S A BIG BUG HERE WITH THE `index` field since we're @@ -194,7 +198,10 @@ async def fsp_compute( # TODO: can we use this `start` flag instead of the manual # setting above? - index = dst.push(history, start=first) + index = dst.push( + history_by_field, + start=first, + ) profiler(f'{func_name} pushed history') profiler.finish() diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index b5456fac..b998c67b 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -268,8 +268,7 @@ async def flow_rates( 'dark_dvlm_rate': None, } - # TODO: 3.10 do ``anext()`` - quote = await source.__anext__() + quote = await anext(source) # ltr = 0 # lvr = 0