diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 96c01df8..a5939eb7 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -729,6 +729,7 @@ class Router(Struct): except ( trio.ClosedResourceError, trio.BrokenResourceError, + tractor.TransportClosed, ): to_remove.add(client_stream) log.warning( @@ -1699,5 +1700,5 @@ async def _emsd_main( if not client_streams: log.warning( f'Order dialog is not being monitored:\n' - f'{oid} ->\n{client_stream._ctx.chan.uid}' + f'{oid!r} <-> {client_stream.chan.aid.reprol()}\n' ) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index cc32af91..802ea391 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -99,6 +99,7 @@ class Sampler: trio.BrokenResourceError, trio.ClosedResourceError, trio.EndOfChannel, + tractor.TransportClosed, ) # holds all the ``tractor.Context`` remote subscriptions for @@ -291,9 +292,10 @@ class Sampler: except self.bcast_errors as err: log.error( - f'Connection dropped for IPC ctx\n' - f'{stream._ctx}\n\n' - f'Due to {type(err)}' + f'Connection dropped for IPC ctx due to,\n' + f'{type(err)!r}\n' + f'\n' + f'{stream._ctx}' ) borked.add(stream) else: @@ -741,7 +743,7 @@ async def sample_and_broadcast( log.warning( f'Feed OVERRUN {sub_key}' f'@{bus.brokername} -> \n' - f'feed @ {chan.uid}\n' + f'feed @ {chan.aid.reprol()}\n' f'throttle = {throttle} Hz' ) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 7e5eb810..fde04e71 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -31,6 +31,7 @@ from typing import ( AsyncContextManager, AsyncGenerator, Iterable, + Type, ) import json @@ -67,7 +68,7 @@ class NoBsWs: ''' # apparently we can QoS for all sorts of reasons..so catch em. 
- recon_errors = ( + recon_errors: tuple[Type[Exception], ...] = ( ConnectionClosed, DisconnectionTimeout, ConnectionRejected, @@ -105,7 +106,10 @@ class NoBsWs: def connected(self) -> bool: return self._connected.is_set() - async def reset(self) -> None: + async def reset( + self, + timeout: float, + ) -> bool: ''' Reset the underlying ws connection by cancelling the bg relay task and waiting for it to signal @@ -114,18 +118,31 @@ class NoBsWs: ''' self._connected = trio.Event() self._cs.cancel() - await self._connected.wait() + with trio.move_on_after(timeout) as cs: + await self._connected.wait() + return True + + assert cs.cancelled_caught + return False async def send_msg( self, data: Any, + timeout: float = 3, ) -> None: while True: try: msg: Any = self._dumps(data) return await self._ws.send_message(msg) except self.recon_errors: - await self.reset() + with trio.CancelScope(shield=True): + reconnected: bool = await self.reset( + timeout=timeout, + ) + if not reconnected: + log.warning( + f'Failed to reconnect after {timeout!r}s ??' 
+ ) async def recv_msg(self) -> Any: msg: Any = await self._rx.receive() @@ -191,7 +208,9 @@ async def _reconnect_forever( f'{src_mod}\n' f'{url} connection bail with:' ) - await trio.sleep(0.5) + with trio.CancelScope(shield=True): + await trio.sleep(0.5) + rent_cs.cancel() # go back to reconnect loop in parent task @@ -291,7 +310,8 @@ async def _reconnect_forever( log.exception( 'Reconnect-attempt failed ??\n' ) - await trio.sleep(0.2) # throttle + with trio.CancelScope(shield=True): + await trio.sleep(0.2) # throttle raise berr #|_ws & nursery block ends @@ -351,32 +371,39 @@ async def open_autorecon_ws( rcv: trio.MemoryReceiveChannel snd, rcv = trio.open_memory_channel(616) - async with ( - tractor.trionics.collapse_eg(), - trio.open_nursery() as tn - ): - nobsws = NoBsWs( - url, - rcv, - msg_recv_timeout=msg_recv_timeout, - ) - await tn.start( - partial( - _reconnect_forever, + try: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn + ): + nobsws = NoBsWs( url, - snd, - nobsws, - fixture=fixture, - reset_after=reset_after, + rcv, + msg_recv_timeout=msg_recv_timeout, ) + await tn.start( + partial( + _reconnect_forever, + url, + snd, + nobsws, + fixture=fixture, + reset_after=reset_after, + ) + ) + await nobsws._connected.wait() + assert nobsws._cs + assert nobsws.connected() + try: + yield nobsws + finally: + tn.cancel_scope.cancel() + + except NoBsWs.recon_errors as con_err: + log.warning( + f'Entire ws-channel disconnect due to,\n' + f'con_err: {con_err!r}\n' + ) - await nobsws._connected.wait() - assert nobsws._cs - assert nobsws.connected() - try: - yield nobsws - finally: - tn.cancel_scope.cancel() '''