diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 0b73b071..66c606de 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -44,6 +44,10 @@ async def increment_signals( feed: Feed, dst_shm: 'SharedArray', # noqa ) -> None: + """Increment the underlying shared memory buffer on every "increment" + msg received from the underlying data feed. + + """ async for msg in await feed.index_stream(): array = dst_shm.array last = array[-1:].copy() @@ -64,6 +68,7 @@ async def cascade( ) -> AsyncIterator[dict]: """Chain streaming signal processors and deliver output to destination mem buf. + """ src = attach_shm_array(token=src_shm_token) dst = attach_shm_array(readonly=False, token=dst_shm_token) @@ -87,10 +92,30 @@ async def cascade( feed.shm, ) + # TODO: XXX: + # THERE'S A BIG BUG HERE WITH THE `index` field since we're + # prepending a copy of the first value a few times to make + # sub-curves align with the parent bar chart. + # + # This likely needs to be fixed either by, + # - manually assigning the index and historical data + # seperately to the shm array (i.e. not using .push()) + # - developing some system on top of the shared mem array that + # is `index` aware such that historical data can be indexed + # relative to the true first datum? Not sure if this is sane + # for derivatives. + # Conduct a single iteration of fsp with historical bars input # and get historical output - history = await out_stream.__anext__() + history_output = await out_stream.__anext__() + # build a struct array which includes an 'index' field to push + # as history + history = np.array( + np.arange(len(history_output)), + dtype=dst.array.dtype + ) + history[fsp_func_name] = history_output # TODO: talk to ``pyqtgraph`` core about proper way to solve this: # XXX: hack to get curves aligned with bars graphics: prepend