Detect and request sample period in fsp engine

async_hist_loading
Tyler Goodlet 2022-02-28 07:34:47 -05:00
parent b1cce8f9cf
commit 3e7d4f8717
1 changed files with 8 additions and 4 deletions

View File

@ -123,7 +123,6 @@ async def fsp_compute(
# TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no? # TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no?
# if the output array is multi-field then push # if the output array is multi-field then push
# each respective field. # each respective field.
# await tractor.breakpoint()
fields = getattr(dst.array.dtype, 'fields', None).copy() fields = getattr(dst.array.dtype, 'fields', None).copy()
fields.pop('index') fields.pop('index')
# TODO: nptyping here! # TODO: nptyping here!
@ -269,7 +268,7 @@ async def cascade(
f'Registered FSP set:\n{lines}' f'Registered FSP set:\n{lines}'
) )
# update actor local flows table which registers # update actorlocal flows table which registers
# readonly "instances" of this fsp for symbol/source # readonly "instances" of this fsp for symbol/source
# so that consumer fsps can look it up by source + fsp. # so that consumer fsps can look it up by source + fsp.
# TODO: ugh i hate this wind/unwind to list over the wire # TODO: ugh i hate this wind/unwind to list over the wire
@ -381,14 +380,19 @@ async def cascade(
s, step, ld = is_synced(src, dst) s, step, ld = is_synced(src, dst)
# detect sample period step for subscription to increment
# signal
times = src.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# Increment the underlying shared memory buffer on every # Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed. # "increment" msg received from the underlying data feed.
async with feed.index_stream() as stream: async with feed.index_stream(int(delay_s)) as istream:
profiler(f'{func_name}: sample stream up') profiler(f'{func_name}: sample stream up')
profiler.finish() profiler.finish()
async for msg in stream: async for _ in istream:
# respawn the compute task if the source # respawn the compute task if the source
# array has been updated such that we compute # array has been updated such that we compute