diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index cc32af91..609218ad 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -80,20 +80,20 @@ class Sampler: This non-instantiated type is meant to be a singleton within a `samplerd` actor-service spawned once by the user wishing to time-step-sample (real-time) quote feeds, see - ``.service.maybe_open_samplerd()`` and the below - ``register_with_sampler()``. + `.service.maybe_open_samplerd()` and the below + `register_with_sampler()`. ''' - service_nursery: None | trio.Nursery = None + service_nursery: None|trio.Nursery = None - # TODO: we could stick these in a composed type to avoid - # angering the "i hate module scoped variables crowd" (yawn). + # TODO: we could stick these in a composed type to avoid angering + # the "i hate module scoped variables crowd" (yawn). ohlcv_shms: dict[float, list[ShmArray]] = {} # holds one-task-per-sample-period tasks which are spawned as-needed by # data feed requests with a given detected time step usually from # history loading. - incr_task_cs: trio.CancelScope | None = None + incr_task_cs: trio.CancelScope|None = None bcast_errors: tuple[Exception] = ( trio.BrokenResourceError, @@ -248,8 +248,8 @@ class Sampler: async def broadcast( self, period_s: float, - time_stamp: float | None = None, - info: dict | None = None, + time_stamp: float|None = None, + info: dict|None = None, ) -> None: ''' @@ -313,7 +313,7 @@ class Sampler: @classmethod async def broadcast_all( self, - info: dict | None = None, + info: dict|None = None, ) -> None: # NOTE: take a copy of subs since removals can happen @@ -330,12 +330,12 @@ class Sampler: async def register_with_sampler( ctx: Context, period_s: float, - shms_by_period: dict[float, dict] | None = None, + shms_by_period: dict[float, dict]|None = None, open_index_stream: bool = True, # open a 2way stream for sample step msgs? sub_for_broadcasts: bool = True, # sampler side to send step updates? -) -> None: +) -> set[int]: get_console_log(tractor.current_actor().loglevel) incr_was_started: bool = False @@ -362,7 +362,12 @@ async def register_with_sampler( # insert the base 1s period (for OHLC style sampling) into # the increment buffer set to update and shift every second. - if shms_by_period is not None: + if ( + shms_by_period is not None + # and + # feed_is_live.is_set() + # ^TODO? pass it in instead? + ): from ._sharedmem import ( attach_shm_array, _Token, @@ -376,12 +381,17 @@ async def register_with_sampler( readonly=False, ) shms_by_period[period] = shm - Sampler.ohlcv_shms.setdefault(period, []).append(shm) + Sampler.ohlcv_shms.setdefault( + period, + [], + ).append(shm) assert Sampler.ohlcv_shms # unblock caller - await ctx.started(set(Sampler.ohlcv_shms.keys())) + await ctx.started( + set(Sampler.ohlcv_shms.keys()) + ) if open_index_stream: try: @@ -427,7 +437,7 @@ async def register_with_sampler( async def spawn_samplerd( - loglevel: str | None = None, + loglevel: str|None = None, **extra_tractor_kwargs ) -> bool: @@ -473,7 +483,7 @@ async def spawn_samplerd( @acm async def maybe_open_samplerd( - loglevel: str | None = None, + loglevel: str|None = None, **pikerd_kwargs, ) -> tractor.Portal: # noqa @@ -498,11 +508,11 @@ async def maybe_open_samplerd( @acm async def open_sample_stream( period_s: float, - shms_by_period: dict[float, dict] | None = None, + shms_by_period: dict[float, dict]|None = None, open_index_stream: bool = True, sub_for_broadcasts: bool = True, - cache_key: str | None = None, + cache_key: str|None = None, allow_new_sampler: bool = True, ensure_is_active: bool = False, @@ -533,6 +543,8 @@ async def open_sample_stream( # yield bistream # else: + ctx: tractor.Context + shm_periods: set[int] # in `int`-seconds async with ( # XXX: this should be singleton on a host, # a lone broker-daemon per provider should be @@ -547,10 +559,10 @@ async def open_sample_stream( 'open_index_stream': open_index_stream, 'sub_for_broadcasts': sub_for_broadcasts, }, - ) as (ctx, first) + ) as (ctx, shm_periods) ): if ensure_is_active: - assert len(first) > 1 + assert len(shm_periods) > 1 async with ( ctx.open_stream(