diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index afd986e0..1b601c35 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -20,7 +20,10 @@ core task logic for processing chains ''' from dataclasses import dataclass from functools import partial -from typing import AsyncIterator, Callable, Optional +from typing import ( + AsyncIterator, Callable, Optional, + Union, +) import numpy as np import pyqtgraph as pg @@ -101,28 +104,61 @@ async def fsp_compute( # Conduct a single iteration of fsp with historical bars input # and get historical output + history_output: Union[ + dict[str, np.ndarray], # multi-output case + np.ndarray, # single output case + ] history_output = await out_stream.__anext__() func_name = func.__name__ profiler(f'{func_name} generated history') # build struct array with an 'index' field to push as history - history = np.zeros( - len(history_output), - dtype=dst.array.dtype - ) # TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no? # if the output array is multi-field then push # each respective field. - fields = getattr(history.dtype, 'fields', None) - if fields: + # await tractor.breakpoint() + fields = getattr(dst.array.dtype, 'fields', None).copy() + fields.pop('index') + # TODO: nptyping here! + history: Optional[np.ndarray] = None + if fields and len(fields) > 1 and fields: + if not isinstance(history_output, dict): + raise ValueError( + f'`{func_name}` is a multi-output FSP and should yield a ' + '`dict[str, np.ndarray]` for history' + ) + for key in fields.keys(): - if key in history.dtype.fields: - history[func_name] = history_output + if key in history_output: + output = history_output[key] + + if history is None: + # using the first output, determine + # the length of the struct-array that + # will be pushed to shm. + history = np.zeros( + len(output), + dtype=dst.array.dtype + ) + + if output is None: + continue + + history[key] = output # single-key output stream else: + if not isinstance(history_output, np.ndarray): + raise ValueError( + f'`{func_name}` is a single output FSP and should yield an ' + '`np.ndarray` for history' + ) + history = np.zeros( + len(history_output), + dtype=dst.array.dtype + ) history[func_name] = history_output # TODO: XXX: