Add epoch time index to fsp buffers

samplerd_service
Tyler Goodlet 2022-11-23 15:06:42 -05:00
parent 1ee49df31d
commit 7da5c2b238
2 changed files with 30 additions and 7 deletions

View File

@ -199,7 +199,10 @@ def maybe_mk_fsp_shm(
# TODO: load output types from `Fsp` # TODO: load output types from `Fsp`
# - should `index` be a required internal field? # - should `index` be a required internal field?
fsp_dtype = np.dtype( fsp_dtype = np.dtype(
[('index', int)] + [('index', int)]
+
[('time', float)]
+
[(field_name, float) for field_name in target.outputs] [(field_name, float) for field_name in target.outputs]
) )

View File

@ -132,7 +132,10 @@ async def fsp_compute(
fields.pop('index') fields.pop('index')
history: Optional[np.ndarray] = None # TODO: nptyping here! history: Optional[np.ndarray] = None # TODO: nptyping here!
if fields and len(fields) > 1 and fields: if (
fields and
len(fields) > 1
):
if not isinstance(history_output, dict): if not isinstance(history_output, dict):
raise ValueError( raise ValueError(
f'`{func_name}` is a multi-output FSP and should yield a ' f'`{func_name}` is a multi-output FSP and should yield a '
@ -217,8 +220,14 @@ async def fsp_compute(
log.debug(f"{func_name}: {processed}") log.debug(f"{func_name}: {processed}")
key, output = processed key, output = processed
index = src.index # dst.array[-1][key] = output
dst.array[-1][key] = output dst.array[[key, 'time']][-1] = (
output,
# TODO: what about pushing ``time.time_ns()``
# in which case we'll need to round at the graphics
# processing / sampling layer?
src.array[-1]['time']
)
# NOTE: for now we aren't streaming this to the consumer # NOTE: for now we aren't streaming this to the consumer
# stream latest array index entry which basically just acts # stream latest array index entry which basically just acts
@ -229,6 +238,7 @@ async def fsp_compute(
# N-consumers who subscribe for the real-time output, # N-consumers who subscribe for the real-time output,
# which we'll likely want to implement using local-mem # which we'll likely want to implement using local-mem
# chans for the fan out? # chans for the fan out?
# index = src.index
# if attach_stream: # if attach_stream:
# await client_stream.send(index) # await client_stream.send(index)
@ -388,7 +398,8 @@ async def cascade(
src: ShmArray, src: ShmArray,
dst: ShmArray dst: ShmArray
) -> tuple[bool, int, int]: ) -> tuple[bool, int, int]:
'''Predicate to dertmine if a destination FSP '''
Predicate to dertmine if a destination FSP
output array is aligned to its source array. output array is aligned to its source array.
''' '''
@ -406,11 +417,9 @@ async def cascade(
), step_diff, len_diff ), step_diff, len_diff
async def poll_and_sync_to_step( async def poll_and_sync_to_step(
tracker: TaskTracker, tracker: TaskTracker,
src: ShmArray, src: ShmArray,
dst: ShmArray, dst: ShmArray,
) -> tuple[TaskTracker, int]: ) -> tuple[TaskTracker, int]:
synced, step_diff, _ = is_synced(src, dst) synced, step_diff, _ = is_synced(src, dst)
@ -469,4 +478,15 @@ async def cascade(
else: else:
last = array[-1:].copy() last = array[-1:].copy()
# sync with source time step
src_t = src.array['time'][-1]
last['time'] = src_t
dst.push(last) dst.push(last)
# dst_t = dst.array['time'][-1]
# print(
# f'{dst.token}\n'
# f'src: {src_t}\n'
# f'dst: {dst_t}\n'
# )