Improve tpt-closed msg-fmt/content and CRE case matching
Refine tpt-error reporting to include closure attribution (`'locally'` vs `'by peer'`), tighten match conditions and reduce needless newlines in exc reprs. Deats, - factor out `trans_err_msg: str` and `by_whom: str` into a `dict` lookup before the `match:` block to pair specific err msgs to closure attribution strings. - use `by_whom` directly as `CRE` case guard condition (truthy when msg matches known underlying CRE msg content). - conveniently include `by_whom!r` in `TransportClosed` message. - fix `'locally ?'` -> `'locally?'` in send-side `CRE` handler (drop errant space). - add masked `maybe_pause_bp()` calls at both `CRE` sites (from when i was tracing a test harness issue where the UDS socket path wasn't being cleaned up on teardown). - drop trailing `\n` from `body=` args to `TransportClosed`. - reuse `trans_err_msg` for the `BRE`/broken-pipe guard. Also adjust testing, namely `test_ctxep_pauses_n_maybe_ipc_breaks`'s expected patts-set for new msg formats to be raised out of `.ipc._transport`. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-codetpt_tolerance
parent
5ded99a886
commit
bf6de55865
|
|
@ -1138,7 +1138,10 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
|
||||||
['peer IPC channel closed abruptly?',
|
['peer IPC channel closed abruptly?',
|
||||||
'another task closed this fd',
|
'another task closed this fd',
|
||||||
'Debug lock request was CANCELLED?',
|
'Debug lock request was CANCELLED?',
|
||||||
"TransportClosed: 'MsgpackUDSStream' was already closed locally ?",]
|
"'MsgpackUDSStream' was already closed locally?",
|
||||||
|
"TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?",
|
||||||
|
# ?TODO^? match depending on `tpt_proto(s)`?
|
||||||
|
]
|
||||||
|
|
||||||
# XXX races on whether these show/hit?
|
# XXX races on whether these show/hit?
|
||||||
# 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!',
|
# 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!',
|
||||||
|
|
|
||||||
|
|
@ -154,7 +154,6 @@ class MsgTransport(Protocol):
|
||||||
# ...
|
# ...
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class MsgpackTransport(MsgTransport):
|
class MsgpackTransport(MsgTransport):
|
||||||
|
|
||||||
# TODO: better naming for this?
|
# TODO: better naming for this?
|
||||||
|
|
@ -278,14 +277,18 @@ class MsgpackTransport(MsgTransport):
|
||||||
except trio.ClosedResourceError as cre:
|
except trio.ClosedResourceError as cre:
|
||||||
closure_err = cre
|
closure_err = cre
|
||||||
|
|
||||||
|
# await tractor.devx._trace.maybe_pause_bp()
|
||||||
|
|
||||||
raise TransportClosed(
|
raise TransportClosed(
|
||||||
message=(
|
message=(
|
||||||
f'{tpt_name} was already closed locally ?\n'
|
f'{tpt_name} was already closed locally?'
|
||||||
),
|
),
|
||||||
src_exc=closure_err,
|
src_exc=closure_err,
|
||||||
loglevel='error',
|
loglevel='error',
|
||||||
raise_on_report=(
|
raise_on_report=(
|
||||||
'another task closed this fd' in closure_err.args
|
'another task closed this fd'
|
||||||
|
in
|
||||||
|
closure_err.args
|
||||||
),
|
),
|
||||||
) from closure_err
|
) from closure_err
|
||||||
|
|
||||||
|
|
@ -435,6 +438,11 @@ class MsgpackTransport(MsgTransport):
|
||||||
trans_err = _re
|
trans_err = _re
|
||||||
tpt_name: str = f'{type(self).__name__!r}'
|
tpt_name: str = f'{type(self).__name__!r}'
|
||||||
|
|
||||||
|
trans_err_msg: str = trans_err.args[0]
|
||||||
|
by_whom: str = {
|
||||||
|
'another task closed this fd': 'locally',
|
||||||
|
'this socket was already closed': 'by peer',
|
||||||
|
}.get(trans_err_msg)
|
||||||
match trans_err:
|
match trans_err:
|
||||||
|
|
||||||
# XXX, specifc to UDS transport and its,
|
# XXX, specifc to UDS transport and its,
|
||||||
|
|
@ -446,13 +454,13 @@ class MsgpackTransport(MsgTransport):
|
||||||
case trio.BrokenResourceError() if (
|
case trio.BrokenResourceError() if (
|
||||||
'[Errno 32] Broken pipe'
|
'[Errno 32] Broken pipe'
|
||||||
in
|
in
|
||||||
trans_err.args[0]
|
trans_err_msg
|
||||||
):
|
):
|
||||||
tpt_closed = TransportClosed.from_src_exc(
|
tpt_closed = TransportClosed.from_src_exc(
|
||||||
message=(
|
message=(
|
||||||
f'{tpt_name} already closed by peer\n'
|
f'{tpt_name} already closed by peer\n'
|
||||||
),
|
),
|
||||||
body=f'{self}\n',
|
body=f'{self}',
|
||||||
src_exc=trans_err,
|
src_exc=trans_err,
|
||||||
raise_on_report=True,
|
raise_on_report=True,
|
||||||
loglevel='transport',
|
loglevel='transport',
|
||||||
|
|
@ -462,19 +470,19 @@ class MsgpackTransport(MsgTransport):
|
||||||
# ??TODO??, what case in piker does this and HOW
|
# ??TODO??, what case in piker does this and HOW
|
||||||
# CAN WE RE-PRODUCE IT?!?!?
|
# CAN WE RE-PRODUCE IT?!?!?
|
||||||
case trio.ClosedResourceError() if (
|
case trio.ClosedResourceError() if (
|
||||||
'this socket was already closed'
|
by_whom
|
||||||
in
|
|
||||||
trans_err.args[0]
|
|
||||||
):
|
):
|
||||||
tpt_closed = TransportClosed.from_src_exc(
|
tpt_closed = TransportClosed.from_src_exc(
|
||||||
message=(
|
message=(
|
||||||
f'{tpt_name} already closed by peer\n'
|
f'{tpt_name} was already closed {by_whom!r}?\n'
|
||||||
),
|
),
|
||||||
body=f'{self}\n',
|
body=f'{self}',
|
||||||
src_exc=trans_err,
|
src_exc=trans_err,
|
||||||
raise_on_report=True,
|
raise_on_report=True,
|
||||||
loglevel='transport',
|
loglevel='transport',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# await tractor.devx._trace.maybe_pause_bp()
|
||||||
raise tpt_closed from trans_err
|
raise tpt_closed from trans_err
|
||||||
|
|
||||||
# unless the disconnect condition falls under "a
|
# unless the disconnect condition falls under "a
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue