From 7a943f0e1e61b3fd4a917f8fd149ca120da120af Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Feb 2022 06:56:25 -0500 Subject: [PATCH] Always transmit index event even when no shm is registered --- piker/data/_sampling.py | 52 ++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 89228c96..dc247630 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -20,7 +20,6 @@ financial data flows. """ import time -from typing import Dict, List import tractor import trio @@ -35,15 +34,17 @@ log = get_logger(__name__) # TODO: we could stick these in a composed type to avoid # angering the "i hate module scoped variables crowd" (yawn). -_shms: Dict[int, List[ShmArray]] = {} -_start_increment: Dict[str, trio.Event] = {} -_incrementers: Dict[int, trio.CancelScope] = {} -_subscribers: Dict[str, tractor.Context] = {} +_ohlcv_shms: dict[int, list[ShmArray]] = {} +# holds one-task-per-sample-period tasks which are spawned as-needed by +# data feed requests with a given detected time step usually from +# history loading. +_incrementers: dict[int, trio.CancelScope] = {} -def shm_incrementing(shm_token_name: str) -> trio.Event: - global _start_increment - return _start_increment.setdefault(shm_token_name, trio.Event()) +# holds all the ``tractor.Context`` remote subscriptions for +# a particular sample period increment event: all subscribers are +# notified on a step. +_subscribers: dict[int, tractor.Context] = {} async def increment_ohlc_buffer( @@ -72,13 +73,13 @@ async def increment_ohlc_buffer( # to solve this is to make this task aware of the instrument's # tradable hours? - global _incrementers + global _incrementers, _ohlcv_shms, _subscribers # adjust delay to compensate for trio processing time - ad = min(_shms.keys()) - 0.001 + ad = min(_ohlcv_shms.keys()) - 0.001 total_s = 0 # total seconds counted - lowest = min(_shms.keys()) + lowest = min(_ohlcv_shms.keys()) ad = lowest - 0.001 with trio.CancelScope() as cs: @@ -94,8 +95,10 @@ async def increment_ohlc_buffer( total_s += lowest # increment all subscribed shm arrays - # TODO: this in ``numba`` - for delay_s, shms in _shms.items(): + # TODO: + # - this in ``numba`` + # - just lookup shms for this step instead of iterating? + for delay_s, shms in _ohlcv_shms.items(): if total_s % delay_s != 0: continue @@ -120,18 +123,19 @@ async def increment_ohlc_buffer( # write to the buffer shm.push(last) - # broadcast the buffer index step - subs = _subscribers.get(delay_s, ()) + # broadcast the buffer index step to any subscribers for + # a given sample period. + subs = _subscribers.get(delay_s, ()) - for ctx in subs: - try: - await ctx.send_yield({'index': shm._last.value}) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - log.error(f'{ctx.chan.uid} dropped connection') - subs.remove(ctx) + for ctx in subs: + try: + await ctx.send_yield({'index': shm._last.value}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + log.error(f'{ctx.chan.uid} dropped connection') + subs.remove(ctx) @tractor.stream