From 25be7f8d08e008db803ae5f08e36894b62a68473 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 16 Apr 2022 18:33:26 -0400 Subject: [PATCH] Factor subscription broadcasting into a func --- piker/data/_sampling.py | 43 +++++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 4228f809..bdc7b4d0 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -22,7 +22,7 @@ financial data flows. from __future__ import annotations from collections import Counter import time -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional import tractor import trio @@ -90,6 +90,7 @@ async def increment_ohlc_buffer( total_s = 0 # total seconds counted lowest = min(sampler.ohlcv_shms.keys()) + lowest_shm = sampler.ohlcv_shms[lowest][0] ad = lowest - 0.001 with trio.CancelScope() as cs: @@ -133,21 +134,33 @@ async def increment_ohlc_buffer( # write to the buffer shm.push(last) - # broadcast the buffer index step to any subscribers for - # a given sample period. - subs = sampler.subscribers.get(delay_s, ()) + await broadcast(delay_s, shm=lowest_shm) - for stream in subs: - try: - await stream.send({'index': shm._last.value}) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - log.error( - f'{stream._ctx.chan.uid} dropped connection' - ) - subs.remove(stream) + +async def broadcast( + delay_s: int, + shm: Optional[ShmArray] = None, + +) -> None: + # broadcast the buffer index step to any subscribers for + # a given sample period. + subs = sampler.subscribers.get(delay_s, ()) + + if shm is None: + lowest = min(sampler.ohlcv_shms.keys()) + shm = sampler.ohlcv_shms[lowest][0] + + for stream in subs: + try: + await stream.send({'index': shm._last.value}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + log.error( + f'{stream._ctx.chan.uid} dropped connection' + ) + subs.remove(stream) @tractor.context