From 54712827eeacab404f9e8a779c482ad9b9e850d5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 Dec 2021 14:16:13 -0500 Subject: [PATCH] Fix context attr lookup.. --- piker/data/_sampling.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index d16bf529..db3f53b2 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -135,6 +135,7 @@ async def increment_ohlc_buffer( async def iter_ohlc_periods( ctx: tractor.Context, delay_s: int, + ) -> None: """ Subscribe to OHLC sampling "step" events: when the time @@ -270,18 +271,20 @@ async def sample_and_broadcast( trio.ClosedResourceError, trio.EndOfChannel, ): + ctx = getattr(stream, '_ctx', None) + if ctx: + log.warning( + f'{ctx.chan.uid} dropped ' + '`brokerd`-quotes-feed connection' + ) + if tick_throttle: + assert stream.closed() + # XXX: do we need to deregister here # if it's done in the fee bus code? # so far seems like no since this should all - # be single-threaded. - log.warning( - f'{stream._ctx.chan.uid} dropped ' - '`brokerd`-quotes-feed connection' - ) - if tick_throttle: - assert stream.closed() - # await stream.aclose() - + # be single-threaded. Doing it anyway though + # since there seems to be some kinda race.. subs.remove((stream, tick_throttle))