Compare commits

...

2 Commits

Author SHA1 Message Date
Gud Boi f73b981173 Only register shms w sampler when `feed_is_live`
Add timeout-gated wait for `feed_is_live: trio.Event` before passing shm
tokens to `open_sample_stream()`; skip registering shm-buffers with the
sampler if the feed doesn't "go live" within a new timeout.

The main motivation here is to avoid the sampler incrementing shm-array
bufs when the mkt-venue is closed so that a trailing "same price"
line/bars isn't updated/rendered in the chart's view when unnecessary.

Deats,
- add `wait_for_live_timeout: float = 0.5` param to `manage_history()`
- warn-log the fqme when timeout triggers
- add error log for invalid `frame_start_dt` comparisons to
  `maybe_fill_null_segments()`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-30 18:40:48 -05:00
Gud Boi d5edd3484f Clarify `register_with_sampler()` started type and vars
Markup `ctx.started()` type-sig as `set[int]`, rename binding var
`first` to `shm_periods` and add type hints for clarity on context mgr
unpacking.

Also,
- whitespace cleanup: `Type | None` -> `Type|None` throughout
- format long lines: `.setdefault()`, `await ctx.started()`
- fix backtick style in docstrings for consistency
- add placeholder TODO comment for `feed_is_live` check; it might be
  more rigorous to pass the syncing state down thru all this?

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-30 18:38:45 -05:00
2 changed files with 56 additions and 24 deletions

View File

@ -80,20 +80,20 @@ class Sampler:
This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see
``.service.maybe_open_samplerd()`` and the below
``register_with_sampler()``.
`.service.maybe_open_samplerd()` and the below
`register_with_sampler()`.
'''
service_nursery: None | trio.Nursery = None
service_nursery: None|trio.Nursery = None
# TODO: we could stick these in a composed type to avoid
# angering the "i hate module scoped variables crowd" (yawn).
# TODO: we could stick these in a composed type to avoid angering
# the "i hate module scoped variables crowd" (yawn).
ohlcv_shms: dict[float, list[ShmArray]] = {}
# holds one-task-per-sample-period tasks which are spawned as-needed by
# data feed requests with a given detected time step usually from
# history loading.
incr_task_cs: trio.CancelScope | None = None
incr_task_cs: trio.CancelScope|None = None
bcast_errors: tuple[Exception] = (
trio.BrokenResourceError,
@ -248,8 +248,8 @@ class Sampler:
async def broadcast(
self,
period_s: float,
time_stamp: float | None = None,
info: dict | None = None,
time_stamp: float|None = None,
info: dict|None = None,
) -> None:
'''
@ -313,7 +313,7 @@ class Sampler:
@classmethod
async def broadcast_all(
self,
info: dict | None = None,
info: dict|None = None,
) -> None:
# NOTE: take a copy of subs since removals can happen
@ -330,12 +330,12 @@ class Sampler:
async def register_with_sampler(
ctx: Context,
period_s: float,
shms_by_period: dict[float, dict] | None = None,
shms_by_period: dict[float, dict]|None = None,
open_index_stream: bool = True, # open a 2way stream for sample step msgs?
sub_for_broadcasts: bool = True, # sampler side to send step updates?
) -> None:
) -> set[int]:
get_console_log(tractor.current_actor().loglevel)
incr_was_started: bool = False
@ -362,7 +362,12 @@ async def register_with_sampler(
# insert the base 1s period (for OHLC style sampling) into
# the increment buffer set to update and shift every second.
if shms_by_period is not None:
if (
shms_by_period is not None
# and
# feed_is_live.is_set()
# ^TODO? pass it in instead?
):
from ._sharedmem import (
attach_shm_array,
_Token,
@ -376,12 +381,17 @@ async def register_with_sampler(
readonly=False,
)
shms_by_period[period] = shm
Sampler.ohlcv_shms.setdefault(period, []).append(shm)
Sampler.ohlcv_shms.setdefault(
period,
[],
).append(shm)
assert Sampler.ohlcv_shms
# unblock caller
await ctx.started(set(Sampler.ohlcv_shms.keys()))
await ctx.started(
set(Sampler.ohlcv_shms.keys())
)
if open_index_stream:
try:
@ -427,7 +437,7 @@ async def register_with_sampler(
async def spawn_samplerd(
loglevel: str | None = None,
loglevel: str|None = None,
**extra_tractor_kwargs
) -> bool:
@ -473,7 +483,7 @@ async def spawn_samplerd(
@acm
async def maybe_open_samplerd(
loglevel: str | None = None,
loglevel: str|None = None,
**pikerd_kwargs,
) -> tractor.Portal: # noqa
@ -498,11 +508,11 @@ async def maybe_open_samplerd(
@acm
async def open_sample_stream(
period_s: float,
shms_by_period: dict[float, dict] | None = None,
shms_by_period: dict[float, dict]|None = None,
open_index_stream: bool = True,
sub_for_broadcasts: bool = True,
cache_key: str | None = None,
cache_key: str|None = None,
allow_new_sampler: bool = True,
ensure_is_active: bool = False,
@ -533,6 +543,8 @@ async def open_sample_stream(
# yield bistream
# else:
ctx: tractor.Context
shm_periods: set[int] # in `int`-seconds
async with (
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
@ -547,10 +559,10 @@ async def open_sample_stream(
'open_index_stream': open_index_stream,
'sub_for_broadcasts': sub_for_broadcasts,
},
) as (ctx, first)
) as (ctx, shm_periods)
):
if ensure_is_active:
assert len(first) > 1
assert len(shm_periods) > 1
async with (
ctx.open_stream(

View File

@ -247,6 +247,11 @@ async def maybe_fill_null_segments(
from_timestamp(array['time'][0])
) < backfill_until_dt
):
log.error(
f'Invalid frame_start !?\n'
f'frame_start_dt: {frame_start_dt!r}\n'
f'backfill_until_dt: {backfill_until_dt!r}\n'
)
await tractor.pause()
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
@ -1284,6 +1289,7 @@ async def manage_history(
some_data_ready: trio.Event,
feed_is_live: trio.Event,
timeframe: float = 60, # in seconds
wait_for_live_timeout: float = 0.5,
task_status: TaskStatus[
tuple[ShmArray, ShmArray]
@ -1432,12 +1438,26 @@ async def manage_history(
1: rt_shm,
60: hist_shm,
}
async with open_sample_stream(
period_s=1.,
shms_by_period={
shms_by_period: dict|None = None
with trio.move_on_after(wait_for_live_timeout) as cs:
await feed_is_live.wait()
if cs.cancelled_caught:
log.warning(
f'No live feed within {wait_for_live_timeout!r}s\n'
f'fqme: {mkt.fqme!r}\n'
f'NOT activating shm-buffer-sampler!!\n'
)
if feed_is_live.is_set():
shms_by_period: dict[int, dict] = {
1.: rt_shm.token,
60.: hist_shm.token,
},
}
async with open_sample_stream(
period_s=1.,
shms_by_period=shms_by_period,
# NOTE: we want to only open a stream for doing
# broadcasts on backfill operations, not receive the