diff --git a/tests/devx/test_debugger.py b/tests/devx/test_debugger.py index cacab803..d3f9fa5d 100644 --- a/tests/devx/test_debugger.py +++ b/tests/devx/test_debugger.py @@ -1138,7 +1138,10 @@ def test_ctxep_pauses_n_maybe_ipc_breaks( ['peer IPC channel closed abruptly?', 'another task closed this fd', 'Debug lock request was CANCELLED?', - "TransportClosed: 'MsgpackUDSStream' was already closed locally ?",] + "'MsgpackUDSStream' was already closed locally?", + "TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?", + # ?TODO^? match depending on `tpt_proto(s)`? + ] # XXX races on whether these show/hit? # 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!', diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py index 93652a87..97ba3e5a 100644 --- a/tractor/ipc/_transport.py +++ b/tractor/ipc/_transport.py @@ -154,7 +154,6 @@ class MsgTransport(Protocol): # ... - class MsgpackTransport(MsgTransport): # TODO: better naming for this? @@ -278,14 +277,18 @@ class MsgpackTransport(MsgTransport): except trio.ClosedResourceError as cre: closure_err = cre + # await tractor.devx._trace.maybe_pause_bp() + raise TransportClosed( message=( - f'{tpt_name} was already closed locally ?\n' + f'{tpt_name} was already closed locally?' ), src_exc=closure_err, loglevel='error', raise_on_report=( - 'another task closed this fd' in closure_err.args + 'another task closed this fd' + in + closure_err.args ), ) from closure_err @@ -435,6 +438,11 @@ class MsgpackTransport(MsgTransport): trans_err = _re tpt_name: str = f'{type(self).__name__!r}' + trans_err_msg: str = trans_err.args[0] + by_whom: str = { + 'another task closed this fd': 'locally', + 'this socket was already closed': 'by peer', + }.get(trans_err_msg) match trans_err: # XXX, specifc to UDS transport and its, @@ -446,13 +454,13 @@ class MsgpackTransport(MsgTransport): case trio.BrokenResourceError() if ( '[Errno 32] Broken pipe' in - trans_err.args[0] + trans_err_msg ): tpt_closed = TransportClosed.from_src_exc( message=( f'{tpt_name} already closed by peer\n' ), - body=f'{self}\n', + body=f'{self}', src_exc=trans_err, raise_on_report=True, loglevel='transport', @@ -462,19 +470,19 @@ class MsgpackTransport(MsgTransport): # ??TODO??, what case in piker does this and HOW # CAN WE RE-PRODUCE IT?!?!? case trio.ClosedResourceError() if ( - 'this socket was already closed' - in - trans_err.args[0] + by_whom ): tpt_closed = TransportClosed.from_src_exc( message=( - f'{tpt_name} already closed by peer\n' + f'{tpt_name} was already closed {by_whom!r}?\n' ), - body=f'{self}\n', + body=f'{self}', src_exc=trans_err, raise_on_report=True, loglevel='transport', ) + + # await tractor.devx._trace.maybe_pause_bp() raise tpt_closed from trans_err # unless the disconnect condition falls under "a