Note the issues with the shared fsp array index..

bar_select
Tyler Goodlet 2020-10-19 14:01:25 -04:00
parent 32974a118c
commit 1706b67e00
1 changed files with 26 additions and 1 deletions

View File

@ -44,6 +44,10 @@ async def increment_signals(
feed: Feed,
dst_shm: 'SharedArray', # noqa
) -> None:
"""Increment the underlying shared memory buffer on every "increment"
msg received from the underlying data feed.
"""
async for msg in await feed.index_stream():
array = dst_shm.array
last = array[-1:].copy()
@ -64,6 +68,7 @@ async def cascade(
) -> AsyncIterator[dict]:
"""Chain streaming signal processors and deliver output to
destination mem buf.
"""
src = attach_shm_array(token=src_shm_token)
dst = attach_shm_array(readonly=False, token=dst_shm_token)
@ -87,10 +92,30 @@ async def cascade(
feed.shm,
)
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make
# sub-curves align with the parent bar chart.
#
# This likely needs to be fixed either by,
# - manually assigning the index and historical data
# seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for derivatives.
# Conduct a single iteration of fsp with historical bars input
# and get historical output
history = await out_stream.__anext__()
history_output = await out_stream.__anext__()
# build a struct array which includes an 'index' field to push
# as history
history = np.array(
np.arange(len(history_output)),
dtype=dst.array.dtype
)
history[fsp_func_name] = history_output
# TODO: talk to ``pyqtgraph`` core about proper way to solve this:
# XXX: hack to get curves aligned with bars graphics: prepend