diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index d16bf529..db3f53b2 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -135,6 +135,7 @@ async def increment_ohlc_buffer( async def iter_ohlc_periods( ctx: tractor.Context, delay_s: int, + ) -> None: """ Subscribe to OHLC sampling "step" events: when the time @@ -270,18 +271,20 @@ async def sample_and_broadcast( trio.ClosedResourceError, trio.EndOfChannel, ): + ctx = getattr(stream, '_ctx', None) + if ctx: + log.warning( + f'{ctx.chan.uid} dropped ' + '`brokerd`-quotes-feed connection' + ) + if tick_throttle: + assert stream.closed() + # XXX: do we need to deregister here # if it's done in the fee bus code? # so far seems like no since this should all - # be single-threaded. - log.warning( - f'{stream._ctx.chan.uid} dropped ' - '`brokerd`-quotes-feed connection' - ) - if tick_throttle: - assert stream.closed() - # await stream.aclose() - + # be single-threaded. Doing it anyway though + # since there seems to be some kinda race.. subs.remove((stream, tick_throttle))