diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 7e5eb810..b7a5fb32 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -105,7 +105,10 @@ class NoBsWs: def connected(self) -> bool: return self._connected.is_set() - async def reset(self) -> None: + async def reset( + self, + timeout: float, + ) -> bool: ''' Reset the underlying ws connection by cancelling the bg relay task and waiting for it to signal @@ -114,18 +117,31 @@ class NoBsWs: ''' self._connected = trio.Event() self._cs.cancel() - await self._connected.wait() + with trio.move_on_after(timeout) as cs: + await self._connected.wait() + return True + + assert cs.cancelled_caught + return False async def send_msg( self, data: Any, + timeout: float = 3, ) -> None: while True: try: msg: Any = self._dumps(data) return await self._ws.send_message(msg) except self.recon_errors: - await self.reset() + with trio.CancelScope(shield=True): + reconnected: bool = await self.reset( + timeout=timeout, + ) + if not reconnected: + log.warning( + 'Failed to reconnect after {timeout!r}s ??' + ) async def recv_msg(self) -> Any: msg: Any = await self._rx.receive() @@ -191,7 +207,9 @@ async def _reconnect_forever( f'{src_mod}\n' f'{url} connection bail with:' ) - await trio.sleep(0.5) + with trio.CancelScope(shield=True): + await trio.sleep(0.5) + rent_cs.cancel() # go back to reonnect loop in parent task @@ -291,7 +309,8 @@ async def _reconnect_forever( log.exception( 'Reconnect-attempt failed ??\n' ) - await trio.sleep(0.2) # throttle + with trio.CancelScope(shield=True): + await trio.sleep(0.2) # throttle raise berr #|_ws & nursery block ends