More ems resiliency: discard broken client dialogs
							parent
							
								
									129ec9fc19
								
							
						
					
					
						commit
						34635c21a9
					
				| 
						 | 
					@ -262,7 +262,15 @@ async def clear_dark_triggers(
 | 
				
			||||||
                            f'pred for {oid} was already removed!?'
 | 
					                            f'pred for {oid} was already removed!?'
 | 
				
			||||||
                        )
 | 
					                        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    try:
 | 
				
			||||||
                        await ems_client_order_stream.send(msg)
 | 
					                        await ems_client_order_stream.send(msg)
 | 
				
			||||||
 | 
					                    except (
 | 
				
			||||||
 | 
					                        trio.ClosedResourceError,
 | 
				
			||||||
 | 
					                    ):
 | 
				
			||||||
 | 
					                        log.warning(
 | 
				
			||||||
 | 
					                            f'client {ems_client_order_stream} stream is broke'
 | 
				
			||||||
 | 
					                        )
 | 
				
			||||||
 | 
					                        break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                else:  # condition scan loop complete
 | 
					                else:  # condition scan loop complete
 | 
				
			||||||
                    log.debug(f'execs are {execs}')
 | 
					                    log.debug(f'execs are {execs}')
 | 
				
			||||||
| 
						 | 
					@ -572,8 +580,16 @@ async def translate_and_relay_brokerd_events(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # fan-out-relay position msgs immediately by
 | 
					            # fan-out-relay position msgs immediately by
 | 
				
			||||||
            # broadcasting updates on all client streams
 | 
					            # broadcasting updates on all client streams
 | 
				
			||||||
            for client_stream in router.clients:
 | 
					            for client_stream in router.clients.copy():
 | 
				
			||||||
 | 
					                try:
 | 
				
			||||||
                    await client_stream.send(pos_msg)
 | 
					                    await client_stream.send(pos_msg)
 | 
				
			||||||
 | 
					                except(
 | 
				
			||||||
 | 
					                    trio.ClosedResourceError,
 | 
				
			||||||
 | 
					                    trio.BrokenResourceError,
 | 
				
			||||||
 | 
					                ):
 | 
				
			||||||
 | 
					                    router.clients.remove(client_stream)
 | 
				
			||||||
 | 
					                    log.warning(
 | 
				
			||||||
 | 
					                        f'client for {client_stream} was already closed?')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            continue
 | 
					            continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue