From 3ab74988939bb6f753af179a7ab78361f0c3dfb5 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 4 Jul 2025 00:08:52 -0400
Subject: [PATCH 01/15] Add todo for py3.13+ `.shared_memory`'s new
 `track=False` support.. finally they added it XD

---
 tractor/ipc/_mp_bs.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/tractor/ipc/_mp_bs.py b/tractor/ipc/_mp_bs.py
index e51aa9ae..33a64a32 100644
--- a/tractor/ipc/_mp_bs.py
+++ b/tractor/ipc/_mp_bs.py
@@ -17,9 +17,16 @@
 Utils to tame mp non-SC madeness
 
 '''
+
+# !TODO! in 3.13 this can be disabled (the-same/similarly) using
+# a flag,
+# - [ ] soo if it works like this, drop this module entirely for
+#   3.13+ B)
+# |_https://docs.python.org/3/library/multiprocessing.shared_memory.html
+#
 def disable_mantracker():
     '''
-    Disable all ``multiprocessing``` "resource tracking" machinery since
+    Disable all `multiprocessing` "resource tracking" machinery since
     it's an absolute multi-threaded mess of non-SC madness.
 
     '''

From e875b62869c8dfa351536bf18757c9d2541039b8 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 8 Jul 2025 11:07:50 -0400
Subject: [PATCH 02/15] Add `.ipc._shm` todo-idea for `@actor_fixture` API

---
 tractor/ipc/_shm.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/tractor/ipc/_shm.py b/tractor/ipc/_shm.py
index 62b26e79..ed17a2b7 100644
--- a/tractor/ipc/_shm.py
+++ b/tractor/ipc/_shm.py
@@ -789,6 +789,11 @@ def open_shm_list(
         readonly=readonly,
     )
 
+    # TODO, factor into a @actor_fixture acm-API?
+    # -[ ] also `@maybe_actor_fixture()` which includes
+    #     the .current_actor() convenience check?
+    #     |_ orr can that just be in the sin-maybe-version?
+    #
     # "close" attached shm on actor teardown
     try:
         actor = tractor.current_actor()

From c0854fd221181555928d827ad6e22e01be6833d7 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 15 Jul 2025 17:31:08 -0400
Subject: [PATCH 03/15] Set `Channel._cancel_called` via `chan` var

In `Portal.cancel_actor()` that is, at the least to make it easier to
ref search from an editor Bp

---
 tractor/_portal.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tractor/_portal.py b/tractor/_portal.py
index 5729edd4..69133528 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -300,7 +300,7 @@ class Portal:
         )
 
         # XXX the one spot we set it?
-        self.channel._cancel_called: bool = True
+        chan._cancel_called: bool = True
         try:
             # send cancel cmd - might not get response
             # XXX: sure would be nice to make this work with

From 50f6543ee7cb2a13219c657b151a167cc43a844a Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 15 Jul 2025 17:32:42 -0400
Subject: [PATCH 04/15] Add `Channel.closed/.cancel_called`

I.e. the public properties for the private instance var equivs;
improves expected introspection usage.

---
 tractor/ipc/_chan.py | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py
index 64643d95..9ddab8b0 100644
--- a/tractor/ipc/_chan.py
+++ b/tractor/ipc/_chan.py
@@ -101,11 +101,27 @@ class Channel:
         # ^XXX! ONLY set if a remote actor sends an `Error`-msg
         self._closed: bool = False
 
-        # flag set by ``Portal.cancel_actor()`` indicating remote
-        # (possibly peer) cancellation of the far end actor
-        # runtime.
+        # flag set by `Portal.cancel_actor()` indicating remote
+        # (possibly peer) cancellation of the far end actor runtime.
         self._cancel_called: bool = False
 
+    @property
+    def closed(self) -> bool:
+        '''
+        Was `.aclose()` successfully called?
+ + ''' + return self._closed + + @property + def cancel_called(self) -> bool: + ''' + Set when `Portal.cancel_actor()` is called on a portal which + wraps this IPC channel. + + ''' + return self._cancel_called + @property def uid(self) -> tuple[str, str]: ''' From 599020c2c5e2a5d2e556a5554f669ff39c771300 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Jul 2025 20:07:35 -0400 Subject: [PATCH 05/15] Rename all lingering ctx-side bits As before but more thoroughly in comments and var names finally changing all, - caller -> parent - callee -> child --- tractor/_context.py | 103 +++++++++++++++++++++++--------------------- 1 file changed, 54 insertions(+), 49 deletions(-) diff --git a/tractor/_context.py b/tractor/_context.py index 7a2bf5c6..0e3ff0c3 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -154,7 +154,7 @@ class Context: 2 cancel-scope-linked, communicating and parallel executing `Task`s. Contexts are allocated on each side of any task RPC-linked msg dialog, i.e. for every request to a remote - actor from a `Portal`. On the "callee" side a context is + actor from a `Portal`. On the "child" side a context is always allocated inside `._rpc._invoke()`. TODO: more detailed writeup on cancellation, error and @@ -222,8 +222,8 @@ class Context: # `._runtime.invoke()`. _remote_func_type: str | None = None - # NOTE: (for now) only set (a portal) on the caller side since - # the callee doesn't generally need a ref to one and should + # NOTE: (for now) only set (a portal) on the parent side since + # the child doesn't generally need a ref to one and should # normally need to explicitly ask for handle to its peer if # more the the `Context` is needed? _portal: Portal | None = None @@ -252,12 +252,12 @@ class Context: _outcome_msg: Return|Error|ContextCancelled = Unresolved # on a clean exit there should be a final value - # delivered from the far end "callee" task, so + # delivered from the far end "child" task, so # this value is only set on one side. # _result: Any | int = None _result: PayloadT|Unresolved = Unresolved - # if the local "caller" task errors this value is always set + # if the local "parent" task errors this value is always set # to the error that was captured in the # `Portal.open_context().__aexit__()` teardown block OR, in # 2 special cases when an (maybe) expected remote error @@ -293,7 +293,7 @@ class Context: # a `ContextCancelled` due to a call to `.cancel()` triggering # "graceful closure" on either side: # - `._runtime._invoke()` will check this flag before engaging - # the crash handler REPL in such cases where the "callee" + # the crash handler REPL in such cases where the "child" # raises the cancellation, # - `.devx.debug.lock_stdio_for_peer()` will set it to `False` if # the global tty-lock has been configured to filter out some @@ -307,8 +307,8 @@ class Context: _stream_opened: bool = False _stream: MsgStream|None = None - # caller of `Portal.open_context()` for - # logging purposes mostly + # the parent-task's calling-fn's frame-info, the frame above + # `Portal.open_context()`, for introspection/logging. 
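+    # (populated via `.devx._frame_stack.find_caller_info()` in
+    #  `mk_context()`, see the bottom of this same diff)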
_caller_info: CallerInfo|None = None # overrun handling machinery @@ -529,11 +529,11 @@ class Context: ''' Exactly the value of `self._scope.cancelled_caught` (delegation) and should only be (able to be read as) - `True` for a `.side == "caller"` ctx wherein the + `True` for a `.side == "parent"` ctx wherein the `Portal.open_context()` block was exited due to a call to `._scope.cancel()` - which should only ocurr in 2 cases: - - a caller side calls `.cancel()`, the far side cancels + - a parent side calls `.cancel()`, the far side cancels and delivers back a `ContextCancelled` (making `.cancel_acked == True`) and `._scope.cancel()` is called by `._maybe_cancel_and_set_remote_error()` which @@ -542,20 +542,20 @@ class Context: => `._scope.cancelled_caught == True` by normal `trio` cs semantics. - - a caller side is delivered a `._remote_error: + - a parent side is delivered a `._remote_error: RemoteActorError` via `._deliver_msg()` and a transitive call to `_maybe_cancel_and_set_remote_error()` calls `._scope.cancel()` and that cancellation eventually results in `trio.Cancelled`(s) caught in the `.open_context()` handling around the @acm's `yield`. - Only as an FYI, in the "callee" side case it can also be + Only as an FYI, in the "child" side case it can also be set but never is readable by any task outside the RPC machinery in `._invoke()` since,: - - when a callee side calls `.cancel()`, `._scope.cancel()` + - when a child side calls `.cancel()`, `._scope.cancel()` is called immediately and handled specially inside `._invoke()` to raise a `ContextCancelled` which is then - sent to the caller side. + sent to the parent side. However, `._scope.cancelled_caught` can NEVER be accessed/read as `True` by any RPC invoked task since it @@ -666,7 +666,7 @@ class Context: when called/closed by actor local task(s). NOTEs: - - It is expected that the caller has previously unwrapped + - It is expected that the parent has previously unwrapped the remote error using a call to `unpack_error()` and provides that output exception value as the input `error` argument *here*. @@ -676,7 +676,7 @@ class Context: `Portal.open_context()` (ideally) we want to interrupt any ongoing local tasks operating within that `Context`'s cancel-scope so as to be notified ASAP of - the remote error and engage any caller handling (eg. + the remote error and engage any parent handling (eg. for cross-process task supervision). - In some cases we may want to raise the remote error @@ -886,6 +886,11 @@ class Context: @property def repr_caller(self) -> str: + ''' + Render a "namespace-path" style representation of the calling + task-fn. + + ''' ci: CallerInfo|None = self._caller_info if ci: return ( @@ -899,7 +904,7 @@ class Context: def repr_api(self) -> str: return 'Portal.open_context()' - # TODO: use `.dev._frame_stack` scanning to find caller! + # TODO: use `.dev._frame_stack` scanning to find caller fn! # ci: CallerInfo|None = self._caller_info # if ci: # return ( @@ -934,7 +939,7 @@ class Context: => That is, an IPC `Context` (this) **does not** have the same semantics as a `trio.CancelScope`. - If the caller (who entered the `Portal.open_context()`) + If the parent (who entered the `Portal.open_context()`) desires that the internal block's cancel-scope be cancelled it should open its own `trio.CancelScope` and manage it as needed. @@ -1017,7 +1022,7 @@ class Context: # `_invoke()` RPC task. 
# # NOTE: on this side we ALWAYS cancel the local scope - # since the caller expects a `ContextCancelled` to be sent + # since the parent expects a `ContextCancelled` to be sent # from `._runtime._invoke()` back to the other side. The # logic for catching the result of the below # `._scope.cancel()` is inside the `._runtime._invoke()` @@ -1190,8 +1195,8 @@ class Context: ) -> Any|Exception: ''' - From some (caller) side task, wait for and return the final - result from the remote (callee) side's task. + From some (parent) side task, wait for and return the final + result from the remote (child) side's task. This provides a mechanism for one task running in some actor to wait on another task at the other side, in some other actor, to terminate. @@ -1600,7 +1605,7 @@ class Context: raise err # TODO: maybe a flag to by-pass encode op if already done - # here in caller? + # here in parent? await self.chan.send(started_msg) # set msg-related internal runtime-state @@ -1676,7 +1681,7 @@ class Context: XXX RULES XXX ------ - ------ - - NEVER raise remote errors from this method; a runtime task caller. + - NEVER raise remote errors from this method; a calling runtime-task. An error "delivered" to a ctx should always be raised by the corresponding local task operating on the `Portal`/`Context` APIs. @@ -1752,7 +1757,7 @@ class Context: else: report = ( - 'Queueing OVERRUN msg on caller task:\n\n' + 'Queueing OVERRUN msg on parent task:\n\n' + report ) log.debug(report) @@ -1948,12 +1953,12 @@ async def open_context_from_portal( IPC protocol. The yielded `tuple` is a pair delivering a `tractor.Context` - and any first value "sent" by the "callee" task via a call + and any first value "sent" by the "child" task via a call to `Context.started()`; this side of the - context does not unblock until the "callee" task calls + context does not unblock until the "child" task calls `.started()` in similar style to `trio.Nursery.start()`. - When the "callee" (side that is "called"/started by a call - to *this* method) returns, the caller side (this) unblocks + When the "child" (side that is "called"/started by a call + to *this* method) returns, the parent side (this) unblocks and any final value delivered from the other end can be retrieved using the `Contex.wait_for_result()` api. @@ -1966,7 +1971,7 @@ async def open_context_from_portal( __tracebackhide__: bool = hide_tb # denote this frame as a "runtime frame" for stack - # introspection where we report the caller code in logging + # introspection where we report the parent code in logging # and error message content. # NOTE: 2 bc of the wrapping `@acm` __runtimeframe__: int = 2 # noqa @@ -2025,7 +2030,7 @@ async def open_context_from_portal( # placeholder for any exception raised in the runtime # or by user tasks which cause this context's closure. scope_err: BaseException|None = None - ctxc_from_callee: ContextCancelled|None = None + ctxc_from_child: ContextCancelled|None = None try: async with ( collapse_eg(), @@ -2104,7 +2109,7 @@ async def open_context_from_portal( # that we can re-use it around the `yield` ^ here # or vice versa? # - # maybe TODO NOTE: between the caller exiting and + # maybe TODO NOTE: between the parent exiting and # arriving here the far end may have sent a ctxc-msg or # other error, so the quetion is whether we should check # for it here immediately and maybe raise so as to engage @@ -2170,16 +2175,16 @@ async def open_context_from_portal( # request in which case we DO let the error bubble to the # opener. 
# - # 2-THIS "caller" task somewhere invoked `Context.cancel()` - # and received a `ContextCanclled` from the "callee" + # 2-THIS "parent" task somewhere invoked `Context.cancel()` + # and received a `ContextCanclled` from the "child" # task, in which case we mask the `ContextCancelled` from - # bubbling to this "caller" (much like how `trio.Nursery` + # bubbling to this "parent" (much like how `trio.Nursery` # swallows any `trio.Cancelled` bubbled by a call to # `Nursery.cancel_scope.cancel()`) except ContextCancelled as ctxc: scope_err = ctxc ctx._local_error: BaseException = scope_err - ctxc_from_callee = ctxc + ctxc_from_child = ctxc # XXX TODO XXX: FIX THIS debug_mode BUGGGG!!! # using this code and then resuming the REPL will @@ -2216,11 +2221,11 @@ async def open_context_from_portal( # the above `._scope` can be cancelled due to: # 1. an explicit self cancel via `Context.cancel()` or # `Actor.cancel()`, - # 2. any "callee"-side remote error, possibly also a cancellation + # 2. any "child"-side remote error, possibly also a cancellation # request by some peer, - # 3. any "caller" (aka THIS scope's) local error raised in the above `yield` + # 3. any "parent" (aka THIS scope's) local error raised in the above `yield` except ( - # CASE 3: standard local error in this caller/yieldee + # CASE 3: standard local error in this parent/yieldee Exception, # CASES 1 & 2: can manifest as a `ctx._scope_nursery` @@ -2234,9 +2239,9 @@ async def open_context_from_portal( # any `Context._maybe_raise_remote_err()` call. # # 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]` - # from any error delivered from the "callee" side + # from any error delivered from the "child" side # AND a group-exc is only raised if there was > 1 - # tasks started *here* in the "caller" / opener + # tasks started *here* in the "parent" / opener # block. If any one of those tasks calls # `.wait_for_result()` or `MsgStream.receive()` # `._maybe_raise_remote_err()` will be transitively @@ -2249,8 +2254,8 @@ async def open_context_from_portal( trio.Cancelled, # NOTE: NOT from inside the ctx._scope KeyboardInterrupt, - ) as caller_err: - scope_err = caller_err + ) as rent_err: + scope_err = rent_err ctx._local_error: BaseException = scope_err # XXX: ALWAYS request the context to CANCEL ON any ERROR. @@ -2289,9 +2294,9 @@ async def open_context_from_portal( 'Calling `ctx.cancel()`!\n' ) - # we don't need to cancel the callee if it already + # we don't need to cancel the child if it already # told us it's cancelled ;p - if ctxc_from_callee is None: + if ctxc_from_child is None: try: await ctx.cancel() except ( @@ -2322,8 +2327,8 @@ async def open_context_from_portal( # via a call to # `Context._maybe_cancel_and_set_remote_error()`. # As per `Context._deliver_msg()`, that error IS - # ALWAYS SET any time "callee" side fails and causes "caller - # side" cancellation via a `ContextCancelled` here. + # ALWAYS SET any time "child" side fails and causes + # "parent side" cancellation via a `ContextCancelled` here. 
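+        # (hence the `.wait_for_result()` call just below may
+        #  resolve to either that boxed error instance or the
+        #  final result value, as per its annotation)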
try: result_or_err: Exception|Any = await ctx.wait_for_result() except BaseException as berr: @@ -2359,7 +2364,7 @@ async def open_context_from_portal( ) case (None, _): log.runtime( - 'Context returned final result from callee task:\n' + 'Context returned final result from child task:\n' f'<= peer: {uid}\n' f' |_ {nsf}()\n\n' @@ -2454,7 +2459,7 @@ async def open_context_from_portal( ) # TODO: should we add a `._cancel_req_received` - # flag to determine if the callee manually called + # flag to determine if the child manually called # `ctx.cancel()`? # -[ ] going to need a cid check no? @@ -2510,7 +2515,7 @@ def mk_context( recv_chan: trio.MemoryReceiveChannel send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size) - # TODO: only scan caller-info if log level so high! + # TODO: only scan parent-info if log level so high! from .devx._frame_stack import find_caller_info caller_info: CallerInfo|None = find_caller_info() From 3c30c559d5de712092d5951bdfc14b9e90e5303d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Jul 2025 23:16:30 -0400 Subject: [PATCH 06/15] `ipc._uds`: assign `.l/raddr` in `.connect_to()` Using `.get_stream_addrs()` such that we always (*can*) assign the peer end's PID in the `._raddr`. Also factor common `ConnectionError` re-raising into a `_reraise_as_connerr()`-@cm. --- tractor/ipc/_uds.py | 63 ++++++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/tractor/ipc/_uds.py b/tractor/ipc/_uds.py index 604802f3..645819f0 100644 --- a/tractor/ipc/_uds.py +++ b/tractor/ipc/_uds.py @@ -18,6 +18,9 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco ''' from __future__ import annotations +from contextlib import ( + contextmanager as cm, +) from pathlib import Path import os from socket import ( @@ -29,6 +32,7 @@ from socket import ( ) import struct from typing import ( + Type, TYPE_CHECKING, ClassVar, ) @@ -205,6 +209,22 @@ class UDSAddress( f']' ) +@cm +def _reraise_as_connerr( + src_excs: tuple[Type[Exception]], + addr: UDSAddress, +): + try: + yield + except src_excs as src_exc: + raise ConnectionError( + f'Bad UDS socket-filepath-as-address ??\n' + f'{addr}\n' + f' |_sockpath: {addr.sockpath}\n' + f'\n' + f'from src: {src_exc!r}\n' + ) from src_exc + async def start_listener( addr: UDSAddress, @@ -222,16 +242,14 @@ async def start_listener( ) bindpath: Path = addr.sockpath - try: + with _reraise_as_connerr( + src_excs=( + FileNotFoundError, + OSError, + ), + addr=addr + ): await sock.bind(str(bindpath)) - except ( - FileNotFoundError, - ) as fdne: - raise ConnectionError( - f'Bad UDS socket-filepath-as-address ??\n' - f'{addr}\n' - f' |_sockpath: {addr.sockpath}\n' - ) from fdne sock.listen(1) log.info( @@ -356,27 +374,30 @@ class MsgpackUDSStream(MsgpackTransport): # `.setsockopt()` call tells the OS provide it; the client # pid can then be read on server/listen() side via # `get_peer_info()` above. - try: + + with _reraise_as_connerr( + src_excs=( + FileNotFoundError, + ), + addr=addr + ): stream = await open_unix_socket_w_passcred( str(sockpath), **kwargs ) - except ( - FileNotFoundError, - ) as fdne: - raise ConnectionError( - f'Bad UDS socket-filepath-as-address ??\n' - f'{addr}\n' - f' |_sockpath: {sockpath}\n' - ) from fdne - stream = MsgpackUDSStream( + tpt_stream = MsgpackUDSStream( stream, prefix_size=prefix_size, codec=codec ) - stream._raddr = addr - return stream + # XXX assign from new addrs after peer-PID extract! 
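+        # (the peer's pid is only readable off the *connected*
+        #  socket, so for UDS the rebuilt `._raddr` may not `==`
+        #  the input `addr`; hence the `from_addr()` assert
+        #  dropped in the next patch)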
+ ( + tpt_stream._laddr, + tpt_stream._raddr, + ) = cls.get_stream_addrs(stream) + + return tpt_stream @classmethod def get_stream_addrs( From 1d706bddda27a35ea826edc737ea942f5e1a3e07 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 25 Jul 2025 11:27:30 -0400 Subject: [PATCH 07/15] Rm `assert` from `Channel.from_addr()`, for UDS we re-created to extract the peer PID --- tractor/ipc/_chan.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index 9ddab8b0..dcb0d6ad 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -185,7 +185,9 @@ class Channel: addr, **kwargs, ) - assert transport.raddr == addr + # XXX, for UDS *no!* since we recv the peer-pid and build out + # a new addr.. + # assert transport.raddr == addr chan = Channel(transport=transport) # ?TODO, compact this into adapter level-methods? @@ -301,7 +303,7 @@ class Channel: self, payload: Any, - hide_tb: bool = True, + hide_tb: bool = False, ) -> None: ''' From 00112edd58ab4c9c7e8282af003effedb48610c8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 25 Jul 2025 13:32:23 -0400 Subject: [PATCH 08/15] UDS: implicitly create `Address.bindspace: Path` Since it's merely a local-file-sys subdirectory and there should be no reason file creation conflicts with other bind spaces. Also add 2 test suites to match, - `tests/ipc/test_each_tpt::test_uds_bindspace_created_implicitly` to verify the dir creation when DNE. - `..test_uds_double_listen_raises_connerr` to ensure a double bind raises a `ConnectionError` from the src `OSError`. --- tests/ipc/test_each_tpt.py | 113 +++++++++++++++++++++++++++++++++++++ tractor/ipc/_uds.py | 21 ++++++- 2 files changed, 131 insertions(+), 3 deletions(-) create mode 100644 tests/ipc/test_each_tpt.py diff --git a/tests/ipc/test_each_tpt.py b/tests/ipc/test_each_tpt.py new file mode 100644 index 00000000..7dc60444 --- /dev/null +++ b/tests/ipc/test_each_tpt.py @@ -0,0 +1,113 @@ +''' +Unit-ish tests for specific IPC transport protocol backends. + +''' +from __future__ import annotations +from pathlib import Path + +import pytest +import trio +import tractor +from tractor import ( + Actor, + _state, + _addr, +) + + +@pytest.fixture +def bindspace_dir_str() -> str: + + bs_dir_str: str = '/run/user/1000/doggy' + bs_dir = Path(bs_dir_str) + assert not bs_dir.is_dir() + + yield bs_dir_str + + # delete it on suite teardown. + # ?TODO? should we support this internally + # or is leaking it ok? + if bs_dir.is_dir(): + bs_dir.rmdir() + + +def test_uds_bindspace_created_implicitly( + debug_mode: bool, + bindspace_dir_str: str, +): + registry_addr: tuple = ( + f'{bindspace_dir_str}', + 'registry@doggy.sock', + ) + bs_dir_str: str = registry_addr[0] + + # XXX, ensure bindspace-dir DNE beforehand! + assert not Path(bs_dir_str).is_dir() + + async def main(): + async with tractor.open_nursery( + enable_transports=['uds'], + registry_addrs=[registry_addr], + debug_mode=debug_mode, + ) as _an: + + # XXX MUST be created implicitly by + # `.ipc._uds.start_listener()`! 
+ assert Path(bs_dir_str).is_dir() + + root: Actor = tractor.current_actor() + assert root.is_registrar + + assert registry_addr in root.reg_addrs + assert ( + registry_addr + in + _state._runtime_vars['_registry_addrs'] + ) + assert ( + _addr.wrap_address(registry_addr) + in + root.registry_addrs + ) + + trio.run(main) + + +def test_uds_double_listen_raises_connerr( + debug_mode: bool, + bindspace_dir_str: str, +): + registry_addr: tuple = ( + f'{bindspace_dir_str}', + 'registry@doggy.sock', + ) + + async def main(): + async with tractor.open_nursery( + enable_transports=['uds'], + registry_addrs=[registry_addr], + debug_mode=debug_mode, + ) as _an: + + # runtime up + root: Actor = tractor.current_actor() + + from tractor.ipc._uds import ( + start_listener, + UDSAddress, + ) + ya_bound_addr: UDSAddress = root.registry_addrs[0] + try: + await start_listener( + addr=ya_bound_addr, + ) + except ConnectionError as connerr: + assert type(src_exc := connerr.__context__) is OSError + assert 'Address already in use' in src_exc.args + # complete, exit test. + + else: + pytest.fail('It dint raise a connerr !?') + + + trio.run(main) diff --git a/tractor/ipc/_uds.py b/tractor/ipc/_uds.py index 645819f0..e23fd8d2 100644 --- a/tractor/ipc/_uds.py +++ b/tractor/ipc/_uds.py @@ -103,8 +103,6 @@ class UDSAddress( self.filedir or self.def_bindspace - # or - # get_rt_dir() ) @property @@ -230,7 +228,14 @@ async def start_listener( addr: UDSAddress, **kwargs, ) -> SocketListener: - # sock = addr._sock = socket.socket( + ''' + Start listening for inbound connections via + a `trio.SocketListener` (task) which `socket.bind()`s on `addr`. + + Note, if the `UDSAddress.bindspace: Path` directory dne it is + implicitly created. + + ''' sock = socket.socket( socket.AF_UNIX, socket.SOCK_STREAM @@ -241,7 +246,17 @@ async def start_listener( f'|_{addr}\n' ) + # ?TODO? should we use the `actor.lifetime_stack` + # to rm on shutdown? bindpath: Path = addr.sockpath + if not (bs := addr.bindspace).is_dir(): + log.info( + 'Creating bindspace dir in file-sys\n' + f'>{{\n' + f'|_{bs!r}\n' + ) + bs.mkdir() + with _reraise_as_connerr( src_excs=( FileNotFoundError, From ba08052ddf7008815be2f6024772ba78e7836d8b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 29 Jul 2025 14:58:18 -0400 Subject: [PATCH 09/15] Handle "out-of-layer" remote `Context` cancellation Such that if the local task hasn't resolved but is `trio.Cancelled` and a `.canceller` was set, we report a `'actor-cancelled'` from `.repr_state: str`. Bit of formatting to avoid needless newlines too! 
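
As a purely illustrative sketch (not part of the diff), for a
parent-side `ctx` whose child actor was cancelled "out-of-layer",
eg. by a separate `Portal.cancel_actor()` call, before any
result/error resolved:

    # `.canceller` (the cancelling actor's uid) is set and the
    # local task saw `trio.Cancelled`, so we now report,
    assert ctx.canceller
    ctx.repr_state  # -> 'actor-cancelled'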
--- tractor/_context.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tractor/_context.py b/tractor/_context.py index 0e3ff0c3..9e277a88 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -1011,7 +1011,6 @@ class Context: else: log.cancel( f'Timed out on cancel request of remote task?\n' - f'\n' f'{reminfo}' ) @@ -1492,6 +1491,12 @@ class Context: ): status = 'peer-cancelled' + case ( + Unresolved, + trio.Cancelled(), # any error-type + ) if self.canceller: + status = 'actor-cancelled' + # (remote) error condition case ( Unresolved, @@ -2273,7 +2278,7 @@ async def open_context_from_portal( logmeth = log.exception logmeth( - f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n' + f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n' ) if debug_mode(): From 5931c59aefd3bb69add3464215a206b56fe2164d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 29 Jul 2025 15:01:47 -0400 Subject: [PATCH 10/15] Log "out-of-layer" cancellation in `._rpc._invoke()` Similar to what was just changed for `Context.repr_state`, when the child task is cancelled but by a different "layer" of the runtime (i.e. a `Portal.cancel_actor()` / `SIGINT`-to-process canceller) we don't dump a traceback instead just `log.cancel()` emit. --- tractor/_rpc.py | 42 +++++++++++++++++++++++++++++++----------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/tractor/_rpc.py b/tractor/_rpc.py index eb1df2cc..5aee986d 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -642,7 +642,7 @@ async def _invoke( tn: Nursery rpc_ctx_cs: CancelScope async with ( - collapse_eg(), + collapse_eg(hide_tb=False), trio.open_nursery() as tn, msgops.maybe_limit_plds( ctx=ctx, @@ -823,24 +823,44 @@ async def _invoke( f'after having {ctx.repr_state!r}\n' ) if merr: - logmeth: Callable = log.error - if isinstance(merr, ContextCancelled): - logmeth: Callable = log.runtime + if ( + # ctxc: by `Context.cancel()` + isinstance(merr, ContextCancelled) - if not isinstance(merr, RemoteActorError): - tb_str: str = ''.join(traceback.format_exception(merr)) + # out-of-layer cancellation, one of: + # - actorc: by `Portal.cancel_actor()` + # - OSc: by SIGINT or `Process.signal()` + or ( + isinstance(merr, trio.Cancelled) + and + ctx.canceller + ) + ): + logmeth: Callable = log.cancel + descr_str += ( + f' with {merr!r}\n' + ) + + elif ( + not isinstance(merr, RemoteActorError) + ): + tb_str: str = ''.join( + traceback.format_exception(merr) + ) descr_str += ( f'\n{merr!r}\n' # needed? f'{tb_str}\n' - f'\n' - f'scope_error:\n' - f'{scope_err!r}\n' ) else: - descr_str += f'\n{merr!r}\n' + descr_str += ( + f'{merr!r}\n' + ) else: - descr_str += f'\nwith final result {ctx.outcome!r}\n' + descr_str += ( + f'\n' + f'with final result {ctx.outcome!r}\n' + ) logmeth( f'{message}\n' From a72d1e6c4836c1cf432a444826696de7464b4782 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 29 Jul 2025 15:07:43 -0400 Subject: [PATCH 11/15] Multi-line-style up the UDS fast-connect handler Shift around comments and expressions for better reading, assign `tpt_closed` for easier introspection from REPL during debug oh and fix the `MsgpackTransport.pformat()` to render '|_peers: 1' .. 
XD --- tractor/ipc/_transport.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py index 6bfa5f6a..ad2a0e8e 100644 --- a/tractor/ipc/_transport.py +++ b/tractor/ipc/_transport.py @@ -430,20 +430,24 @@ class MsgpackTransport(MsgTransport): return await self.stream.send_all(size + bytes_data) except ( trio.BrokenResourceError, - ) as bre: - trans_err = bre + ) as _re: + trans_err = _re tpt_name: str = f'{type(self).__name__!r}' + match trans_err: + + # XXX, specifc to UDS transport and its, + # well, "speediness".. XD + # |_ likely todo with races related to how fast + # the socket is setup/torn-down on linux + # as it pertains to rando pings from the + # `.discovery` subsys and protos. case trio.BrokenResourceError() if ( - '[Errno 32] Broken pipe' in trans_err.args[0] - # ^XXX, specifc to UDS transport and its, - # well, "speediness".. XD - # |_ likely todo with races related to how fast - # the socket is setup/torn-down on linux - # as it pertains to rando pings from the - # `.discovery` subsys and protos. + '[Errno 32] Broken pipe' + in + trans_err.args[0] ): - raise TransportClosed.from_src_exc( + tpt_closed = TransportClosed.from_src_exc( message=( f'{tpt_name} already closed by peer\n' ), @@ -451,14 +455,15 @@ class MsgpackTransport(MsgTransport): src_exc=trans_err, raise_on_report=True, loglevel='transport', - ) from bre + ) + raise tpt_closed from trans_err # unless the disconnect condition falls under "a # normal operation breakage" we usualy console warn # about it. case _: log.exception( - '{tpt_name} layer failed pre-send ??\n' + f'{tpt_name} layer failed pre-send ??\n' ) raise trans_err @@ -503,7 +508,7 @@ class MsgpackTransport(MsgTransport): def pformat(self) -> str: return ( f'<{type(self).__name__}(\n' - f' |_peers: 2\n' + f' |_peers: 1\n' f' laddr: {self._laddr}\n' f' raddr: {self._raddr}\n' # f'\n' From df0d00abf4f34c780d619c9b6f52268989d47ada Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Aug 2025 10:29:56 -0400 Subject: [PATCH 12/15] Translate CRE's due to socket-close to tpt-closed Just like in the BRE case (for UDS) it seems when a peer closes the (UDS?) socket `trio` instead raises a `ClosedResourceError` which we now catch and re-raise as a `TransportClosed`. This again results in `tpt.send()` calls from the rpc-runtime **not** raising when it's known that the IPC channel is disconnected. --- tractor/ipc/_transport.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py index ad2a0e8e..5a484b36 100644 --- a/tractor/ipc/_transport.py +++ b/tractor/ipc/_transport.py @@ -430,6 +430,7 @@ class MsgpackTransport(MsgTransport): return await self.stream.send_all(size + bytes_data) except ( trio.BrokenResourceError, + trio.ClosedResourceError, ) as _re: trans_err = _re tpt_name: str = f'{type(self).__name__!r}' @@ -458,6 +459,22 @@ class MsgpackTransport(MsgTransport): ) raise tpt_closed from trans_err + case trio.ClosedResourceError() if ( + 'this socket was already closed' + in + trans_err.args[0] + ): + tpt_closed = TransportClosed.from_src_exc( + message=( + f'{tpt_name} already closed by peer\n' + ), + body=f'{self}\n', + src_exc=trans_err, + raise_on_report=True, + loglevel='transport', + ) + raise tpt_closed from trans_err + # unless the disconnect condition falls under "a # normal operation breakage" we usualy console warn # about it. 
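
For reference, an illustrative sketch (not part of the series) of what
this translation buys rpc-runtime callers; assumes a connected
`chan: Channel` wrapping one of these transports and that
`TransportClosed` is importable from `tractor._exceptions`:

    from tractor._exceptions import TransportClosed

    try:
        await chan.send(msg)
    except TransportClosed:
        # an "expected" peer-disconnect case; the runtime can log
        # it and continue instead of crashing the calling task.
        log.warning('Peer closed the socket before our send?')
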
From 331921f612b58de6fee930ce4474e77ee857dc73 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 13 Aug 2025 19:16:33 -0400 Subject: [PATCH 13/15] Hmm disable CRE case for now, causes test fails So i need to either adjust the tests or figure out if/why this is needed to avoid the crashing in `pikerd` i found when killin the chart during a long backfill with `binance` backend.. --- tractor/ipc/_transport.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py index 5a484b36..8c76c8ad 100644 --- a/tractor/ipc/_transport.py +++ b/tractor/ipc/_transport.py @@ -459,21 +459,21 @@ class MsgpackTransport(MsgTransport): ) raise tpt_closed from trans_err - case trio.ClosedResourceError() if ( - 'this socket was already closed' - in - trans_err.args[0] - ): - tpt_closed = TransportClosed.from_src_exc( - message=( - f'{tpt_name} already closed by peer\n' - ), - body=f'{self}\n', - src_exc=trans_err, - raise_on_report=True, - loglevel='transport', - ) - raise tpt_closed from trans_err + # case trio.ClosedResourceError() if ( + # 'this socket was already closed' + # in + # trans_err.args[0] + # ): + # tpt_closed = TransportClosed.from_src_exc( + # message=( + # f'{tpt_name} already closed by peer\n' + # ), + # body=f'{self}\n', + # src_exc=trans_err, + # raise_on_report=True, + # loglevel='transport', + # ) + # raise tpt_closed from trans_err # unless the disconnect condition falls under "a # normal operation breakage" we usualy console warn From 79f502034f0f537fba00172ccadc0a1932f22bbd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 18 Aug 2025 20:27:03 -0400 Subject: [PATCH 14/15] Don't hard code runtime-dir, read it with `._state.get_rt_dir()` --- tests/ipc/test_each_tpt.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/ipc/test_each_tpt.py b/tests/ipc/test_each_tpt.py index 7dc60444..9ed45789 100644 --- a/tests/ipc/test_each_tpt.py +++ b/tests/ipc/test_each_tpt.py @@ -18,8 +18,9 @@ from tractor import ( @pytest.fixture def bindspace_dir_str() -> str: - bs_dir_str: str = '/run/user/1000/doggy' - bs_dir = Path(bs_dir_str) + rt_dir: Path = tractor._state.get_rt_dir() + bs_dir: Path = rt_dir / 'doggy' + bs_dir_str: str = str(bs_dir) assert not bs_dir.is_dir() yield bs_dir_str From 5021514a6ae3541c71ce145e8c38b314a424b6be Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 18 Aug 2025 22:01:15 -0400 Subject: [PATCH 15/15] Disable shm resource tracker via flag on 3.13+ As per the newly added support, https://docs.python.org/3/library/multiprocessing.shared_memory.html --- tractor/ipc/_mp_bs.py | 67 +++++++++++++++++++++++++++++-------------- tractor/ipc/_shm.py | 22 +++++++++----- 2 files changed, 60 insertions(+), 29 deletions(-) diff --git a/tractor/ipc/_mp_bs.py b/tractor/ipc/_mp_bs.py index 33a64a32..462291c6 100644 --- a/tractor/ipc/_mp_bs.py +++ b/tractor/ipc/_mp_bs.py @@ -17,36 +17,59 @@ Utils to tame mp non-SC madeness ''' +import platform + -# !TODO! in 3.13 this can be disabled (the-same/similarly) using -# a flag, -# - [ ] soo if it works like this, drop this module entirely for -# 3.13+ B) -# |_https://docs.python.org/3/library/multiprocessing.shared_memory.html -# def disable_mantracker(): ''' Disable all `multiprocessing` "resource tracking" machinery since it's an absolute multi-threaded mess of non-SC madness. 
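+
+    On py3.13+ this now simply hands back the stdlib's
+    `SharedMemory` type pre-bound (via `functools.partial`) with
+    `track=False`; only older pythons get the monkey-patch
+    treatment below.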
 
     '''
-    from multiprocessing import resource_tracker as mantracker
+    from multiprocessing.shared_memory import SharedMemory
 
-    # Tell the "resource tracker" thing to fuck off.
-    class ManTracker(mantracker.ResourceTracker):
-        def register(self, name, rtype):
-            pass
 
-        def unregister(self, name, rtype):
-            pass
+    # 3.13+ only.. can pass `track=False` to disable
+    # all the resource tracker bs.
+    # https://docs.python.org/3/library/multiprocessing.shared_memory.html
+    if (_py_313 := (
+            platform.python_version_tuple()[:-1]
+            >=
+            ('3', '13')
+        )
+    ):
+        from functools import partial
+        return partial(
+            SharedMemory,
+            track=False,
+        )
 
-    def ensure_running(self):
-        pass
+    # !TODO, once we drop 3.12- we can obvi remove all this!
+    else:
+        from multiprocessing import (
+            resource_tracker as mantracker,
+        )
 
-    # "know your land and know your prey"
-    # https://www.dailymotion.com/video/x6ozzco
-    mantracker._resource_tracker = ManTracker()
-    mantracker.register = mantracker._resource_tracker.register
-    mantracker.ensure_running = mantracker._resource_tracker.ensure_running
-    mantracker.unregister = mantracker._resource_tracker.unregister
-    mantracker.getfd = mantracker._resource_tracker.getfd
+        # Tell the "resource tracker" thing to fuck off.
+        class ManTracker(mantracker.ResourceTracker):
+            def register(self, name, rtype):
+                pass
+
+            def unregister(self, name, rtype):
+                pass
+
+            def ensure_running(self):
+                pass
+
+        # "know your land and know your prey"
+        # https://www.dailymotion.com/video/x6ozzco
+        mantracker._resource_tracker = ManTracker()
+        mantracker.register = mantracker._resource_tracker.register
+        mantracker.ensure_running = mantracker._resource_tracker.ensure_running
+        mantracker.unregister = mantracker._resource_tracker.unregister
+        mantracker.getfd = mantracker._resource_tracker.getfd
+
+        # use std type verbatim
+        shmT = SharedMemory
+
+        return shmT
diff --git a/tractor/ipc/_shm.py b/tractor/ipc/_shm.py
index ed17a2b7..2360f893 100644
--- a/tractor/ipc/_shm.py
+++ b/tractor/ipc/_shm.py
@@ -23,14 +23,15 @@ considered optional within the context of this runtime-library.
 
 """
 from __future__ import annotations
+from multiprocessing import shared_memory as shm
+from multiprocessing.shared_memory import (
+    # SharedMemory,
+    ShareableList,
+)
+import platform
 from sys import byteorder
 import time
 from typing import Optional
-from multiprocessing import shared_memory as shm
-from multiprocessing.shared_memory import (
-    SharedMemory,
-    ShareableList,
-)
 
 from msgspec import (
     Struct,
@@ -61,7 +62,7 @@ except ImportError:
 
 log = get_logger(__name__)
 
-disable_mantracker()
+SharedMemory = disable_mantracker()
 
 
 class SharedInt:
@@ -797,8 +798,15 @@ def open_shm_list(
     # "close" attached shm on actor teardown
     try:
         actor = tractor.current_actor()
+
         actor.lifetime_stack.callback(shml.shm.close)
-        actor.lifetime_stack.callback(shml.shm.unlink)
+
+        # XXX on 3.13+ we don't need to call this?
+        # -> bc we pass `track=False` for `SharedMemory` orr?
+        if (
+            platform.python_version_tuple()[:-1] < ('3', '13')
+        ):
+            actor.lifetime_stack.callback(shml.shm.unlink)
     except RuntimeError:
         log.warning('tractor runtime not active, skipping teardown steps')
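
For reference, a minimal sketch of the py3.13+ stdlib behaviour this
last patch leans on; assumes only the stdlib (no `tractor` APIs) and
an illustrative segment size:

    from multiprocessing.shared_memory import SharedMemory

    # `track=False` (3.13+ only) opts this segment out of the
    # `resource_tracker` entirely, so cleanup is fully manual:
    shm = SharedMemory(create=True, size=64, track=False)
    try:
        shm.buf[:5] = b'doggy'
    finally:
        shm.close()
        shm.unlink()  # nothing tracks it, so unlink explicitly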