From e1f3d7c3f87277d824550c6db6923ecdd6658880 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 4 Feb 2026 20:02:28 -0500 Subject: [PATCH 1/4] Handle `tractor.TransportClosed` as "stream-closed" In both the ems and sampler since on new `tractor` this is the "wrapping" exception raised when the transport layer terminates early but in a psuedo-"graceful" way, expected when a peer actors disconnect. Previously we were crashing in this case since old `tractor` just raised the underlying `trio`-source-exceptions verbatim. Also, - use `Aid.reprol()` in log msgs vs old `.chan.uid` refs (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- piker/clearing/_ems.py | 3 ++- piker/data/_sampling.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 96c01df8..a5939eb7 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -729,6 +729,7 @@ class Router(Struct): except ( trio.ClosedResourceError, trio.BrokenResourceError, + tractor.TransportClosed, ): to_remove.add(client_stream) log.warning( @@ -1699,5 +1700,5 @@ async def _emsd_main( if not client_streams: log.warning( f'Order dialog is not being monitored:\n' - f'{oid} ->\n{client_stream._ctx.chan.uid}' + f'{oid!r} <-> {client_stream.chan.aid.reprol()}\n' ) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index cc32af91..ca4c59a9 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -99,6 +99,7 @@ class Sampler: trio.BrokenResourceError, trio.ClosedResourceError, trio.EndOfChannel, + tractor.TransportClosed, ) # holds all the ``tractor.Context`` remote subscriptions for @@ -741,7 +742,7 @@ async def sample_and_broadcast( log.warning( f'Feed OVERRUN {sub_key}' f'@{bus.brokername} -> \n' - f'feed @ {chan.uid}\n' + f'feed @ {chan.aid.reprol()}\n' f'throttle = {throttle} Hz' ) -- 2.34.1 From 6f390dc88cc23ff715d21c0307ac1161bba525bc Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 4 Feb 2026 20:09:35 -0500 Subject: [PATCH 2/4] Add timeout + shielding to `NoBsWs` reconnect logic Add timeout param to `.reset()` and `.send_msg()` to prevent indefinite blocking on reconnect attempts. Shield reconnect sleeps from cancellation to ensure we avoid any "finally footgun" type scenarios where `trio.Cancelled` masks an underlying exc per, - https://github.com/goodboy/tractor/pull/387 - https://github.com/goodboy/tractor/pull/391 Deats, - add `timeout` param to `.reset()`, return `bool` for success - add `timeout=3` default to `.send_msg()` for reconnect wait - shield `.reset()` call in `.send_msg()` error handler - log warning when reconnect timeout exceeded - shield throttled sleeps in `_reconnect_forever()` error paths (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- piker/data/_web_bs.py | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 7e5eb810..b7a5fb32 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -105,7 +105,10 @@ class NoBsWs: def connected(self) -> bool: return self._connected.is_set() - async def reset(self) -> None: + async def reset( + self, + timeout: float, + ) -> bool: ''' Reset the underlying ws connection by cancelling the bg relay task and waiting for it to signal @@ -114,18 +117,31 @@ class NoBsWs: ''' self._connected = trio.Event() self._cs.cancel() - await self._connected.wait() + with trio.move_on_after(timeout) as cs: + await self._connected.wait() + return True + + assert cs.cancelled_caught + return False async def send_msg( self, data: Any, + timeout: float = 3, ) -> None: while True: try: msg: Any = self._dumps(data) return await self._ws.send_message(msg) except self.recon_errors: - await self.reset() + with trio.CancelScope(shield=True): + reconnected: bool = await self.reset( + timeout=timeout, + ) + if not reconnected: + log.warning( + 'Failed to reconnect after {timeout!r}s ??' + ) async def recv_msg(self) -> Any: msg: Any = await self._rx.receive() @@ -191,7 +207,9 @@ async def _reconnect_forever( f'{src_mod}\n' f'{url} connection bail with:' ) - await trio.sleep(0.5) + with trio.CancelScope(shield=True): + await trio.sleep(0.5) + rent_cs.cancel() # go back to reonnect loop in parent task @@ -291,7 +309,8 @@ async def _reconnect_forever( log.exception( 'Reconnect-attempt failed ??\n' ) - await trio.sleep(0.2) # throttle + with trio.CancelScope(shield=True): + await trio.sleep(0.2) # throttle raise berr #|_ws & nursery block ends -- 2.34.1 From 3d83b61f3f7b982e84a4230dba5777f1407c78f1 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 18 Feb 2026 13:37:47 -0500 Subject: [PATCH 3/4] Wrap `open_autorecon_ws()` body for comms failures Add outer `try/except` around the nursery block in `open_autorecon_ws()` to catch any `NoBsWs.recon_errors` that escape the inner reconnect loop, logging a warning instead of propagating. Also, - correct `NoBsWs.recon_errors` typing to `tuple[Type[Exception]]`. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- piker/data/_web_bs.py | 56 ++++++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index b7a5fb32..fde04e71 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -31,6 +31,7 @@ from typing import ( AsyncContextManager, AsyncGenerator, Iterable, + Type, ) import json @@ -67,7 +68,7 @@ class NoBsWs: ''' # apparently we can QoS for all sorts of reasons..so catch em. - recon_errors = ( + recon_errors: tuple[Type[Exception]] = ( ConnectionClosed, DisconnectionTimeout, ConnectionRejected, @@ -370,32 +371,39 @@ async def open_autorecon_ws( rcv: trio.MemoryReceiveChannel snd, rcv = trio.open_memory_channel(616) - async with ( - tractor.trionics.collapse_eg(), - trio.open_nursery() as tn - ): - nobsws = NoBsWs( - url, - rcv, - msg_recv_timeout=msg_recv_timeout, - ) - await tn.start( - partial( - _reconnect_forever, + try: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn + ): + nobsws = NoBsWs( url, - snd, - nobsws, - fixture=fixture, - reset_after=reset_after, + rcv, + msg_recv_timeout=msg_recv_timeout, ) + await tn.start( + partial( + _reconnect_forever, + url, + snd, + nobsws, + fixture=fixture, + reset_after=reset_after, + ) + ) + await nobsws._connected.wait() + assert nobsws._cs + assert nobsws.connected() + try: + yield nobsws + finally: + tn.cancel_scope.cancel() + + except NoBsWs.recon_errors as con_err: + log.warning( + f'Entire ws-channel disconnect due to,\n' + f'con_err: {con_err!r}\n' ) - await nobsws._connected.wait() - assert nobsws._cs - assert nobsws.connected() - try: - yield nobsws - finally: - tn.cancel_scope.cancel() ''' -- 2.34.1 From 4e24cb1bff296987ce35b0130fac58564cf71dac Mon Sep 17 00:00:00 2001 From: goodboy Date: Fri, 20 Feb 2026 14:52:36 -0500 Subject: [PATCH 4/4] Adjust sampler's "IPC-dropped" log msg styling Refmt the "connection-dropped" error-log in `Sampler`'s broadcast loop to show error type first, then the IPC context details; mks it all easier to grok/less-noisy on console imo. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- piker/data/_sampling.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index ca4c59a9..802ea391 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -292,9 +292,10 @@ class Sampler: except self.bcast_errors as err: log.error( - f'Connection dropped for IPC ctx\n' - f'{stream._ctx}\n\n' - f'Due to {type(err)}' + f'Connection dropped for IPC ctx due to,\n' + f'{type(err)!r}\n' + f'\n' + f'{stream._ctx}' ) borked.add(stream) else: -- 2.34.1