From ef3309adf95bc9a9f2ac78e60e8088ea015d45b8 Mon Sep 17 00:00:00 2001
From: goodboy
Date: Wed, 4 Feb 2026 20:09:35 -0500
Subject: [PATCH] Add timeout + shielding to `NoBsWs` reconnect logic

Add timeout param to `.reset()` and `.send_msg()` to prevent indefinite
blocking on reconnect attempts. Shield reconnect sleeps from
cancellation to ensure we avoid any "finally footgun" type scenarios
where `trio.Cancelled` masks an underlying exc per,

- https://github.com/goodboy/tractor/pull/387
- https://github.com/goodboy/tractor/pull/391

Deats,
- add `timeout` param to `.reset()`, return `bool` for success
- add `timeout=3` default to `.send_msg()` for reconnect wait
- shield `.reset()` call in `.send_msg()` error handler
- log warning when reconnect timeout exceeded
- shield throttled sleeps in `_reconnect_forever()` error paths

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
---
 piker/data/_web_bs.py | 29 ++++++++++++++++++++++++-----
 1 file changed, 24 insertions(+), 5 deletions(-)

diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py
index 7e5eb810..b7a5fb32 100644
--- a/piker/data/_web_bs.py
+++ b/piker/data/_web_bs.py
@@ -105,7 +105,10 @@ class NoBsWs:
     def connected(self) -> bool:
         return self._connected.is_set()
 
-    async def reset(self) -> None:
+    async def reset(
+        self,
+        timeout: float,
+    ) -> bool:
         '''
         Reset the underlying ws connection by cancelling
         the bg relay task and waiting for it to signal
@@ -114,18 +117,31 @@ class NoBsWs:
         '''
         self._connected = trio.Event()
         self._cs.cancel()
-        await self._connected.wait()
+        with trio.move_on_after(timeout) as cs:
+            await self._connected.wait()
+            return True
+
+        assert cs.cancelled_caught
+        return False
 
     async def send_msg(
         self,
         data: Any,
+        timeout: float = 3,
     ) -> None:
         while True:
             try:
                 msg: Any = self._dumps(data)
                 return await self._ws.send_message(msg)
             except self.recon_errors:
-                await self.reset()
+                with trio.CancelScope(shield=True):
+                    reconnected: bool = await self.reset(
+                        timeout=timeout,
+                    )
+                if not reconnected:
+                    log.warning(
+                        f'Failed to reconnect after {timeout!r}s ??'
+                    )
 
     async def recv_msg(self) -> Any:
         msg: Any = await self._rx.receive()
@@ -191,7 +207,9 @@ async def _reconnect_forever(
                         f'{src_mod}\n'
                         f'{url} connection bail with:'
                     )
-                    await trio.sleep(0.5)
+                    with trio.CancelScope(shield=True):
+                        await trio.sleep(0.5)
+
                     rent_cs.cancel()
 
             # go back to reonnect loop in parent task
@@ -291,7 +309,8 @@ async def _reconnect_forever(
                 log.exception(
                     'Reconnect-attempt failed ??\n'
                 )
-                await trio.sleep(0.2)  # throttle
+                with trio.CancelScope(shield=True):
+                    await trio.sleep(0.2)  # throttle
                 raise berr
 
         #|_ws & nursery block ends