From c05fc8991aca4348a49509d1d907fd73592142e4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 3 Apr 2021 01:18:51 -0400 Subject: [PATCH] Rework ohlc sampling to launch from .start() Avoid bothering with a trio event and expect the caller to do manual shm registering with the write loop. Provide OHLC sample period indexing through a re-branded pub-sub func ``iter_ohlc_periods()``. --- piker/data/_buffer.py | 122 +++++++++++++++++++++++++++--------------- 1 file changed, 79 insertions(+), 43 deletions(-) diff --git a/piker/data/_buffer.py b/piker/data/_buffer.py index 896b503b..eccf4ca6 100644 --- a/piker/data/_buffer.py +++ b/piker/data/_buffer.py @@ -17,17 +17,25 @@ """ Data buffers for fast shared humpy. """ -from typing import Tuple, Callable, Dict -# import time +from typing import Dict, List import tractor import trio +from trio_typing import TaskStatus from ._sharedmem import ShmArray +from ..log import get_logger -_shms: Dict[int, ShmArray] = {} +log = get_logger(__name__) + + +# TODO: we could stick these in a composed type to avoid +# angering the "i hate module scoped variables crowd" (yawn). +_shms: Dict[int, List[ShmArray]] = {} _start_increment: Dict[str, trio.Event] = {} +_incrementers: Dict[int, trio.CancelScope] = {} +_subscribers: Dict[str, tractor.Context] = {} def shm_incrementing(shm_token_name: str) -> trio.Event: @@ -35,11 +43,9 @@ def shm_incrementing(shm_token_name: str) -> trio.Event: return _start_increment.setdefault(shm_token_name, trio.Event()) -@tractor.msg.pub async def increment_ohlc_buffer( - shm_token: dict, - get_topics: Callable[..., Tuple[str]], - # delay_s: Optional[float] = None, + delay_s: int, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ): """Task which inserts new bars into the provide shared memory array every ``delay_s`` seconds. @@ -54,14 +60,16 @@ async def increment_ohlc_buffer( the underlying buffers will actually be incremented. """ - # wait for brokerd to signal we should start sampling - await shm_incrementing(shm_token['shm_name']).wait() + # # wait for brokerd to signal we should start sampling + # await shm_incrementing(shm_token['shm_name']).wait() # TODO: right now we'll spin printing bars if the last time stamp is # before a large period of no market activity. Likely the best way # to solve this is to make this task aware of the instrument's # tradable hours? + global _incrementers + # adjust delay to compensate for trio processing time ad = min(_shms.keys()) - 0.001 @@ -69,47 +77,75 @@ async def increment_ohlc_buffer( lowest = min(_shms.keys()) ad = lowest - 0.001 - while True: - # TODO: do we want to support dynamically - # adding a "lower" lowest increment period? - await trio.sleep(ad) - total_s += lowest + with trio.CancelScope() as cs: - # increment all subscribed shm arrays - # TODO: this in ``numba`` - for delay_s, shms in _shms.items(): - if total_s % delay_s != 0: - continue + # register this time period step as active + _incrementers[delay_s] = cs + task_status.started(cs) - # TODO: numa this! - for shm in shms: - # TODO: in theory we could make this faster by copying the - # "last" readable value into the underlying larger buffer's - # next value and then incrementing the counter instead of - # using ``.push()``? + while True: + # TODO: do we want to support dynamically + # adding a "lower" lowest increment period? + await trio.sleep(ad) + total_s += lowest - # append new entry to buffer thus "incrementing" the bar - array = shm.array - last = array[-1:][shm._write_fields].copy() - # (index, t, close) = last[0][['index', 'time', 'close']] - (t, close) = last[0][['time', 'close']] + # increment all subscribed shm arrays + # TODO: this in ``numba`` + for delay_s, shms in _shms.items(): + if total_s % delay_s != 0: + continue - # this copies non-std fields (eg. vwap) from the last datum - last[ - ['time', 'volume', 'open', 'high', 'low', 'close'] - ][0] = (t + delay_s, 0, close, close, close, close) + # TODO: numa this! + for shm in shms: + # TODO: in theory we could make this faster by copying the + # "last" readable value into the underlying larger buffer's + # next value and then incrementing the counter instead of + # using ``.push()``? - # write to the buffer - shm.push(last) + # append new entry to buffer thus "incrementing" the bar + array = shm.array + last = array[-1:][shm._write_fields].copy() + # (index, t, close) = last[0][['index', 'time', 'close']] + (t, close) = last[0][['time', 'close']] - # broadcast the buffer index step - yield {'index': shm._last.value} + # this copies non-std fields (eg. vwap) from the last datum + last[ + ['time', 'volume', 'open', 'high', 'low', 'close'] + ][0] = (t + delay_s, 0, close, close, close, close) + + # write to the buffer + shm.push(last) + + # broadcast the buffer index step + # yield {'index': shm._last.value} + for ctx in _subscribers.get(delay_s, ()): + try: + await ctx.send_yield({'index': shm._last.value}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + log.error(f'{ctx.chan.uid} dropped connection') -def subscribe_ohlc_for_increment( - shm: ShmArray, - delay: int, +@tractor.stream +async def iter_ohlc_periods( + ctx: tractor.Context, + delay_s: int, ) -> None: - """Add an OHLC ``ShmArray`` to the increment set. """ - _shms.setdefault(delay, []).append(shm) + Subscribe to OHLC sampling "step" events: when the time + aggregation period increments, this event stream emits an index + event. + + """ + # add our subscription + global _subscribers + subs = _subscribers.setdefault(delay_s, []) + subs.append(ctx) + + try: + # stream and block until cancelled + await trio.sleep_forever() + finally: + subs.remove(ctx)