TOSQUASH 4eb5fe0dd96 (FSP copy time from src -> dst)

Slice up to the history's length worth of (latest) time stamps from the
source series read at the start of the history init phase.
epoch_index_backup
Tyler Goodlet 2022-12-02 11:30:48 -05:00
parent faecd6f0e0
commit a2f75a83b6
2 changed files with 16 additions and 12 deletions
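
As a rough sketch of the slicing described in the commit message (the array
sizes and dtype here are made up, not taken from the diff): when the computed
history is shorter than the source series, only the latest
len(history_by_field) time stamps are copied into the destination's 'time'
column.

    import numpy as np

    # hypothetical lengths: the source OHLC series is longer than the
    # FSP's computed history output
    src_time = np.arange(1_000, dtype=float)      # source 'time' column
    history_by_field = np.zeros(
        600,
        dtype=[('time', float), ('dolla_vlm', float)],  # made-up dtype
    )

    # copy only the latest `len(history_by_field)` time stamps so the
    # 'time' column matches the history buffer's own length
    history_by_field['time'] = src_time[-len(history_by_field):]

    assert history_by_field['time'][0] == 400.0
    assert history_by_field['time'][-1] == src_time[-1]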


@@ -112,8 +112,9 @@ async def fsp_compute(
         flume.rt_shm,
     )
 
-    # Conduct a single iteration of fsp with historical bars input
-    # and get historical output
+    # HISTORY COMPUTE PHASE
+    # conduct a single iteration of fsp with historical bars input
+    # and get historical output, pack into
     history_output: Union[
         dict[str, np.ndarray],  # multi-output case
         np.ndarray,  # single output case
@@ -130,7 +131,8 @@ async def fsp_compute(
     # each respective field.
     fields = getattr(dst.array.dtype, 'fields', None).copy()
     fields.pop('index')
-    history: Optional[np.ndarray] = None  # TODO: nptyping here!
+    history_by_field: Optional[np.ndarray] = None
+    src_time = src.array['time']
 
     if (
         fields and
@@ -146,7 +148,7 @@ async def fsp_compute(
             if key in history_output:
                 output = history_output[key]
 
-                if history is None:
+                if history_by_field is None:
 
                     if output is None:
                         length = len(src.array)
@@ -156,7 +158,7 @@ async def fsp_compute(
                     # using the first output, determine
                     # the length of the struct-array that
                     # will be pushed to shm.
-                    history = np.zeros(
+                    history_by_field = np.zeros(
                         length,
                         dtype=dst.array.dtype
                     )
@@ -164,7 +166,7 @@ async def fsp_compute(
                 if output is None:
                     continue
 
-                history[key] = output
+                history_by_field[key] = output
 
     # single-key output stream
     else:
@@ -173,13 +175,13 @@ async def fsp_compute(
                 f'`{func_name}` is a single output FSP and should yield an '
                 '`np.ndarray` for history'
             )
-        history = np.zeros(
+        history_by_field = np.zeros(
            len(history_output),
            dtype=dst.array.dtype
        )
-        history[func_name] = history_output
+        history_by_field[func_name] = history_output
 
-    history['time'] = src.array['time']
+    history_by_field['time'] = src_time[-len(history_by_field):]
 
     # TODO: XXX:
     # THERE'S A BIG BUG HERE WITH THE `index` field since we're
@@ -196,7 +198,10 @@ async def fsp_compute(
 
     # TODO: can we use this `start` flag instead of the manual
    # setting above?
-    index = dst.push(history, start=first)
+    index = dst.push(
+        history_by_field,
+        start=first,
+    )
 
     profiler(f'{func_name} pushed history')
     profiler.finish()
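
For context on the hunks above, a standalone sketch of the multi-output
packing pattern being renamed and re-stamped here; the dtype, field names and
lengths are invented, and the real `dst`/`src` are piker `ShmArray` buffers
rather than plain numpy arrays.

    from typing import Optional
    import numpy as np

    # stand-in for `dst.array.dtype`
    dst_dtype = np.dtype([
        ('index', int),
        ('time', float),
        ('fast', float),
        ('slow', float),
    ])

    # multi-output case: the FSP yields a dict of per-field arrays
    history_output: dict[str, np.ndarray] = {
        'fast': np.random.rand(100),
        'slow': np.random.rand(100),
    }
    src_time = np.arange(1_000, dtype=float)

    fields = dst_dtype.fields.copy()
    fields.pop('index')

    history_by_field: Optional[np.ndarray] = None
    for key in fields:
        output = history_output.get(key)
        if output is None:
            continue

        if history_by_field is None:
            # size the struct-array off the first non-null output
            history_by_field = np.zeros(len(output), dtype=dst_dtype)

        history_by_field[key] = output

    assert history_by_field is not None
    # stamp the packed history with the latest source time stamps
    history_by_field['time'] = src_time[-len(history_by_field):]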


@@ -268,8 +268,7 @@ async def flow_rates(
         'dark_dvlm_rate': None,
     }
 
-    # TODO: 3.10 do ``anext()``
-    quote = await source.__anext__()
+    quote = await anext(source)
 
     # ltr = 0
     # lvr = 0
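
The flow_rates() change swaps the dunder call for the `anext()` builtin added
in Python 3.10. A toy async generator (not the real quote `source`) shows the
two spellings are equivalent:

    import asyncio


    async def quotes():
        # stand-in for the real quote stream
        for i in range(3):
            yield {'last': 100 + i}


    async def main():
        source = quotes()

        # pre-3.10 spelling
        first = await source.__anext__()

        # 3.10+ builtin, as used in the diff
        second = await anext(source)

        print(first, second)


    asyncio.run(main())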