Establish stream before `fsp_compute` so that backfill updates work again..

m4_corrections
Tyler Goodlet 2022-04-03 23:28:30 -04:00
parent 4378974b59
commit 0e1656978b
1 changed files with 120 additions and 98 deletions

View File

@ -76,7 +76,6 @@ async def filter_quotes_by_sym(
async def fsp_compute( async def fsp_compute(
ctx: tractor.Context,
symbol: Symbol, symbol: Symbol,
feed: Feed, feed: Feed,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
@ -86,7 +85,7 @@ async def fsp_compute(
func: Callable, func: Callable,
attach_stream: bool = False, # attach_stream: bool = False,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -193,27 +192,22 @@ async def fsp_compute(
profiler(f'{func_name} pushed history') profiler(f'{func_name} pushed history')
profiler.finish() profiler.finish()
# TODO: UGH, what is the right way to do something like this?
if not ctx._started_called:
await ctx.started(index)
# setup a respawn handle # setup a respawn handle
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
# TODO: might be better to just make a "restart" method where
# the target task is spawned implicitly and then the event is
# set via some higher level api? At that poing we might as well
# be writing a one-cancels-one nursery though right?
tracker = TaskTracker(trio.Event(), cs) tracker = TaskTracker(trio.Event(), cs)
task_status.started((tracker, index)) task_status.started((tracker, index))
profiler(f'{func_name} yield last index') profiler(f'{func_name} yield last index')
# import time # import time
# last = time.time() # last = time.time()
try: try:
# rt stream
async with ctx.open_stream() as stream:
# always trigger UI refresh after history update,
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
await stream.send('update')
async for processed in out_stream: async for processed in out_stream:
@ -225,8 +219,14 @@ async def fsp_compute(
# NOTE: for now we aren't streaming this to the consumer # NOTE: for now we aren't streaming this to the consumer
# stream latest array index entry which basically just acts # stream latest array index entry which basically just acts
# as trigger msg to tell the consumer to read from shm # as trigger msg to tell the consumer to read from shm
if attach_stream: # TODO: further this should likely be implemented much
await stream.send(index) # like our `Feed` api where there is one background
# "service" task which computes output and then sends to
# N-consumers who subscribe for the real-time output,
# which we'll likely want to implement using local-mem
# chans for the fan out?
# if attach_stream:
# await client_stream.send(index)
# period = time.time() - last # period = time.time() - last
# hz = 1/period if period else float('nan') # hz = 1/period if period else float('nan')
@ -320,7 +320,6 @@ async def cascade(
fsp_target = partial( fsp_target = partial(
fsp_compute, fsp_compute,
ctx=ctx,
symbol=symbol, symbol=symbol,
feed=feed, feed=feed,
quote_stream=quote_stream, quote_stream=quote_stream,
@ -329,7 +328,7 @@ async def cascade(
src=src, src=src,
dst=dst, dst=dst,
# func_name=func_name, # target
func=func func=func
) )
@ -341,13 +340,34 @@ async def cascade(
profiler(f'{func_name}: fsp up') profiler(f'{func_name}: fsp up')
async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]: # sync client
await ctx.started(index)
# XXX: rt stream with client which we MUST
# open here (and keep it open) in order to make
# incremental "updates" as history prepends take
# place.
async with ctx.open_stream() as client_stream:
# TODO: these likely should all become
# methods of this ``TaskLifetime`` or wtv
# abstraction..
async def resync(
tracker: TaskTracker,
) -> tuple[TaskTracker, int]:
# TODO: adopt an incremental update engine/approach # TODO: adopt an incremental update engine/approach
# where possible here eventually! # where possible here eventually!
log.warning(f're-syncing fsp {func_name} to source') log.warning(f're-syncing fsp {func_name} to source')
tracker.cs.cancel() tracker.cs.cancel()
await tracker.complete.wait() await tracker.complete.wait()
return await n.start(fsp_target) tracker, index = await n.start(fsp_target)
# always trigger UI refresh after history update,
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
await client_stream.send('update')
return tracker, index
def is_synced( def is_synced(
src: ShmArray, src: ShmArray,
@ -394,7 +414,9 @@ async def cascade(
# Increment the underlying shared memory buffer on every # Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed. # "increment" msg received from the underlying data feed.
async with feed.index_stream(int(delay_s)) as istream: async with feed.index_stream(
int(delay_s)
) as istream:
profiler(f'{func_name}: sample stream up') profiler(f'{func_name}: sample stream up')
profiler.finish() profiler.finish()