diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index a1d615f5..793417e9 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -248,12 +248,15 @@ async def sample_and_broadcast( # so far seems like no since this should all # be single-threaded. log.error(f'{stream._ctx.chan.uid} dropped connection') + subs.remove((stream, tick_throttle)) async def uniform_rate_send( + rate: float, quote_stream: trio.abc.ReceiveChannel, stream: tractor.MsgStream, + ) -> None: sleep_period = 1/rate - 0.000616 @@ -289,8 +292,14 @@ async def uniform_rate_send( # TODO: now if only we could sync this to the display # rate timing exactly lul - await stream.send({first_quote['symbol']: first_quote}) - break + try: + await stream.send({first_quote['symbol']: first_quote}) + break + except trio.ClosedResourceError: + # if the feed consumer goes down then drop + # out of this rate limiter + log.warning(f'{stream} closed') + return end = time.time() diff = end - start