More ems resiliency: discard broken client dialogs
parent
c8d39fbc21
commit
411cc3a85e
|
@ -261,7 +261,15 @@ async def clear_dark_triggers(
|
|||
f'pred for {oid} was already removed!?'
|
||||
)
|
||||
|
||||
await ems_client_order_stream.send(msg)
|
||||
try:
|
||||
await ems_client_order_stream.send(msg)
|
||||
except (
|
||||
trio.ClosedResourceError,
|
||||
):
|
||||
log.warning(
|
||||
f'client {ems_client_order_stream} stream is broke'
|
||||
)
|
||||
break
|
||||
|
||||
else: # condition scan loop complete
|
||||
log.debug(f'execs are {execs}')
|
||||
|
@ -573,8 +581,16 @@ async def translate_and_relay_brokerd_events(
|
|||
|
||||
# fan-out-relay position msgs immediately by
|
||||
# broadcasting updates on all client streams
|
||||
for client_stream in router.clients:
|
||||
await client_stream.send(pos_msg)
|
||||
for client_stream in router.clients.copy():
|
||||
try:
|
||||
await client_stream.send(pos_msg)
|
||||
except(
|
||||
trio.ClosedResourceError,
|
||||
trio.BrokenResourceError,
|
||||
):
|
||||
router.clients.remove(client_stream)
|
||||
log.warning(
|
||||
f'client for {client_stream} was already closed?')
|
||||
|
||||
continue
|
||||
|
||||
|
|
Loading…
Reference in New Issue