Establish stream before `fsp_compute` so that backfill updates work again..
parent
cc2873b09f
commit
cda6e2fab4
|
@ -76,7 +76,6 @@ async def filter_quotes_by_sym(
|
||||||
|
|
||||||
async def fsp_compute(
|
async def fsp_compute(
|
||||||
|
|
||||||
ctx: tractor.Context,
|
|
||||||
symbol: Symbol,
|
symbol: Symbol,
|
||||||
feed: Feed,
|
feed: Feed,
|
||||||
quote_stream: trio.abc.ReceiveChannel,
|
quote_stream: trio.abc.ReceiveChannel,
|
||||||
|
@ -86,7 +85,7 @@ async def fsp_compute(
|
||||||
|
|
||||||
func: Callable,
|
func: Callable,
|
||||||
|
|
||||||
attach_stream: bool = False,
|
# attach_stream: bool = False,
|
||||||
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -193,46 +192,47 @@ async def fsp_compute(
|
||||||
profiler(f'{func_name} pushed history')
|
profiler(f'{func_name} pushed history')
|
||||||
profiler.finish()
|
profiler.finish()
|
||||||
|
|
||||||
# TODO: UGH, what is the right way to do something like this?
|
|
||||||
if not ctx._started_called:
|
|
||||||
await ctx.started(index)
|
|
||||||
|
|
||||||
# setup a respawn handle
|
# setup a respawn handle
|
||||||
with trio.CancelScope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
|
|
||||||
|
# TODO: might be better to just make a "restart" method where
|
||||||
|
# the target task is spawned implicitly and then the event is
|
||||||
|
# set via some higher level api? At that poing we might as well
|
||||||
|
# be writing a one-cancels-one nursery though right?
|
||||||
tracker = TaskTracker(trio.Event(), cs)
|
tracker = TaskTracker(trio.Event(), cs)
|
||||||
task_status.started((tracker, index))
|
task_status.started((tracker, index))
|
||||||
|
|
||||||
profiler(f'{func_name} yield last index')
|
profiler(f'{func_name} yield last index')
|
||||||
|
|
||||||
# import time
|
# import time
|
||||||
# last = time.time()
|
# last = time.time()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# rt stream
|
|
||||||
async with ctx.open_stream() as stream:
|
|
||||||
|
|
||||||
# always trigger UI refresh after history update,
|
async for processed in out_stream:
|
||||||
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
|
|
||||||
# ``piker.ui._display.trigger_update()``.
|
|
||||||
await stream.send('update')
|
|
||||||
|
|
||||||
async for processed in out_stream:
|
log.debug(f"{func_name}: {processed}")
|
||||||
|
key, output = processed
|
||||||
|
index = src.index
|
||||||
|
dst.array[-1][key] = output
|
||||||
|
|
||||||
log.debug(f"{func_name}: {processed}")
|
# NOTE: for now we aren't streaming this to the consumer
|
||||||
key, output = processed
|
# stream latest array index entry which basically just acts
|
||||||
index = src.index
|
# as trigger msg to tell the consumer to read from shm
|
||||||
dst.array[-1][key] = output
|
# TODO: further this should likely be implemented much
|
||||||
|
# like our `Feed` api where there is one background
|
||||||
|
# "service" task which computes output and then sends to
|
||||||
|
# N-consumers who subscribe for the real-time output,
|
||||||
|
# which we'll likely want to implement using local-mem
|
||||||
|
# chans for the fan out?
|
||||||
|
# if attach_stream:
|
||||||
|
# await client_stream.send(index)
|
||||||
|
|
||||||
# NOTE: for now we aren't streaming this to the consumer
|
# period = time.time() - last
|
||||||
# stream latest array index entry which basically just acts
|
# hz = 1/period if period else float('nan')
|
||||||
# as trigger msg to tell the consumer to read from shm
|
# if hz > 60:
|
||||||
if attach_stream:
|
# log.info(f'FSP quote too fast: {hz}')
|
||||||
await stream.send(index)
|
# last = time.time()
|
||||||
|
|
||||||
# period = time.time() - last
|
|
||||||
# hz = 1/period if period else float('nan')
|
|
||||||
# if hz > 60:
|
|
||||||
# log.info(f'FSP quote too fast: {hz}')
|
|
||||||
# last = time.time()
|
|
||||||
finally:
|
finally:
|
||||||
tracker.complete.set()
|
tracker.complete.set()
|
||||||
|
|
||||||
|
@ -323,7 +323,6 @@ async def cascade(
|
||||||
fsp_target = partial(
|
fsp_target = partial(
|
||||||
|
|
||||||
fsp_compute,
|
fsp_compute,
|
||||||
ctx=ctx,
|
|
||||||
symbol=symbol,
|
symbol=symbol,
|
||||||
feed=feed,
|
feed=feed,
|
||||||
quote_stream=quote_stream,
|
quote_stream=quote_stream,
|
||||||
|
@ -332,7 +331,7 @@ async def cascade(
|
||||||
src=src,
|
src=src,
|
||||||
dst=dst,
|
dst=dst,
|
||||||
|
|
||||||
# func_name=func_name,
|
# target
|
||||||
func=func
|
func=func
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -344,90 +343,113 @@ async def cascade(
|
||||||
|
|
||||||
profiler(f'{func_name}: fsp up')
|
profiler(f'{func_name}: fsp up')
|
||||||
|
|
||||||
async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]:
|
# sync client
|
||||||
# TODO: adopt an incremental update engine/approach
|
await ctx.started(index)
|
||||||
# where possible here eventually!
|
|
||||||
log.warning(f're-syncing fsp {func_name} to source')
|
|
||||||
tracker.cs.cancel()
|
|
||||||
await tracker.complete.wait()
|
|
||||||
return await n.start(fsp_target)
|
|
||||||
|
|
||||||
def is_synced(
|
# XXX: rt stream with client which we MUST
|
||||||
src: ShmArray,
|
# open here (and keep it open) in order to make
|
||||||
dst: ShmArray
|
# incremental "updates" as history prepends take
|
||||||
) -> tuple[bool, int, int]:
|
# place.
|
||||||
'''Predicate to dertmine if a destination FSP
|
async with ctx.open_stream() as client_stream:
|
||||||
output array is aligned to its source array.
|
|
||||||
|
|
||||||
'''
|
# TODO: these likely should all become
|
||||||
step_diff = src.index - dst.index
|
# methods of this ``TaskLifetime`` or wtv
|
||||||
len_diff = abs(len(src.array) - len(dst.array))
|
# abstraction..
|
||||||
return not (
|
async def resync(
|
||||||
# the source is likely backfilling and we must
|
tracker: TaskTracker,
|
||||||
# sync history calculations
|
|
||||||
len_diff > 2 or
|
|
||||||
|
|
||||||
# we aren't step synced to the source and may be
|
) -> tuple[TaskTracker, int]:
|
||||||
# leading/lagging by a step
|
# TODO: adopt an incremental update engine/approach
|
||||||
step_diff > 1 or
|
# where possible here eventually!
|
||||||
step_diff < 0
|
log.warning(f're-syncing fsp {func_name} to source')
|
||||||
), step_diff, len_diff
|
tracker.cs.cancel()
|
||||||
|
await tracker.complete.wait()
|
||||||
|
tracker, index = await n.start(fsp_target)
|
||||||
|
|
||||||
async def poll_and_sync_to_step(
|
# always trigger UI refresh after history update,
|
||||||
|
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
|
||||||
|
# ``piker.ui._display.trigger_update()``.
|
||||||
|
await client_stream.send('update')
|
||||||
|
return tracker, index
|
||||||
|
|
||||||
tracker: TaskTracker,
|
def is_synced(
|
||||||
src: ShmArray,
|
src: ShmArray,
|
||||||
dst: ShmArray,
|
dst: ShmArray
|
||||||
|
) -> tuple[bool, int, int]:
|
||||||
|
'''Predicate to dertmine if a destination FSP
|
||||||
|
output array is aligned to its source array.
|
||||||
|
|
||||||
) -> tuple[TaskTracker, int]:
|
'''
|
||||||
|
step_diff = src.index - dst.index
|
||||||
|
len_diff = abs(len(src.array) - len(dst.array))
|
||||||
|
return not (
|
||||||
|
# the source is likely backfilling and we must
|
||||||
|
# sync history calculations
|
||||||
|
len_diff > 2 or
|
||||||
|
|
||||||
|
# we aren't step synced to the source and may be
|
||||||
|
# leading/lagging by a step
|
||||||
|
step_diff > 1 or
|
||||||
|
step_diff < 0
|
||||||
|
), step_diff, len_diff
|
||||||
|
|
||||||
|
async def poll_and_sync_to_step(
|
||||||
|
|
||||||
|
tracker: TaskTracker,
|
||||||
|
src: ShmArray,
|
||||||
|
dst: ShmArray,
|
||||||
|
|
||||||
|
) -> tuple[TaskTracker, int]:
|
||||||
|
|
||||||
synced, step_diff, _ = is_synced(src, dst)
|
|
||||||
while not synced:
|
|
||||||
tracker, index = await resync(tracker)
|
|
||||||
synced, step_diff, _ = is_synced(src, dst)
|
synced, step_diff, _ = is_synced(src, dst)
|
||||||
|
while not synced:
|
||||||
|
tracker, index = await resync(tracker)
|
||||||
|
synced, step_diff, _ = is_synced(src, dst)
|
||||||
|
|
||||||
return tracker, step_diff
|
return tracker, step_diff
|
||||||
|
|
||||||
s, step, ld = is_synced(src, dst)
|
s, step, ld = is_synced(src, dst)
|
||||||
|
|
||||||
# detect sample period step for subscription to increment
|
# detect sample period step for subscription to increment
|
||||||
# signal
|
# signal
|
||||||
times = src.array['time']
|
times = src.array['time']
|
||||||
delay_s = times[-1] - times[times != times[-1]][-1]
|
delay_s = times[-1] - times[times != times[-1]][-1]
|
||||||
|
|
||||||
# Increment the underlying shared memory buffer on every
|
# Increment the underlying shared memory buffer on every
|
||||||
# "increment" msg received from the underlying data feed.
|
# "increment" msg received from the underlying data feed.
|
||||||
async with feed.index_stream(int(delay_s)) as istream:
|
async with feed.index_stream(
|
||||||
|
int(delay_s)
|
||||||
|
) as istream:
|
||||||
|
|
||||||
profiler(f'{func_name}: sample stream up')
|
profiler(f'{func_name}: sample stream up')
|
||||||
profiler.finish()
|
profiler.finish()
|
||||||
|
|
||||||
async for _ in istream:
|
async for _ in istream:
|
||||||
|
|
||||||
# respawn the compute task if the source
|
# respawn the compute task if the source
|
||||||
# array has been updated such that we compute
|
# array has been updated such that we compute
|
||||||
# new history from the (prepended) source.
|
# new history from the (prepended) source.
|
||||||
synced, step_diff, _ = is_synced(src, dst)
|
synced, step_diff, _ = is_synced(src, dst)
|
||||||
if not synced:
|
if not synced:
|
||||||
tracker, step_diff = await poll_and_sync_to_step(
|
tracker, step_diff = await poll_and_sync_to_step(
|
||||||
tracker,
|
tracker,
|
||||||
src,
|
src,
|
||||||
dst,
|
dst,
|
||||||
)
|
)
|
||||||
|
|
||||||
# skip adding a last bar since we should already
|
# skip adding a last bar since we should already
|
||||||
# be step alinged
|
# be step alinged
|
||||||
if step_diff == 0:
|
if step_diff == 0:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# read out last shm row, copy and write new row
|
# read out last shm row, copy and write new row
|
||||||
array = dst.array
|
array = dst.array
|
||||||
|
|
||||||
# some metrics like vlm should be reset
|
# some metrics like vlm should be reset
|
||||||
# to zero every step.
|
# to zero every step.
|
||||||
if zero_on_step:
|
if zero_on_step:
|
||||||
last = zeroed
|
last = zeroed
|
||||||
else:
|
else:
|
||||||
last = array[-1:].copy()
|
last = array[-1:].copy()
|
||||||
|
|
||||||
dst.push(last)
|
dst.push(last)
|
||||||
|
|
Loading…
Reference in New Issue