diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 7bb0231d..7e6fef54 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -394,7 +394,8 @@ async def register_with_sampler( finally: if ( sub_for_broadcasts - and subs + and + subs ): try: subs.remove(stream) @@ -796,7 +797,6 @@ async def uniform_rate_send( clear_types = _tick_groups['clears'] while True: - # compute the remaining time to sleep for this throttled cycle left_to_sleep = throttle_period - diff @@ -891,11 +891,23 @@ async def uniform_rate_send( # ``pikerd`` these kinds of errors! trio.ClosedResourceError, trio.BrokenResourceError, + trio.EndOfChannel, ConnectionResetError, - ): - # if the feed consumer goes down then drop - # out of this rate limiter - log.warning(f'{stream} closed') + ) as ipc_err: + match ipc_err: + case trio.EndOfChannel(): + log.info( + f'{stream} terminated by peer,\n' + f'{ipc_err!r}' + ) + case _: + # if the feed consumer goes down then drop + # out of this rate limiter + log.warning( + f'{stream} closed due to,\n' + f'{ipc_err!r}' + ) + await stream.aclose() return