Clarify `register_with_sampler()` started type and vars

Markup `ctx.started()` type-sig as `set[int]`, rename binding var
`first` to `shm_periods` and add type hints for clarity on context mgr
unpacking.

Also,
- whitespace cleanup: `Type | None` -> `Type|None` throughout
- format long lines: `.setdefault()`, `await ctx.started()`
- fix backtick style in docstrings for consistency
- add placeholder TODO comment for `feed_is_live` check; it might be
  more rigorous to pass the syncing state down thru all this?

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
refresh_annots
Gud Boi 2026-01-30 18:35:52 -05:00
parent bac8317a4a
commit d5edd3484f
1 changed files with 32 additions and 20 deletions

View File

@ -80,20 +80,20 @@ class Sampler:
This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see
``.service.maybe_open_samplerd()`` and the below
``register_with_sampler()``.
`.service.maybe_open_samplerd()` and the below
`register_with_sampler()`.
'''
service_nursery: None | trio.Nursery = None
service_nursery: None|trio.Nursery = None
# TODO: we could stick these in a composed type to avoid
# angering the "i hate module scoped variables crowd" (yawn).
# TODO: we could stick these in a composed type to avoid angering
# the "i hate module scoped variables crowd" (yawn).
ohlcv_shms: dict[float, list[ShmArray]] = {}
# holds one-task-per-sample-period tasks which are spawned as-needed by
# data feed requests with a given detected time step usually from
# history loading.
incr_task_cs: trio.CancelScope | None = None
incr_task_cs: trio.CancelScope|None = None
bcast_errors: tuple[Exception] = (
trio.BrokenResourceError,
@ -248,8 +248,8 @@ class Sampler:
async def broadcast(
self,
period_s: float,
time_stamp: float | None = None,
info: dict | None = None,
time_stamp: float|None = None,
info: dict|None = None,
) -> None:
'''
@ -313,7 +313,7 @@ class Sampler:
@classmethod
async def broadcast_all(
self,
info: dict | None = None,
info: dict|None = None,
) -> None:
# NOTE: take a copy of subs since removals can happen
@ -330,12 +330,12 @@ class Sampler:
async def register_with_sampler(
ctx: Context,
period_s: float,
shms_by_period: dict[float, dict] | None = None,
shms_by_period: dict[float, dict]|None = None,
open_index_stream: bool = True, # open a 2way stream for sample step msgs?
sub_for_broadcasts: bool = True, # sampler side to send step updates?
) -> None:
) -> set[int]:
get_console_log(tractor.current_actor().loglevel)
incr_was_started: bool = False
@ -362,7 +362,12 @@ async def register_with_sampler(
# insert the base 1s period (for OHLC style sampling) into
# the increment buffer set to update and shift every second.
if shms_by_period is not None:
if (
shms_by_period is not None
# and
# feed_is_live.is_set()
# ^TODO? pass it in instead?
):
from ._sharedmem import (
attach_shm_array,
_Token,
@ -376,12 +381,17 @@ async def register_with_sampler(
readonly=False,
)
shms_by_period[period] = shm
Sampler.ohlcv_shms.setdefault(period, []).append(shm)
Sampler.ohlcv_shms.setdefault(
period,
[],
).append(shm)
assert Sampler.ohlcv_shms
# unblock caller
await ctx.started(set(Sampler.ohlcv_shms.keys()))
await ctx.started(
set(Sampler.ohlcv_shms.keys())
)
if open_index_stream:
try:
@ -427,7 +437,7 @@ async def register_with_sampler(
async def spawn_samplerd(
loglevel: str | None = None,
loglevel: str|None = None,
**extra_tractor_kwargs
) -> bool:
@ -473,7 +483,7 @@ async def spawn_samplerd(
@acm
async def maybe_open_samplerd(
loglevel: str | None = None,
loglevel: str|None = None,
**pikerd_kwargs,
) -> tractor.Portal: # noqa
@ -498,11 +508,11 @@ async def maybe_open_samplerd(
@acm
async def open_sample_stream(
period_s: float,
shms_by_period: dict[float, dict] | None = None,
shms_by_period: dict[float, dict]|None = None,
open_index_stream: bool = True,
sub_for_broadcasts: bool = True,
cache_key: str | None = None,
cache_key: str|None = None,
allow_new_sampler: bool = True,
ensure_is_active: bool = False,
@ -533,6 +543,8 @@ async def open_sample_stream(
# yield bistream
# else:
ctx: tractor.Context
shm_periods: set[int] # in `int`-seconds
async with (
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
@ -547,10 +559,10 @@ async def open_sample_stream(
'open_index_stream': open_index_stream,
'sub_for_broadcasts': sub_for_broadcasts,
},
) as (ctx, first)
) as (ctx, shm_periods)
):
if ensure_is_active:
assert len(first) > 1
assert len(shm_periods) > 1
async with (
ctx.open_stream(