Establish stream before `fsp_compute` so that backfill updates work again..

only_draw_iv_for_ohlc
Tyler Goodlet 2022-04-03 23:28:30 -04:00
parent 3a3baca9bc
commit 612813e937
1 changed files with 120 additions and 98 deletions

View File

@ -76,7 +76,6 @@ async def filter_quotes_by_sym(
async def fsp_compute( async def fsp_compute(
ctx: tractor.Context,
symbol: Symbol, symbol: Symbol,
feed: Feed, feed: Feed,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
@ -86,7 +85,7 @@ async def fsp_compute(
func: Callable, func: Callable,
attach_stream: bool = False, # attach_stream: bool = False,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -193,46 +192,47 @@ async def fsp_compute(
profiler(f'{func_name} pushed history') profiler(f'{func_name} pushed history')
profiler.finish() profiler.finish()
# TODO: UGH, what is the right way to do something like this?
if not ctx._started_called:
await ctx.started(index)
# setup a respawn handle # setup a respawn handle
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
# TODO: might be better to just make a "restart" method where
# the target task is spawned implicitly and then the event is
# set via some higher level api? At that poing we might as well
# be writing a one-cancels-one nursery though right?
tracker = TaskTracker(trio.Event(), cs) tracker = TaskTracker(trio.Event(), cs)
task_status.started((tracker, index)) task_status.started((tracker, index))
profiler(f'{func_name} yield last index') profiler(f'{func_name} yield last index')
# import time # import time
# last = time.time() # last = time.time()
try: try:
# rt stream
async with ctx.open_stream() as stream:
# always trigger UI refresh after history update, async for processed in out_stream:
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
await stream.send('update')
async for processed in out_stream: log.debug(f"{func_name}: {processed}")
key, output = processed
index = src.index
dst.array[-1][key] = output
log.debug(f"{func_name}: {processed}") # NOTE: for now we aren't streaming this to the consumer
key, output = processed # stream latest array index entry which basically just acts
index = src.index # as trigger msg to tell the consumer to read from shm
dst.array[-1][key] = output # TODO: further this should likely be implemented much
# like our `Feed` api where there is one background
# "service" task which computes output and then sends to
# N-consumers who subscribe for the real-time output,
# which we'll likely want to implement using local-mem
# chans for the fan out?
# if attach_stream:
# await client_stream.send(index)
# NOTE: for now we aren't streaming this to the consumer # period = time.time() - last
# stream latest array index entry which basically just acts # hz = 1/period if period else float('nan')
# as trigger msg to tell the consumer to read from shm # if hz > 60:
if attach_stream: # log.info(f'FSP quote too fast: {hz}')
await stream.send(index) # last = time.time()
# period = time.time() - last
# hz = 1/period if period else float('nan')
# if hz > 60:
# log.info(f'FSP quote too fast: {hz}')
# last = time.time()
finally: finally:
tracker.complete.set() tracker.complete.set()
@ -323,7 +323,6 @@ async def cascade(
fsp_target = partial( fsp_target = partial(
fsp_compute, fsp_compute,
ctx=ctx,
symbol=symbol, symbol=symbol,
feed=feed, feed=feed,
quote_stream=quote_stream, quote_stream=quote_stream,
@ -332,7 +331,7 @@ async def cascade(
src=src, src=src,
dst=dst, dst=dst,
# func_name=func_name, # target
func=func func=func
) )
@ -344,90 +343,113 @@ async def cascade(
profiler(f'{func_name}: fsp up') profiler(f'{func_name}: fsp up')
async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]: # sync client
# TODO: adopt an incremental update engine/approach await ctx.started(index)
# where possible here eventually!
log.warning(f're-syncing fsp {func_name} to source')
tracker.cs.cancel()
await tracker.complete.wait()
return await n.start(fsp_target)
def is_synced( # XXX: rt stream with client which we MUST
src: ShmArray, # open here (and keep it open) in order to make
dst: ShmArray # incremental "updates" as history prepends take
) -> tuple[bool, int, int]: # place.
'''Predicate to dertmine if a destination FSP async with ctx.open_stream() as client_stream:
output array is aligned to its source array.
''' # TODO: these likely should all become
step_diff = src.index - dst.index # methods of this ``TaskLifetime`` or wtv
len_diff = abs(len(src.array) - len(dst.array)) # abstraction..
return not ( async def resync(
# the source is likely backfilling and we must tracker: TaskTracker,
# sync history calculations
len_diff > 2 or
# we aren't step synced to the source and may be ) -> tuple[TaskTracker, int]:
# leading/lagging by a step # TODO: adopt an incremental update engine/approach
step_diff > 1 or # where possible here eventually!
step_diff < 0 log.warning(f're-syncing fsp {func_name} to source')
), step_diff, len_diff tracker.cs.cancel()
await tracker.complete.wait()
tracker, index = await n.start(fsp_target)
async def poll_and_sync_to_step( # always trigger UI refresh after history update,
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
await client_stream.send('update')
return tracker, index
tracker: TaskTracker, def is_synced(
src: ShmArray, src: ShmArray,
dst: ShmArray, dst: ShmArray
) -> tuple[bool, int, int]:
'''Predicate to dertmine if a destination FSP
output array is aligned to its source array.
) -> tuple[TaskTracker, int]: '''
step_diff = src.index - dst.index
len_diff = abs(len(src.array) - len(dst.array))
return not (
# the source is likely backfilling and we must
# sync history calculations
len_diff > 2 or
# we aren't step synced to the source and may be
# leading/lagging by a step
step_diff > 1 or
step_diff < 0
), step_diff, len_diff
async def poll_and_sync_to_step(
tracker: TaskTracker,
src: ShmArray,
dst: ShmArray,
) -> tuple[TaskTracker, int]:
synced, step_diff, _ = is_synced(src, dst)
while not synced:
tracker, index = await resync(tracker)
synced, step_diff, _ = is_synced(src, dst) synced, step_diff, _ = is_synced(src, dst)
while not synced:
tracker, index = await resync(tracker)
synced, step_diff, _ = is_synced(src, dst)
return tracker, step_diff return tracker, step_diff
s, step, ld = is_synced(src, dst) s, step, ld = is_synced(src, dst)
# detect sample period step for subscription to increment # detect sample period step for subscription to increment
# signal # signal
times = src.array['time'] times = src.array['time']
delay_s = times[-1] - times[times != times[-1]][-1] delay_s = times[-1] - times[times != times[-1]][-1]
# Increment the underlying shared memory buffer on every # Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed. # "increment" msg received from the underlying data feed.
async with feed.index_stream(int(delay_s)) as istream: async with feed.index_stream(
int(delay_s)
) as istream:
profiler(f'{func_name}: sample stream up') profiler(f'{func_name}: sample stream up')
profiler.finish() profiler.finish()
async for _ in istream: async for _ in istream:
# respawn the compute task if the source # respawn the compute task if the source
# array has been updated such that we compute # array has been updated such that we compute
# new history from the (prepended) source. # new history from the (prepended) source.
synced, step_diff, _ = is_synced(src, dst) synced, step_diff, _ = is_synced(src, dst)
if not synced: if not synced:
tracker, step_diff = await poll_and_sync_to_step( tracker, step_diff = await poll_and_sync_to_step(
tracker, tracker,
src, src,
dst, dst,
) )
# skip adding a last bar since we should already # skip adding a last bar since we should already
# be step alinged # be step alinged
if step_diff == 0: if step_diff == 0:
continue continue
# read out last shm row, copy and write new row # read out last shm row, copy and write new row
array = dst.array array = dst.array
# some metrics like vlm should be reset # some metrics like vlm should be reset
# to zero every step. # to zero every step.
if zero_on_step: if zero_on_step:
last = zeroed last = zeroed
else: else:
last = array[-1:].copy() last = array[-1:].copy()
dst.push(last) dst.push(last)