Copy timestamps from source to FSP dest buffer

Slice up to history's length worth of (latest) time stamps from source
series read at the start of the history init phase.
samplerd_service
Tyler Goodlet 2022-11-26 15:08:36 -05:00
parent a5bb33b0ff
commit d1b07c625f
2 changed files with 17 additions and 11 deletions

View File

@ -112,8 +112,9 @@ async def fsp_compute(
flume.rt_shm,
)
# Conduct a single iteration of fsp with historical bars input
# and get historical output
# HISTORY COMPUTE PHASE
# conduct a single iteration of fsp with historical bars input
# and get historical output, pack into
history_output: Union[
dict[str, np.ndarray], # multi-output case
np.ndarray, # single output case
@ -130,7 +131,8 @@ async def fsp_compute(
# each respective field.
fields = getattr(dst.array.dtype, 'fields', None).copy()
fields.pop('index')
history: Optional[np.ndarray] = None # TODO: nptyping here!
history_by_field: Optional[np.ndarray] = None
src_time = src.array['time']
if (
fields and
@ -146,7 +148,7 @@ async def fsp_compute(
if key in history_output:
output = history_output[key]
if history is None:
if history_by_field is None:
if output is None:
length = len(src.array)
@ -156,7 +158,7 @@ async def fsp_compute(
# using the first output, determine
# the length of the struct-array that
# will be pushed to shm.
history = np.zeros(
history_by_field = np.zeros(
length,
dtype=dst.array.dtype
)
@ -164,7 +166,7 @@ async def fsp_compute(
if output is None:
continue
history[key] = output
history_by_field[key] = output
# single-key output stream
else:
@ -173,11 +175,13 @@ async def fsp_compute(
f'`{func_name}` is a single output FSP and should yield an '
'`np.ndarray` for history'
)
history = np.zeros(
history_by_field = np.zeros(
len(history_output),
dtype=dst.array.dtype
)
history[func_name] = history_output
history_by_field[func_name] = history_output
history_by_field['time'] = src_time[-len(history_by_field):]
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
@ -194,7 +198,10 @@ async def fsp_compute(
# TODO: can we use this `start` flag instead of the manual
# setting above?
index = dst.push(history, start=first)
index = dst.push(
history_by_field,
start=first,
)
profiler(f'{func_name} pushed history')
profiler.finish()

View File

@ -268,8 +268,7 @@ async def flow_rates(
'dark_dvlm_rate': None,
}
# TODO: 3.10 do ``anext()``
quote = await source.__anext__()
quote = await anext(source)
# ltr = 0
# lvr = 0