diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py index f4e42bc1..9654a2a1 100644 --- a/piker/fsp/_api.py +++ b/piker/fsp/_api.py @@ -199,7 +199,10 @@ def maybe_mk_fsp_shm( # TODO: load output types from `Fsp` # - should `index` be a required internal field? fsp_dtype = np.dtype( - [('index', int)] + + [('index', int)] + + + [('time', float)] + + [(field_name, float) for field_name in target.outputs] ) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index c47455e3..edef7219 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -132,7 +132,10 @@ async def fsp_compute( fields.pop('index') history: Optional[np.ndarray] = None # TODO: nptyping here! - if fields and len(fields) > 1 and fields: + if ( + fields and + len(fields) > 1 + ): if not isinstance(history_output, dict): raise ValueError( f'`{func_name}` is a multi-output FSP and should yield a ' @@ -217,8 +220,14 @@ async def fsp_compute( log.debug(f"{func_name}: {processed}") key, output = processed - index = src.index - dst.array[-1][key] = output + # dst.array[-1][key] = output + dst.array[[key, 'time']][-1] = ( + output, + # TODO: what about pushing ``time.time_ns()`` + # in which case we'll need to round at the graphics + # processing / sampling layer? + src.array[-1]['time'] + ) # NOTE: for now we aren't streaming this to the consumer # stream latest array index entry which basically just acts @@ -229,6 +238,7 @@ async def fsp_compute( # N-consumers who subscribe for the real-time output, # which we'll likely want to implement using local-mem # chans for the fan out? + # index = src.index # if attach_stream: # await client_stream.send(index) @@ -388,7 +398,8 @@ async def cascade( src: ShmArray, dst: ShmArray ) -> tuple[bool, int, int]: - '''Predicate to dertmine if a destination FSP + ''' + Predicate to dertmine if a destination FSP output array is aligned to its source array. ''' @@ -406,11 +417,9 @@ async def cascade( ), step_diff, len_diff async def poll_and_sync_to_step( - tracker: TaskTracker, src: ShmArray, dst: ShmArray, - ) -> tuple[TaskTracker, int]: synced, step_diff, _ = is_synced(src, dst) @@ -469,4 +478,15 @@ async def cascade( else: last = array[-1:].copy() + # sync with source time step + src_t = src.array['time'][-1] + last['time'] = src_t + dst.push(last) + + # dst_t = dst.array['time'][-1] + # print( + # f'{dst.token}\n' + # f'src: {src_t}\n' + # f'dst: {dst_t}\n' + # )