From f78e842fba874f20652552f61ef273be726ab99e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Aug 2025 12:58:31 -0400 Subject: [PATCH 01/18] More `TransportClosed`-handling around IPC-IO For IPC-disconnects-during-teardown edge cases, augment some `._rpc` machinery, - in `._invoke()` around the `await chan.send(return_msg)` where we suppress if the underlying `Channel` already disconnected. - add a disjoint handler in `_errors_relayed_via_ipc()` which just reports-n-reraises the exc (same as prior behaviour). * originally i thought it needed to be handled specially (to avoid being crash handled) but turns out that isn't necessary? * hence the also-added-bu-masked-out `debug_filter` / guard expression around the `await debug._maybe_enter_pm()` line. - show the `._invoke()` frame for the moment. --- tractor/_rpc.py | 44 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 68ce56ea..c33c6e12 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -284,6 +284,10 @@ async def _errors_relayed_via_ipc( try: yield # run RPC invoke body + except TransportClosed: + log.exception('Tpt disconnect during remote-exc relay?') + raise + # box and ship RPC errors for wire-transit via # the task's requesting parent IPC-channel. except ( @@ -319,6 +323,9 @@ async def _errors_relayed_via_ipc( and debug_kbis ) ) + # TODO? better then `debug_filter` below? + # and + # not isinstance(err, TransportClosed) ): # XXX QUESTION XXX: is there any case where we'll # want to debug IPC disconnects as a default? @@ -327,13 +334,25 @@ async def _errors_relayed_via_ipc( # recovery logic - the only case is some kind of # strange bug in our transport layer itself? Going # to keep this open ended for now. - log.debug( - 'RPC task crashed, attempting to enter debugger\n' - f'|_{ctx}' - ) + + if _state.debug_mode(): + log.exception( + f'RPC task crashed!\n' + f'Attempting to enter debugger\n' + f'\n' + f'{ctx}' + ) + entered_debug = await debug._maybe_enter_pm( err, api_frame=inspect.currentframe(), + + # don't REPL any psuedo-expected tpt-disconnect + # debug_filter=lambda exc: ( + # type (exc) not in { + # TransportClosed, + # } + # ), ) if not entered_debug: # if we prolly should have entered the REPL but @@ -450,7 +469,7 @@ async def _invoke( kwargs: dict[str, Any], is_rpc: bool = True, - hide_tb: bool = True, + hide_tb: bool = False, return_msg_type: Return|CancelAck = Return, task_status: TaskStatus[ @@ -674,7 +693,20 @@ async def _invoke( f'\n' f'{pretty_struct.pformat(return_msg)}\n' ) - await chan.send(return_msg) + try: + await chan.send(return_msg) + except TransportClosed: + log.exception( + f"Failed send final result to 'parent'-side of IPC-ctx!\n" + f'\n' + f'{chan}\n' + f'Channel already disconnected ??\n' + f'\n' + f'{pretty_struct.pformat(return_msg)}' + ) + # ?TODO? will this ever be true though? + if chan.connected(): + raise # NOTE: this happens IFF `ctx._scope.cancel()` is # called by any of, From c3f455a8ecf735c6205b40551a6ae54bb8b94856 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Aug 2025 14:05:46 -0400 Subject: [PATCH 02/18] Mask tpt-closed handling of `chan.send(return_msg)` A partial revert of commit c05d08e42610 since it seem we already suppress tpt-closed errors lower down in `.ipc.Channel.send()`; given that i'm pretty sure this new handler code should basically never run? Left in a todo to remove the masked content once i'm done more thoroughly testing under `piker`. --- tractor/_rpc.py | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/tractor/_rpc.py b/tractor/_rpc.py index c33c6e12..801ab672 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -324,8 +324,8 @@ async def _errors_relayed_via_ipc( ) ) # TODO? better then `debug_filter` below? - # and - # not isinstance(err, TransportClosed) + and + not isinstance(err, TransportClosed) ): # XXX QUESTION XXX: is there any case where we'll # want to debug IPC disconnects as a default? @@ -693,20 +693,23 @@ async def _invoke( f'\n' f'{pretty_struct.pformat(return_msg)}\n' ) - try: - await chan.send(return_msg) - except TransportClosed: - log.exception( - f"Failed send final result to 'parent'-side of IPC-ctx!\n" - f'\n' - f'{chan}\n' - f'Channel already disconnected ??\n' - f'\n' - f'{pretty_struct.pformat(return_msg)}' - ) - # ?TODO? will this ever be true though? - if chan.connected(): - raise + await chan.send(return_msg) + # ?TODO, remove the below since .send() already + # doesn't raise on tpt-closed? + # try: + # await chan.send(return_msg) + # except TransportClosed: + # log.exception( + # f"Failed send final result to 'parent'-side of IPC-ctx!\n" + # f'\n' + # f'{chan}\n' + # f'Channel already disconnected ??\n' + # f'\n' + # f'{pretty_struct.pformat(return_msg)}' + # ) + # # ?TODO? will this ever be true though? + # if chan.connected(): + # raise # NOTE: this happens IFF `ctx._scope.cancel()` is # called by any of, From f8e25688c70f482c9a549b4231c83bb515b91f8f Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 11 Feb 2026 13:49:52 -0500 Subject: [PATCH 03/18] Unmask `ClosedResourceError` handling in `._transport` Unmask the CRE case block for peer-closed socket errors which already had a TODO about reproducing the condition. It appears this case can happen during inter-actor comms teardowns in `piker`, but i haven't been able to figure out exactly what reproduces it yet.. So activate the block again for that 'socket already closed'-msg case, and add a TODO questioning how to reproduce it. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tractor/ipc/_transport.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py index 704c13a6..93652a87 100644 --- a/tractor/ipc/_transport.py +++ b/tractor/ipc/_transport.py @@ -459,21 +459,23 @@ class MsgpackTransport(MsgTransport): ) raise tpt_closed from trans_err - # case trio.ClosedResourceError() if ( - # 'this socket was already closed' - # in - # trans_err.args[0] - # ): - # tpt_closed = TransportClosed.from_src_exc( - # message=( - # f'{tpt_name} already closed by peer\n' - # ), - # body=f'{self}\n', - # src_exc=trans_err, - # raise_on_report=True, - # loglevel='transport', - # ) - # raise tpt_closed from trans_err + # ??TODO??, what case in piker does this and HOW + # CAN WE RE-PRODUCE IT?!?!? + case trio.ClosedResourceError() if ( + 'this socket was already closed' + in + trans_err.args[0] + ): + tpt_closed = TransportClosed.from_src_exc( + message=( + f'{tpt_name} already closed by peer\n' + ), + body=f'{self}\n', + src_exc=trans_err, + raise_on_report=True, + loglevel='transport', + ) + raise tpt_closed from trans_err # unless the disconnect condition falls under "a # normal operation breakage" we usualy console warn From 7145fa364f67fceefa51a65f230187772355f72c Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 18 Feb 2026 16:32:40 -0500 Subject: [PATCH 04/18] Add `SIGINT` cleanup to `spawn` fixture in `devx/conftest` Convert `spawn` fixture to a generator and add post-test graceful subproc cleanup via `SIGINT`/`SIGKILL` to avoid leaving stale `pexpect` child procs around between test runs as well as any UDS-tpt socket files under the system runtime-dir. Deats, - convert `return _spawn` -> `yield _spawn` to enable post-yield teardown logic. - add a new `nonlocal spawned` ref so teardown logic can access the last spawned child from outside the delivered spawner fn-closure. - add `SIGINT`-loop after yield with 5s timeout, then `SIGKILL` if proc still alive. - add masked `breakpoint()` and TODO about UDS path cleanup (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tests/devx/conftest.py | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/tests/devx/conftest.py b/tests/devx/conftest.py index 11ec9ed2..a516eecd 100644 --- a/tests/devx/conftest.py +++ b/tests/devx/conftest.py @@ -4,6 +4,7 @@ ''' from __future__ import annotations import time +import signal from typing import ( Callable, TYPE_CHECKING, @@ -69,12 +70,15 @@ def spawn( import os os.environ['PYTHON_COLORS'] = '0' + spawned: PexpectSpawner|None = None + def _spawn( cmd: str, **mkcmd_kwargs, ) -> pty_spawn.spawn: + nonlocal spawned unset_colors() - return testdir.spawn( + spawned = testdir.spawn( cmd=mk_cmd( cmd, **mkcmd_kwargs, @@ -84,9 +88,32 @@ def spawn( # ^TODO? get `pytest` core to expose underlying # `pexpect.spawn()` stuff? ) + return spawned # such that test-dep can pass input script name. - return _spawn # the `PexpectSpawner`, type alias. + yield _spawn # the `PexpectSpawner`, type alias. + + if ( + spawned + and + (ptyproc := spawned.ptyproc) + ): + start: float = time.time() + timeout: float = 5 + while ( + spawned + and + spawned.isalive() + and + (_time_took := (time.time() - start) < timeout) + ): + ptyproc.kill(signal.SIGINT) + time.sleep(0.01) + else: + ptyproc.kill(signal.SIGKILL) + + # TODO? ensure we've cleaned up any UDS-paths? + # breakpoint() @pytest.fixture( From 5ded99a886681272d7298b75f898cc141a27c9c0 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 18 Feb 2026 19:27:49 -0500 Subject: [PATCH 05/18] Add a `._trace.maybe_pause_bp()` for tpt-broken cases Internal helper which falls back to sync `pdb` when the child actor can't reach root to acquire the TTY lock. Useful when debugging tpt layer failures (intentional or otherwise) where a sub-actor can no longer IPC-contact the root to coordinate REPL access; root uses `.pause()` as normal while non-root falls back to `mk_pdb().set_trace()`. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tractor/devx/debug/_trace.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tractor/devx/debug/_trace.py b/tractor/devx/debug/_trace.py index c1219c30..84608671 100644 --- a/tractor/devx/debug/_trace.py +++ b/tractor/devx/debug/_trace.py @@ -1257,3 +1257,26 @@ async def breakpoint( api_frame=inspect.currentframe(), **kwargs, ) + + +async def maybe_pause_bp(): + ''' + Internal (ONLY for now) `breakpoint()`-er fn which only tries to + use the multi-actor `.pause()` API when the current actor is the + root. + + ?! BUT WHY !? + ------- + + This is useful when debugging cases where the tpt layer breaks + (or is intentionally broken, say during resiliency testing) in + the case where a child can no longer contact the root process to + acquire the process-tree-singleton TTY lock. + + ''' + import tractor + actor = tractor.current_actor() + if actor.aid.name == 'root': + await tractor.pause(shield=True) + else: + tractor.devx.mk_pdb().set_trace() From bf6de5586523dcea811c3906600435e0cc587108 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 18 Feb 2026 19:36:45 -0500 Subject: [PATCH 06/18] Improve tpt-closed msg-fmt/content and CRE case matching Refine tpt-error reporting to include closure attribution (`'locally'` vs `'by peer'`), tighten match conditions and reduce needless newlines in exc reprs. Deats, - factor out `trans_err_msg: str` and `by_whom: str` into a `dict` lookup before the `match:` block to pair specific err msgs to closure attribution strings. - use `by_whom` directly as `CRE` case guard condition (truthy when msg matches known underlying CRE msg content). - conveniently include `by_whom!r` in `TransportClosed` message. - fix `'locally ?'` -> `'locally?'` in send-side `CRE` handler (drop errant space). - add masked `maybe_pause_bp()` calls at both `CRE` sites (from when i was tracing a test harness issue where the UDS socket path wasn't being cleaned up on teardown). - drop trailing `\n` from `body=` args to `TransportClosed`. - reuse `trans_err_msg` for the `BRE`/broken-pipe guard. Also adjust testing, namely `test_ctxep_pauses_n_maybe_ipc_breaks`'s expected patts-set for new msg formats to be raised out of `.ipc._transport`. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tests/devx/test_debugger.py | 5 ++++- tractor/ipc/_transport.py | 28 ++++++++++++++++++---------- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/tests/devx/test_debugger.py b/tests/devx/test_debugger.py index cacab803..d3f9fa5d 100644 --- a/tests/devx/test_debugger.py +++ b/tests/devx/test_debugger.py @@ -1138,7 +1138,10 @@ def test_ctxep_pauses_n_maybe_ipc_breaks( ['peer IPC channel closed abruptly?', 'another task closed this fd', 'Debug lock request was CANCELLED?', - "TransportClosed: 'MsgpackUDSStream' was already closed locally ?",] + "'MsgpackUDSStream' was already closed locally?", + "TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?", + # ?TODO^? match depending on `tpt_proto(s)`? + ] # XXX races on whether these show/hit? # 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!', diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py index 93652a87..97ba3e5a 100644 --- a/tractor/ipc/_transport.py +++ b/tractor/ipc/_transport.py @@ -154,7 +154,6 @@ class MsgTransport(Protocol): # ... - class MsgpackTransport(MsgTransport): # TODO: better naming for this? @@ -278,14 +277,18 @@ class MsgpackTransport(MsgTransport): except trio.ClosedResourceError as cre: closure_err = cre + # await tractor.devx._trace.maybe_pause_bp() + raise TransportClosed( message=( - f'{tpt_name} was already closed locally ?\n' + f'{tpt_name} was already closed locally?' ), src_exc=closure_err, loglevel='error', raise_on_report=( - 'another task closed this fd' in closure_err.args + 'another task closed this fd' + in + closure_err.args ), ) from closure_err @@ -435,6 +438,11 @@ class MsgpackTransport(MsgTransport): trans_err = _re tpt_name: str = f'{type(self).__name__!r}' + trans_err_msg: str = trans_err.args[0] + by_whom: str = { + 'another task closed this fd': 'locally', + 'this socket was already closed': 'by peer', + }.get(trans_err_msg) match trans_err: # XXX, specifc to UDS transport and its, @@ -446,13 +454,13 @@ class MsgpackTransport(MsgTransport): case trio.BrokenResourceError() if ( '[Errno 32] Broken pipe' in - trans_err.args[0] + trans_err_msg ): tpt_closed = TransportClosed.from_src_exc( message=( f'{tpt_name} already closed by peer\n' ), - body=f'{self}\n', + body=f'{self}', src_exc=trans_err, raise_on_report=True, loglevel='transport', @@ -462,19 +470,19 @@ class MsgpackTransport(MsgTransport): # ??TODO??, what case in piker does this and HOW # CAN WE RE-PRODUCE IT?!?!? case trio.ClosedResourceError() if ( - 'this socket was already closed' - in - trans_err.args[0] + by_whom ): tpt_closed = TransportClosed.from_src_exc( message=( - f'{tpt_name} already closed by peer\n' + f'{tpt_name} was already closed {by_whom!r}?\n' ), - body=f'{self}\n', + body=f'{self}', src_exc=trans_err, raise_on_report=True, loglevel='transport', ) + + # await tractor.devx._trace.maybe_pause_bp() raise tpt_closed from trans_err # unless the disconnect condition falls under "a From 50f40f427b7f3d0047c808385074d45cf5b2346e Mon Sep 17 00:00:00 2001 From: goodboy Date: Thu, 19 Feb 2026 12:02:35 -0500 Subject: [PATCH 07/18] Include `TransportClosed` in tpt-layer err handling Add `TransportClosed` to except clauses where `trio`'s own resource-closed errors are already caught, ensuring our higher-level tpt exc is also tolerated in those same spots. Likely i will follow up with a removal of the `trio` variants since most *should be* caught and re-raised as tpt-closed out of the `.ipc` stack now? Add `TransportClosed` to various handler blocks, - `._streaming.MsgStream.aclose()/.send()` except blocks. - the broken-channel except in `._context.open_context_from_portal()`. - obvi import it where necessary in those ^ mods. Adjust `test_advanced_faults` suite + exs-script to match, - update `ipc_failure_during_stream.py` example to catch `TransportClosed` alongside `trio.ClosedResourceError` in both the break and send-check paths. - shield the `trio.sleep(0.01)` after tpt close in example to avoid taskc-raise/masking on that checkpoint since we want to simulate waiting for a user to send a KBI. - loosen `ExceptionGroup` assertion to `len(excs) <= 2` and ensure all excs are `TransportClosed`. - improve multi-line formatting, minor style/formatting fixes in condition expressions. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- .../ipc_failure_during_stream.py | 10 ++++++++-- tests/test_advanced_faults.py | 18 +++++++++++++----- tractor/_context.py | 2 ++ tractor/_streaming.py | 5 ++++- 4 files changed, 27 insertions(+), 8 deletions(-) diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py index c88e0dfe..4242a543 100644 --- a/examples/advanced_faults/ipc_failure_during_stream.py +++ b/examples/advanced_faults/ipc_failure_during_stream.py @@ -17,6 +17,7 @@ from tractor import ( MsgStream, _testing, trionics, + TransportClosed, ) import trio import pytest @@ -208,12 +209,16 @@ async def main( # TODO: is this needed or no? raise - except trio.ClosedResourceError: + except ( + trio.ClosedResourceError, + TransportClosed, + ) as _tpt_err: # NOTE: don't send if we already broke the # connection to avoid raising a closed-error # such that we drop through to the ctl-c # mashing by user. - await trio.sleep(0.01) + with trio.CancelScope(shield=True): + await trio.sleep(0.01) # timeout: int = 1 # with trio.move_on_after(timeout) as cs: @@ -247,6 +252,7 @@ async def main( await stream.send(i) pytest.fail('stream not closed?') except ( + TransportClosed, trio.ClosedResourceError, trio.EndOfChannel, ) as send_err: diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py index 061ae5aa..cfd9cd8b 100644 --- a/tests/test_advanced_faults.py +++ b/tests/test_advanced_faults.py @@ -98,7 +98,8 @@ def test_ipc_channel_break_during_stream( expect_final_exc = TransportClosed mod: ModuleType = import_path( - examples_dir() / 'advanced_faults' + examples_dir() + / 'advanced_faults' / 'ipc_failure_during_stream.py', root=examples_dir(), consider_namespace_packages=False, @@ -113,8 +114,9 @@ def test_ipc_channel_break_during_stream( if ( # only expect EoC if trans is broken on the child side, ipc_break['break_child_ipc_after'] is not False + and # AND we tell the child to call `MsgStream.aclose()`. - and pre_aclose_msgstream + pre_aclose_msgstream ): # expect_final_exc = trio.EndOfChannel # ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this @@ -160,7 +162,8 @@ def test_ipc_channel_break_during_stream( ipc_break['break_child_ipc_after'] is not False and ( ipc_break['break_parent_ipc_after'] - > ipc_break['break_child_ipc_after'] + > + ipc_break['break_child_ipc_after'] ) ): if pre_aclose_msgstream: @@ -248,8 +251,13 @@ def test_ipc_channel_break_during_stream( # get raw instance from pytest wrapper value = excinfo.value if isinstance(value, ExceptionGroup): - excs = value.exceptions - assert len(excs) == 1 + excs: tuple[Exception] = value.exceptions + assert ( + len(excs) <= 2 + and + (isinstance(exc, TransportClosed) + for exc in excs) + ) final_exc = excs[0] assert isinstance(final_exc, expect_final_exc) diff --git a/tractor/_context.py b/tractor/_context.py index 8ace1b4d..c5b4afc5 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -70,6 +70,7 @@ from ._exceptions import ( MsgTypeError, RemoteActorError, StreamOverrun, + TransportClosed, pack_from_raise, unpack_error, ) @@ -2431,6 +2432,7 @@ async def open_context_from_portal( except ( trio.BrokenResourceError, trio.ClosedResourceError, + TransportClosed, ): log.warning( 'IPC connection for context is broken?\n' diff --git a/tractor/_streaming.py b/tractor/_streaming.py index fb870f1c..cf3eaab0 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -38,6 +38,7 @@ import trio from ._exceptions import ( ContextCancelled, RemoteActorError, + TransportClosed, ) from .log import get_logger from .trionics import ( @@ -412,7 +413,8 @@ class MsgStream(trio.abc.Channel): except ( trio.BrokenResourceError, - trio.ClosedResourceError + trio.ClosedResourceError, + TransportClosed, ) as re: # the underlying channel may already have been pulled # in which case our stop message is meaningless since @@ -596,6 +598,7 @@ class MsgStream(trio.abc.Channel): trio.ClosedResourceError, trio.BrokenResourceError, BrokenPipeError, + TransportClosed, ) as _trans_err: trans_err = _trans_err if ( From 07c2ba5c0d4c7e3e83e83d36238e2385a99a6652 Mon Sep 17 00:00:00 2001 From: goodboy Date: Thu, 19 Feb 2026 13:10:02 -0500 Subject: [PATCH 08/18] Drop `trio`-exc-catching if tpt-closed covers them Remove the `trio.ClosedResourceError` and `trio.BrokenResourceError` handling that should now be subsumed by `TransportClosed` re-raising out of the `.ipc` stack. Deats, - drop CRE and BRE from `._streaming.MsgStream.aclose()/.send()` blocks. - similarly rm from `._context.open_context_from_portal()`. - also from `._portal.Portal.cancel_actor()` and drop the (now-completed-todo) comment about this exact thing. Also add comment in `._rpc.try_ship_error_to_remote()` noting the remaining `trio` catches there are bc the `.ipc` layers *should* be wrapping them; thus `log.critical()` use is warranted. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tractor/_context.py | 6 +----- tractor/_portal.py | 13 +------------ tractor/_rpc.py | 5 +++++ tractor/_streaming.py | 5 ----- 4 files changed, 7 insertions(+), 22 deletions(-) diff --git a/tractor/_context.py b/tractor/_context.py index c5b4afc5..5a69077e 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -2429,11 +2429,7 @@ async def open_context_from_portal( try: # await pause(shield=True) await ctx.cancel() - except ( - trio.BrokenResourceError, - trio.ClosedResourceError, - TransportClosed, - ): + except TransportClosed: log.warning( 'IPC connection for context is broken?\n' f'task: {ctx.cid}\n' diff --git a/tractor/_portal.py b/tractor/_portal.py index a914bb80..2fc9dbb7 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -329,18 +329,7 @@ class Portal: # if we get here some weird cancellation case happened return False - except ( - # XXX, should never really get raised unless we aren't - # wrapping them in the below type by mistake? - # - # Leaving the catch here for now until we're very sure - # all the cases (for various tpt protos) have indeed been - # re-wrapped ;p - trio.ClosedResourceError, - trio.BrokenResourceError, - - TransportClosed, - ) as tpt_err: + except TransportClosed as tpt_err: ipc_borked_report: str = ( f'IPC for actor already closed/broken?\n\n' f'\n' diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 801ab672..bd26b5ed 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -935,6 +935,11 @@ async def try_ship_error_to_remote( # XXX NOTE XXX in SC terms this is one of the worst things # that can happen and provides for a 2-general's dilemma.. + # + # FURHTER, we should never really have to handle these + # lowlevel excs from `trio` since the `Channel.send()` layers + # downward should be mostly wrapping such cases in a + # tpt-closed; the `.critical()` usage is warranted. except ( trio.ClosedResourceError, trio.BrokenResourceError, diff --git a/tractor/_streaming.py b/tractor/_streaming.py index cf3eaab0..56e0607a 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -410,10 +410,7 @@ class MsgStream(trio.abc.Channel): # it). with trio.CancelScope(shield=True): await self._ctx.send_stop() - except ( - trio.BrokenResourceError, - trio.ClosedResourceError, TransportClosed, ) as re: # the underlying channel may already have been pulled @@ -595,8 +592,6 @@ class MsgStream(trio.abc.Channel): ), ) except ( - trio.ClosedResourceError, - trio.BrokenResourceError, BrokenPipeError, TransportClosed, ) as _trans_err: From 28819bf5d3b9d6dff7655d7b5210131a012cbc1c Mon Sep 17 00:00:00 2001 From: goodboy Date: Thu, 19 Feb 2026 13:38:47 -0500 Subject: [PATCH 09/18] Add `Actor.is_root()` convenience predicate meth --- tractor/_runtime.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index c6ff9e4c..f77c69c1 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -183,6 +183,14 @@ class Actor: def is_registrar(self) -> bool: return self.is_arbiter + @property + def is_root(self) -> bool: + ''' + This actor is the parent most in the tree? + + ''' + return _state.is_root_process() + msg_buffer_size: int = 2**6 # nursery placeholders filled in by `async_main()`, From 052fe2435f313b84e469d5d1c1601cd0f0b7d002 Mon Sep 17 00:00:00 2001 From: goodboy Date: Thu, 19 Feb 2026 13:44:05 -0500 Subject: [PATCH 10/18] Improve `Channel` doc-strs + minor cleanups Flesh out missing method doc-strings, improve log msg formatting and assert -> `RuntimeError` for un-inited tpt layer. Deats, - add doc-string to `.send()` noting `TransportClosed` raise on comms failures. - add doc-string to `.recv()`. - expand `._aiter_msgs()` doc-string, line-len reflow. - add doc-string to `.connected()`. - convert `assert self._transport` -> `RuntimeError` raise in `._aiter_msgs()` for more explicit crashing. - expand `_connect_chan()` doc-string, note it's lowlevel and suggest `.open_portal()` to user instead. - factor out `src_exc_str` in `TransportClosed` log handler to avoid double-call - use multiline style for `.connected()` return expr. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tractor/ipc/_chan.py | 50 ++++++++++++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index 64973fd4..9d109f3f 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -307,7 +307,12 @@ class Channel: ) -> None: ''' - Send a coded msg-blob over the transport. + Send a coded msg-blob over the underlying IPC transport. + + This fn raises `TransportClosed` on comms failures and is + normally handled by higher level runtime machinery for the + expected-graceful cases, normally ephemercal + (re/dis)connects. ''' __tracebackhide__: bool = hide_tb @@ -334,9 +339,10 @@ class Channel: except KeyError: raise err case TransportClosed(): + src_exc_str: str = err.repr_src_exc() log.transport( - f'Transport stream closed due to\n' - f'{err.repr_src_exc()}\n' + f'Transport stream closed due to,\n' + f'{src_exc_str}' ) case _: @@ -345,6 +351,11 @@ class Channel: raise async def recv(self) -> Any: + ''' + Receive the latest (queued) msg-blob from the underlying IPC + transport. + + ''' assert self._transport return await self._transport.recv() @@ -418,16 +429,18 @@ class Channel: self ) -> AsyncGenerator[Any, None]: ''' - Yield `MsgType` IPC msgs decoded and deliverd from - an underlying `MsgTransport` protocol. + Yield `MsgType` IPC msgs decoded and deliverd from an + underlying `MsgTransport` protocol. - This is a streaming routine alo implemented as an async-gen - func (same a `MsgTransport._iter_pkts()`) gets allocated by - a `.__call__()` inside `.__init__()` where it is assigned to - the `._aiter_msgs` attr. + This is a streaming routine alo implemented as an + async-generator func (same a `MsgTransport._iter_pkts()`) + gets allocated by a `.__call__()` inside `.__init__()` where + it is assigned to the `._aiter_msgs` attr. ''' - assert self._transport + if not self._transport: + raise RuntimeError('No IPC transport initialized!?') + while True: try: async for msg in self._transport: @@ -462,7 +475,15 @@ class Channel: # continue def connected(self) -> bool: - return self._transport.connected() if self._transport else False + ''' + Predicate whether underlying IPC tpt is connected. + + ''' + return ( + self._transport.connected() + if self._transport + else False + ) async def _do_handshake( self, @@ -493,8 +514,11 @@ async def _connect_chan( addr: UnwrappedAddress ) -> typing.AsyncGenerator[Channel, None]: ''' - Create and connect a channel with disconnect on context manager - teardown. + Create and connect a `Channel` to the provided `addr`, disconnect + it on cm exit. + + NOTE, this is a lowlevel, normally internal-only iface. You + should likely use `.open_portal()` instead. ''' chan = await Channel.from_addr(addr) From 0cddc67bdb60d252479baf7273d150539562546e Mon Sep 17 00:00:00 2001 From: goodboy Date: Thu, 19 Feb 2026 13:54:28 -0500 Subject: [PATCH 11/18] Add doc-strs to `get_root()` + `maybe_open_portal()` Brief descriptions for both fns in `._discovery` clarifying what each delivers and under what conditions. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tractor/_discovery.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index edc43d86..bf4d066a 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -91,10 +91,13 @@ async def get_registry( @acm -async def get_root( - **kwargs, -) -> AsyncGenerator[Portal, None]: +async def get_root(**kwargs) -> AsyncGenerator[Portal, None]: + ''' + Deliver the current actor's "root process" actor (yes in actor + and proc tree terms) by delivering a `Portal` from the spawn-time + provided contact address. + ''' # TODO: rename mailbox to `_root_maddr` when we finally # add and impl libp2p multi-addrs? addr = _runtime_vars['_root_mailbox'] @@ -193,6 +196,11 @@ async def maybe_open_portal( addr: UnwrappedAddress, name: str, ): + ''' + Open a `Portal` to the actor serving @ `addr` or `None` if no + peer can be contacted or found. + + ''' async with query_actor( name=name, regaddr=addr, From 592d91839411619c34334f2a5a5cce8b13ae2888 Mon Sep 17 00:00:00 2001 From: goodboy Date: Thu, 12 Feb 2026 18:17:45 -0500 Subject: [PATCH 12/18] Tweak `test_inter_peer_cancellation` for races Adjust `basic_echo_server()` default sequence len to avoid the race where the 'tell_little_bro()` finished streaming **before** the echo-server sub is cancelled by its peer subactor (which is the whole thing we're testing!). Deats, - bump `rng_seed` default from 50 -> 100 to ensure peer cancel req arrives before echo dialog completes on fast hw. - add `trio.sleep(0.001)` between send/receive in msg loop on the "client" streamer side to give cancel request transit more time to arrive. Also, - add more native `tractor`-type hints. - reflow `basic_echo_server()` doc-string for 67 char limit - add masked `pause()` call with comment about unreachable code path - alphabetize imports: mv `current_actor` and `open_nursery` below typed imports (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tests/test_inter_peer_cancellation.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index 79c454f6..379f6fac 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -11,12 +11,13 @@ import trio import tractor from tractor import ( # typing Actor, - current_actor, - open_nursery, - Portal, Context, ContextCancelled, + MsgStream, + Portal, RemoteActorError, + current_actor, + open_nursery, ) from tractor._testing import ( # tractor_test, @@ -796,8 +797,8 @@ async def basic_echo_server( ) -> None: ''' - Just the simplest `MsgStream` echo server which resays what - you told it but with its uid in front ;) + Just the simplest `MsgStream` echo server which resays what you + told it but with its uid in front ;) ''' actor: Actor = tractor.current_actor() @@ -966,9 +967,14 @@ async def tell_little_bro( caller: str = '', err_after: float|None = None, - rng_seed: int = 50, + rng_seed: int = 100, + # NOTE, ensure ^ is large enough (on fast hw anyway) + # to ensure the peer cancel req arrives before the + # echoing dialog does itself Bp ): # contact target actor, do a stream dialog. + lb: Portal + echo_ipc: MsgStream async with ( tractor.wait_for_actor( name=actor_name @@ -983,7 +989,6 @@ async def tell_little_bro( else None ), ) as (sub_ctx, first), - sub_ctx.open_stream() as echo_ipc, ): actor: Actor = current_actor() @@ -994,6 +999,7 @@ async def tell_little_bro( i, ) await echo_ipc.send(msg) + await trio.sleep(0.001) resp = await echo_ipc.receive() print( f'{caller} => {actor_name}: {msg}\n' @@ -1006,6 +1012,9 @@ async def tell_little_bro( assert sub_uid != uid assert _i == i + # XXX, usually should never get here! + # await tractor.pause() + @pytest.mark.parametrize( 'raise_client_error', From 9470815f5af3b8824d669538e0415045e630eeee Mon Sep 17 00:00:00 2001 From: goodboy Date: Thu, 19 Feb 2026 16:02:05 -0500 Subject: [PATCH 13/18] Fix `spawn` fixture cleanup + test assertions Improve the `spawn` fixture teardown logic in `tests/devx/conftest.py` fixing the while-else bug, and fix `test_advanced_faults` genexp for `TransportClosed` exc type checking. Deats, - replace broken `while-else` pattern with direct `if ptyproc.isalive()` check after the SIGINT loop. - fix undefined `spawned` ref -> `ptyproc.isalive()` in while condition. - improve walrus expr formatting in timeout check (multiline style). Also fix `test_ipc_channel_break_during_stream()` assertion, - wrap genexp in `all()` call so it actually checks all excs are `TransportClosed` instead of just creating an unused generator. (this patch was suggested by copilot in, https://github.com/goodboy/tractor/pull/411) (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tests/devx/conftest.py | 13 +++++++----- tests/test_advanced_faults.py | 6 ++++-- tractor/_rpc.py | 39 ++++++++--------------------------- 3 files changed, 21 insertions(+), 37 deletions(-) diff --git a/tests/devx/conftest.py b/tests/devx/conftest.py index a516eecd..efc74d44 100644 --- a/tests/devx/conftest.py +++ b/tests/devx/conftest.py @@ -101,15 +101,18 @@ def spawn( start: float = time.time() timeout: float = 5 while ( - spawned + ptyproc.isalive() and - spawned.isalive() - and - (_time_took := (time.time() - start) < timeout) + ( + (_time_took := (time.time() - start)) + < + timeout + ) ): ptyproc.kill(signal.SIGINT) time.sleep(0.01) - else: + + if ptyproc.isalive(): ptyproc.kill(signal.SIGKILL) # TODO? ensure we've cleaned up any UDS-paths? diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py index cfd9cd8b..2103f627 100644 --- a/tests/test_advanced_faults.py +++ b/tests/test_advanced_faults.py @@ -255,8 +255,10 @@ def test_ipc_channel_break_during_stream( assert ( len(excs) <= 2 and - (isinstance(exc, TransportClosed) - for exc in excs) + all( + isinstance(exc, TransportClosed) + for exc in excs + ) ) final_exc = excs[0] assert isinstance(final_exc, expect_final_exc) diff --git a/tractor/_rpc.py b/tractor/_rpc.py index bd26b5ed..e79bd8fc 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -284,9 +284,14 @@ async def _errors_relayed_via_ipc( try: yield # run RPC invoke body - except TransportClosed: - log.exception('Tpt disconnect during remote-exc relay?') - raise + # NOTE, never REPL any pseudo-expected tpt-disconnect. + except TransportClosed as err: + rpc_err = err + log.warning( + f'Tpt disconnect during remote-exc relay due to,\n' + f'{err!r}\n' + ) + raise err # box and ship RPC errors for wire-transit via # the task's requesting parent IPC-channel. @@ -323,9 +328,6 @@ async def _errors_relayed_via_ipc( and debug_kbis ) ) - # TODO? better then `debug_filter` below? - and - not isinstance(err, TransportClosed) ): # XXX QUESTION XXX: is there any case where we'll # want to debug IPC disconnects as a default? @@ -346,13 +348,6 @@ async def _errors_relayed_via_ipc( entered_debug = await debug._maybe_enter_pm( err, api_frame=inspect.currentframe(), - - # don't REPL any psuedo-expected tpt-disconnect - # debug_filter=lambda exc: ( - # type (exc) not in { - # TransportClosed, - # } - # ), ) if not entered_debug: # if we prolly should have entered the REPL but @@ -438,7 +433,7 @@ async def _errors_relayed_via_ipc( # cancel scope will not have been inserted yet if is_rpc: log.warning( - 'RPC task likely errored or cancelled before start?\n' + 'RPC task likely crashed or cancelled before start?\n' f'|_{ctx._task}\n' f' >> {ctx.repr_rpc}\n' ) @@ -694,22 +689,6 @@ async def _invoke( f'{pretty_struct.pformat(return_msg)}\n' ) await chan.send(return_msg) - # ?TODO, remove the below since .send() already - # doesn't raise on tpt-closed? - # try: - # await chan.send(return_msg) - # except TransportClosed: - # log.exception( - # f"Failed send final result to 'parent'-side of IPC-ctx!\n" - # f'\n' - # f'{chan}\n' - # f'Channel already disconnected ??\n' - # f'\n' - # f'{pretty_struct.pformat(return_msg)}' - # ) - # # ?TODO? will this ever be true though? - # if chan.connected(): - # raise # NOTE: this happens IFF `ctx._scope.cancel()` is # called by any of, From d0b92bbebaa44aafbbcb410076d78673c28b244c Mon Sep 17 00:00:00 2001 From: goodboy Date: Thu, 19 Feb 2026 16:18:39 -0500 Subject: [PATCH 14/18] Clean up `._transport` error-case comment Expand and clarify the comment for the default `case _` block in the `.send()` error matcher, noting that we console-error and raise-thru for unexpected disconnect conditions. (this patch was suggested by copilot in, https://github.com/goodboy/tractor/pull/411) (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tractor/ipc/_transport.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py index 97ba3e5a..5078ae7d 100644 --- a/tractor/ipc/_transport.py +++ b/tractor/ipc/_transport.py @@ -485,9 +485,11 @@ class MsgpackTransport(MsgTransport): # await tractor.devx._trace.maybe_pause_bp() raise tpt_closed from trans_err - # unless the disconnect condition falls under "a - # normal operation breakage" we usualy console warn - # about it. + # XXX, unless the disconnect condition falls + # under "a normal/expected operating breakage" + # (per the `trans_err_msg` guards in the cases + # above) we usualy console-error about it and + # raise-thru. about it. case _: log.exception( f'{tpt_name} layer failed pre-send ??\n' From fa86269e30e86b6edb9aae5433bd64cc081c8946 Mon Sep 17 00:00:00 2001 From: goodboy Date: Thu, 12 Feb 2026 00:51:17 -0500 Subject: [PATCH 15/18] Stuff from auto-review in https://github.com/goodboy/tractor/pull/412 .. --- tests/test_multi_program.py | 2 +- tractor/_root.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index eff9a731..17003e1c 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -163,7 +163,7 @@ def test_non_registrar_spawns_child( async with sub_ptl.open_context( get_root_portal, - ) as (ctx, first): + ) as (ctx, _): print('Waiting for `sub` to connect back to us..') await an.cancel() diff --git a/tractor/_root.py b/tractor/_root.py index f480a619..6589dacb 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -524,7 +524,7 @@ async def open_root_actor( # ?TODO, per-OS non-network-proto alt options? # -[ ] on linux we should be able to always use UDS? # - raddrs: list[Address] = _state._runtime_vars['_root_addrs'] + raddrs: list[UnwrappedAddress] = _state._runtime_vars['_root_addrs'] raddrs.extend( accept_addrs, ) From 3e5124e1847bfb47361c700a5c2bda1a20f309a6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Sep 2025 12:35:35 -0400 Subject: [PATCH 16/18] Hide `._rpc._invoke()` frame, again.. --- tractor/_rpc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_rpc.py b/tractor/_rpc.py index e79bd8fc..f892854b 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -464,7 +464,7 @@ async def _invoke( kwargs: dict[str, Any], is_rpc: bool = True, - hide_tb: bool = False, + hide_tb: bool = True, return_msg_type: Return|CancelAck = Return, task_status: TaskStatus[ From 91f2f3ec10dc27f1e339c92b3890678ec58a99fe Mon Sep 17 00:00:00 2001 From: goodboy Date: Mon, 9 Feb 2026 13:50:50 -0500 Subject: [PATCH 17/18] Use test-harness `loglevel` in inter-peer suite --- tests/test_inter_peer_cancellation.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index 379f6fac..4df705b1 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -1029,6 +1029,9 @@ def test_peer_spawns_and_cancels_service_subactor( raise_client_error: str, reg_addr: tuple[str, int], raise_sub_spawn_error_after: float|None, + loglevel: str, + # ^XXX, set to 'warning' to see masked-exc warnings + # that may transpire during actor-nursery teardown. ): # NOTE: this tests for the modden `mod wks open piker` bug # discovered as part of implementing workspace ctx @@ -1058,6 +1061,7 @@ def test_peer_spawns_and_cancels_service_subactor( # NOTE: to halt the peer tasks on ctxc, uncomment this. debug_mode=debug_mode, registry_addrs=[reg_addr], + loglevel=loglevel, ) as an: server: Portal = await an.start_actor( (server_name := 'spawn_server'), From 916f88a07037afec37746956bf7ccf5c30b3dcf7 Mon Sep 17 00:00:00 2001 From: goodboy Date: Mon, 9 Feb 2026 13:54:18 -0500 Subject: [PATCH 18/18] Less newlines in `._rpc` log msg --- tractor/_rpc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_rpc.py b/tractor/_rpc.py index f892854b..ac658cb2 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -876,9 +876,9 @@ async def _invoke( ) logmeth( - f'{message}\n' + f'{message}' f'\n' - f'{descr_str}\n' + f'{descr_str}' )