Only register shms w sampler when `feed_is_live`

Add timeout-gated wait for `feed_is_live: trio.Event` before passing shm
tokens to `open_sample_stream()`; skip registering shm-buffers with the
sampler if the feed doesn't "go live" within a new timeout.

The main motivation here is to avoid the sampler incrementing shm-array
bufs when the mkt-venue is closed so that a trailing "same price"
line/bars isn't updated/rendered in the chart's view when unnecessary.

Deats,
- add `wait_for_live_timeout: float = 0.5` param to `manage_history()`
- warn-log the fqme when timeout triggers
- add error log for invalid `frame_start_dt` comparisons to
  `maybe_fill_null_segments()`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
refresh_annots
Gud Boi 2026-01-30 18:40:48 -05:00
parent d5edd3484f
commit f73b981173
1 changed files with 24 additions and 4 deletions

View File

@ -247,6 +247,11 @@ async def maybe_fill_null_segments(
from_timestamp(array['time'][0])
) < backfill_until_dt
):
log.error(
f'Invalid frame_start !?\n'
f'frame_start_dt: {frame_start_dt!r}\n'
f'backfill_until_dt: {backfill_until_dt!r}\n'
)
await tractor.pause()
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
@ -1284,6 +1289,7 @@ async def manage_history(
some_data_ready: trio.Event,
feed_is_live: trio.Event,
timeframe: float = 60, # in seconds
wait_for_live_timeout: float = 0.5,
task_status: TaskStatus[
tuple[ShmArray, ShmArray]
@ -1432,12 +1438,26 @@ async def manage_history(
1: rt_shm,
60: hist_shm,
}
async with open_sample_stream(
period_s=1.,
shms_by_period={
shms_by_period: dict|None = None
with trio.move_on_after(wait_for_live_timeout) as cs:
await feed_is_live.wait()
if cs.cancelled_caught:
log.warning(
f'No live feed within {wait_for_live_timeout!r}s\n'
f'fqme: {mkt.fqme!r}\n'
f'NOT activating shm-buffer-sampler!!\n'
)
if feed_is_live.is_set():
shms_by_period: dict[int, dict] = {
1.: rt_shm.token,
60.: hist_shm.token,
},
}
async with open_sample_stream(
period_s=1.,
shms_by_period=shms_by_period,
# NOTE: we want to only open a stream for doing
# broadcasts on backfill operations, not receive the