From bf6de5586523dcea811c3906600435e0cc587108 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 18 Feb 2026 19:36:45 -0500 Subject: [PATCH] Improve tpt-closed msg-fmt/content and CRE case matching Refine tpt-error reporting to include closure attribution (`'locally'` vs `'by peer'`), tighten match conditions and reduce needless newlines in exc reprs. Deats, - factor out `trans_err_msg: str` and `by_whom: str` into a `dict` lookup before the `match:` block to pair specific err msgs to closure attribution strings. - use `by_whom` directly as `CRE` case guard condition (truthy when msg matches known underlying CRE msg content). - conveniently include `by_whom!r` in `TransportClosed` message. - fix `'locally ?'` -> `'locally?'` in send-side `CRE` handler (drop errant space). - add masked `maybe_pause_bp()` calls at both `CRE` sites (from when i was tracing a test harness issue where the UDS socket path wasn't being cleaned up on teardown). - drop trailing `\n` from `body=` args to `TransportClosed`. - reuse `trans_err_msg` for the `BRE`/broken-pipe guard. Also adjust testing, namely `test_ctxep_pauses_n_maybe_ipc_breaks`'s expected patts-set for new msg formats to be raised out of `.ipc._transport`. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tests/devx/test_debugger.py | 5 ++++- tractor/ipc/_transport.py | 28 ++++++++++++++++++---------- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/tests/devx/test_debugger.py b/tests/devx/test_debugger.py index cacab803..d3f9fa5d 100644 --- a/tests/devx/test_debugger.py +++ b/tests/devx/test_debugger.py @@ -1138,7 +1138,10 @@ def test_ctxep_pauses_n_maybe_ipc_breaks( ['peer IPC channel closed abruptly?', 'another task closed this fd', 'Debug lock request was CANCELLED?', - "TransportClosed: 'MsgpackUDSStream' was already closed locally ?",] + "'MsgpackUDSStream' was already closed locally?", + "TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?", + # ?TODO^? match depending on `tpt_proto(s)`? + ] # XXX races on whether these show/hit? # 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!', diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py index 93652a87..97ba3e5a 100644 --- a/tractor/ipc/_transport.py +++ b/tractor/ipc/_transport.py @@ -154,7 +154,6 @@ class MsgTransport(Protocol): # ... - class MsgpackTransport(MsgTransport): # TODO: better naming for this? @@ -278,14 +277,18 @@ class MsgpackTransport(MsgTransport): except trio.ClosedResourceError as cre: closure_err = cre + # await tractor.devx._trace.maybe_pause_bp() + raise TransportClosed( message=( - f'{tpt_name} was already closed locally ?\n' + f'{tpt_name} was already closed locally?' ), src_exc=closure_err, loglevel='error', raise_on_report=( - 'another task closed this fd' in closure_err.args + 'another task closed this fd' + in + closure_err.args ), ) from closure_err @@ -435,6 +438,11 @@ class MsgpackTransport(MsgTransport): trans_err = _re tpt_name: str = f'{type(self).__name__!r}' + trans_err_msg: str = trans_err.args[0] + by_whom: str = { + 'another task closed this fd': 'locally', + 'this socket was already closed': 'by peer', + }.get(trans_err_msg) match trans_err: # XXX, specifc to UDS transport and its, @@ -446,13 +454,13 @@ class MsgpackTransport(MsgTransport): case trio.BrokenResourceError() if ( '[Errno 32] Broken pipe' in - trans_err.args[0] + trans_err_msg ): tpt_closed = TransportClosed.from_src_exc( message=( f'{tpt_name} already closed by peer\n' ), - body=f'{self}\n', + body=f'{self}', src_exc=trans_err, raise_on_report=True, loglevel='transport', @@ -462,19 +470,19 @@ class MsgpackTransport(MsgTransport): # ??TODO??, what case in piker does this and HOW # CAN WE RE-PRODUCE IT?!?!? case trio.ClosedResourceError() if ( - 'this socket was already closed' - in - trans_err.args[0] + by_whom ): tpt_closed = TransportClosed.from_src_exc( message=( - f'{tpt_name} already closed by peer\n' + f'{tpt_name} was already closed {by_whom!r}?\n' ), - body=f'{self}\n', + body=f'{self}', src_exc=trans_err, raise_on_report=True, loglevel='transport', ) + + # await tractor.devx._trace.maybe_pause_bp() raise tpt_closed from trans_err # unless the disconnect condition falls under "a