diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index c49ff4bf..a5d04f0c 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -261,7 +261,15 @@ async def clear_dark_triggers( f'pred for {oid} was already removed!?' ) - await ems_client_order_stream.send(msg) + try: + await ems_client_order_stream.send(msg) + except ( + trio.ClosedResourceError, + ): + log.warning( + f'client {ems_client_order_stream} stream is broke' + ) + break else: # condition scan loop complete log.debug(f'execs are {execs}') @@ -573,8 +581,16 @@ async def translate_and_relay_brokerd_events( # fan-out-relay position msgs immediately by # broadcasting updates on all client streams - for client_stream in router.clients: - await client_stream.send(pos_msg) + for client_stream in router.clients.copy(): + try: + await client_stream.send(pos_msg) + except( + trio.ClosedResourceError, + trio.BrokenResourceError, + ): + router.clients.remove(client_stream) + log.warning( + f'client for {client_stream} was already closed?') continue