Compare commits

..

120 Commits

Author SHA1 Message Date
Tyler Goodlet 490a4cf8c8 Reapply `.devx.debug` mod-name change to ipc-server lost during rebase.. 2025-07-13 15:55:39 -04:00
Tyler Goodlet 52be5c08e4 Drop "
" from tail of `BoxedMaybeException.pformat()`
2025-07-13 15:55:39 -04:00
Tyler Goodlet c22a301883 Drop `.to_asyncio`s usage-of-`greenback`-reporting to `log.devx()` 2025-07-13 15:55:39 -04:00
Tyler Goodlet 05ec9eaef7 Disable `greenback` sync fn breakpointing by def
Opting for performance over broad multi-actor "debug-ability" from
sync-function-contexts when `debug_mode=True` is set;
IOW prefer no behind-the-scenes `greenlet` perf impact over being
able to use an actor-safe `breakpoint()` wherever as per,
https://greenback.readthedocs.io/en/latest/principle.html#performance

Adjust the breakpoint restore ex script to match.
2025-07-13 15:55:39 -04:00
Tyler Goodlet f96fb8f332 Prevent `test_breakpoint_hook_restored` subproc hangs
If the underlying example script fails (say due to a console output
pattern-mismatch, `AssertionError`) the `pexpect` managed subproc with
a `debug_mode=True` crash-handling-REPL engaged will ofc *not terminate*
due to any SIGINT sent by the test harnesss (since we shield from it as
part of normal sub-actor debugger operation). So instead always send
a 'continue' cmd to the active `PdbREPL`'s stdin so it deactivates and
allows the py-script-process to raise and terminate, unblocking the
`pexpect.spawn`'s internal subproc joiner (which would otherwise hang
without manual intervention, blocking downstream tests..).

Also, use the new `PexpectSpawner` type alias after actually importing
future annots.. XD
2025-07-13 15:55:39 -04:00
Tyler Goodlet 91222c1087 Type alias our `pexpect.spawn()` closure fixture
Such that we can more easily annotate any consumer test's of our
`.tests.devx.conftest.spawn()` fixture which delivers a closure which, when
called in a test fn body, transitively sub-invokes:
`pytest.Pytester.spawn()` -> `pexpect.spawn()`

IMO Expecting `Callable[[str], pexpect.pty_spawn.spawn]]` to be used all
over is a bit too.. verbose?
2025-07-13 15:55:39 -04:00
Tyler Goodlet d93709366e Type annot the `testdir` fixture 2025-07-13 15:55:39 -04:00
Tyler Goodlet a5126862b9 Re-impl as `DebugStatus.maybe_enter_repl_fixture()`
Dropping the `_maybe_open_repl_fixture()` approach and instead using
a `DebugStatus._fixture_stack = ExitStack()` which provides for much
simpler support around both sync and async pausing APIs thanks to only
invoking `repl_fixture.__exit__()` on actual `PdbREPL` interaction being
complete!

Deats,
- all `repl_fixture` detection logic still happens in one place (the new
  method) but we aren't limited to closing it via an immediate post REPL
  `.__exit__()` call which instead is triggered by,
- `DebugStatus.release()` which now calls `._fixture_stack.close()` and
  thus only invokes `repl_fixture.__exit__()` when user REPL-ing is
  **actually complete** an arbitrary amount of debugging time later.
- include the notes for `@acm` support above the new method, though not
  sure if they're as relevant any more?

Benefits,
- we can drop the previously added indent levels from
  `_enter_repl_sync()` and `_post_mortem()`.
- now we automatically have support for the `.pause_from_sync()` API
  since `_enter_repl_sync()` doesn't close the prior
  `_maybe_open_repl_fixture()` immediately when `debug_func=None`; the
  user's `__exit__()` is only ever called once `.release()` is.

Other,
- add big 'CASE' comments around the various blocks in
  `.pause_from_sync()`, i was having trouble figuring out which i was
  using from a `breakpoint()` in a dependent app..
2025-07-13 15:55:39 -04:00
Tyler Goodlet ce2d06cad9 Always pass `repl: PdbREPL` as first param to fixture 2025-07-13 15:55:39 -04:00
Tyler Goodlet c4df4617ab Adjust restore-bp-ex import path to `.devx.debug`
Reversion of original cherry-pick fix from downstream history;
`.devx.debug` is now legit here.
2025-07-13 15:54:24 -04:00
Tyler Goodlet 9b1bfea937 Reorg `.devx.debug` into sub-mods!
Which cleans out the pkg-mod to just the expected exports with (its
longstanding todo comment list) and thus a separation-of-concerns
and smaller mod-file sizes via the following new sub-mods:
- `._trace` for the `.pause()`/`breakpoint()`/`pdb.set_trace()`-style
  APIs including all sync-caller variants.
- `._post_mortem` to contain our async `.post_mortem()` and all other
  public crash handling APIs for use from sync callers.
- `._sync` for the high-level syncing helper-routines used throughout the
  runtime to avoid multi-proc TTY use collisions.

And also,
- remove `hide_runtime_frames()` since moved to `.devx._frame_stack`.
2025-07-13 15:53:35 -04:00
Tyler Goodlet 69d86fca7f Mv `.hide_runtime_frames()` -> `.devx._frame_stack`
A much more relevant module for a call-stack-frame hider ;)
2025-07-13 15:53:35 -04:00
Tyler Goodlet 32b87abdea Cherry-pick conflict resolution
Orig commit was,
"9c0de24 Be explicit with `SpawnSpec` processing in subs"

The commit was picked onto an upstream branch but at that time there was
no `.devx.debug` subpkg yet, hence this revert to the original patch's
module path.
2025-07-13 15:53:35 -04:00
Tyler Goodlet 7c596d34ad Enable new `tractor.devx.debug._tty_lock` in the root 2025-07-13 15:53:35 -04:00
Tyler Goodlet f6513bb2cf Start splitting into `devx.debug.` sub-mods
From what was originall the `.devx._debug` monolith module, since that
file was way out of ctl in terms of LoC!

New modules so far include,
- ._repl: our `pdb[p]` ext type/lowlevel-APIs and `mk_pdb()` factory.
- ._sigint: just our REPL-interaction shield-handler.
- ._tty_lock: containing all the root-actor TTY mutex machinery
  including the `Lock`/`DebugStatus` primitives/APIs as well as the
  inter-tree IPC context eps:
  * the server-side `lock_stdio_for_peer()` which pairs with the,
  * client-(subactor)-side `request_root_stdio_lock()` via the,
  * pld-msg-spec of `LockStatus/LockRelease`.
  AND the `any_connected_locker_child()` predicate.
2025-07-13 15:53:35 -04:00
Tyler Goodlet 4f6a9c62c6 Add `_maybe_open_repl_fixture()`
Factoring the (basically duplicate) content from both use spots into
a common `@cm` which delivers a `bool` signalling whether the REPL
should be engaged. Fixes a lingering bug with `nullcontext()` calling
btw..
2025-07-13 15:53:35 -04:00
Tyler Goodlet ba97a8ddc5 Mk `.devx._debug` a sub-pkg `.devx.debug`
With plans for much factoring of the original module into sub-mods!
Adjust all imports and refs throughout to match.
2025-07-13 15:53:35 -04:00
Tyler Goodlet b9b89b447c Add exc suppression to `open_crash_handler()`
By supporting a new optional param to `open_crash_handler()`,
`raise_on_exit: bool|Sequence[Type[BaseException]] = True` which
determines whether, after the REPL interaction completes, the handled
exception is raised upward. This is **very** handy for writing bits of
"debug-able but resilient code" as is the case in (many) dependent
projects/apps.

Impl,
- `raise_on_exit` can be a `bool` or (set) sequence of types which will
  always be raised.
- also add a `BoxedMaybeException.raise_on_exit` equiv which (for now)
  we check matches (in case down the road we want to offer dynamic ctls).
- rename both crash-handler cm's `tb_hide` -> `hide_tb`.
2025-07-13 15:53:35 -04:00
Tyler Goodlet 5e102ec368 Add initial `repl_fixture` support B)
It turns out to be fairly useful to allow hooking into a given actor's
entry-and-exit around `.devx._debug._pause/._post_mortem()` calls which
engage the `pdbp.Pdb` REPL (really our `._debug.PdbREPL` but yeah).

Some very handy use cases include,
- swapping out-of-band (config) state that may otherwise halt the
  user's app since the actor normally handles kb&mouse input, in thread,
  which means that the handler will be blocked while the REPL is in use.
- (remotely) reporting actor-runtime state for monitoring purposes
  around crashes or pauses in normal operation.
- allowing for crash-handling to be hard-disabled via
  `._state._runtime_vars` say for when you never want a debugger to be
  entered in a production instance where you're not-sure-if/don't-want
  per-actor `debug_mode: bool` settings to always be unset, say bc
  you're still debugging some edge cases that ow you'd normally want to
  REPL up.

Impl details,
- add a new optional `._state._runtime_vars['repl_fixture']` field which
  for now can be manually set; i saw no reason for a formal API yet
  since we want to convert the `dict` to a struct anyway (first).
- augment both `.devx._debug._pause()/._post_mortem()` with a new
  optional `repl_fixture: AbstractContextManager[bool]` kwarg which
  when provided is `with repl_fixture()` opened around the lowlevel
  REPL interaction calls; if the enter-result, an expected `bool`, is
  `False` then the interaction is hard-bypassed.
  * for the `._pause()` case the `@cm` is opened around the entire body
    of the embedded `_enter_repl_sync()` closure (for now) though
    ideally longer term this entire routine is factored to be a lot less
    "nested" Bp
  * in `_post_mortem()` the entire previous body is wrapped similarly
    and also now excepts an optional `boxed_maybe_exc: BoxedMaybeException`
    only passed in the `open_crash_handler()` caller case.
- when the new runtime-var is overridden, (only manually atm) it is used
  instead but only whenever the above `repl_fixture` kwarg is left null.
- add a `BoxedMaybeException.pformat() = __repr__()` which when
  a `.value: Exception` is set renders a more "objecty" repr of the exc.

Obviously tests for all this should be coming soon!
2025-07-13 15:53:35 -04:00
Tyler Goodlet c93a7d9b24 Add a `debug_mode`-state reversion test 2025-07-13 15:53:35 -04:00
Tyler Goodlet faa681da21 Fix ref-err on `logger` input to `get_console_log()`
Particularly on a get-attr of `StackLevelAdapter.handlers` which, when
a `logger: StackLevelAdapter` is passed, we need to *not call* our own
`get_logger()` and just set is as the `log`. Fix the typing to match.
2025-07-13 15:53:35 -04:00
Tyler Goodlet 69984c44ef Unset debug-mode on root actor exit
Discovered this bug while testing `modden`'s daemon under various
cancelled-while-booting race conditions where sequential tests would
fail a lingering `assert 0` inside `.to_asyncio.run_as_asyncio_guest()`
to (oddly) catch redundant greenback-re-inits..

XD

Needs a test likely ;P
2025-07-13 15:53:35 -04:00
Tyler Goodlet e2f24b189b Expose `.trionics.maybe_collapse_eg` 2025-07-13 15:53:35 -04:00
Tyler Goodlet 166a252d4e Use `.is_debug_mode()` for maybe-crash-handling
Such that the default is `None` and in the case where the caller *does
not* set the `pdb` arg to an explicit `bool` we instead determine it via
the output from `._state.is_debug_mode()` allowing for more "nonchalant"
usage throughout a (test) code base which passes the `debug_mode: bool`
as runtime config; allows delegation to the per-actor proc-global state.
2025-07-13 15:53:35 -04:00
Tyler Goodlet 96a4d381df Add todo for `dulwhich` as dep 2025-07-13 15:53:35 -04:00
Tyler Goodlet 7c09972c7f Formally add `nest_from_op()` XD
Moving it from where i (oddly) first wrote it up in `._entry` to a more
proper place with its pals in `.devx.pformat` ;p
2025-07-13 15:53:35 -04:00
Tyler Goodlet 1eb0d785a8 Try out separate readme section for infra badges
Bc why clutter the intro like everyone else.. instead put them just
above the install section.
2025-07-13 15:48:26 -04:00
Tyler Goodlet 98d0ca88e5 Flip a couple more debug scripts to UDS tpt
For now just as sanity that we're not breaking anything on that
transport backend (since just a little while back there were issues with
crash handling in subs..) when it comes to crash-REPLing.
2025-07-13 15:26:37 -04:00
Tyler Goodlet 37f843a128 Add an `enable_transports` test-suite
Like it sounds, verifying that when that param is passed to the runtime
startup eps (`.open_root_actor()/.open_nursery()`), the appropriate
tpt-protocol is deployed for IPC (both the server and bound endpoints)
in both the root and any sub-actors (as passed down from rent to child
via the `.msg.types.SpawnSpec`).
2025-07-13 15:26:37 -04:00
Tyler Goodlet 29cd2ddbac Drop 'IPC' prefix from `._server` types
We already have the `.ipc` sub-pkg name so it seems a bit
redundant/noisy for a namespace path Bp

Leave an alias for the `Server` rn since it's already used in a few
other internal mods.. will likely rename later if everyone is cool with
it..
2025-07-13 15:26:37 -04:00
Tyler Goodlet 295b06511b Plugin-ize some re-usable `conftest` parts
Namely any CLI driven runtime-config fixtures such as,

- `--spawn-backend` and `start_method`,
- `--tpdb` and `debug_mode`,
- `--tpt-proto` and `tpt_protos`/`tpt_proto`,
- `reg_addr` as driven by the above.

This moves all fixtures and necessary hook funcs (CLI parsing,
configuring and test-gen) to the `._testing.pytest` module and thus
allows any dependent project to leverage these fixtures in their own
test suites after pointing to that plugin mod using,

```python
    # conftest.py
    pytest_plugins: tuple[str] = (
        "tractor._testing.pytest",
    )
```

Also, add a new `._testing.addr` helper mod which now contains
a factored `get_rando_addr()` helper for creating test-sesh unique
tpt-specific registry (or other) IPC endpoint addrs.
2025-07-13 15:26:37 -04:00
Tyler Goodlet 1e6b5b3f0a Start a very basic ipc-server unit test suite
For now it just boots a server, parametrized over all tpt-protos, sin
any actor runtime bootup. Obvi the future todo is ensuring it all works
with a client connecting via the equivalent lowlevel
`.ipc._chan._connect_chan()` API(s).
2025-07-13 15:26:37 -04:00
Tyler Goodlet 36ddb85197 Fix assert on `.devx.maybe_open_crash_handler()` delivered `bxerr` 2025-07-13 15:26:37 -04:00
Tyler Goodlet d6b0ddecd7 Improve bit of tooling for `test_resource_cache.py`
Namely while what I was actually trying to solve was why
`TransportClosed` was getting raised from `Portal.cancel_actor()` but
still useful edge case auditing either way. Also opts into the
`debug_mode` fixture with apprope timeout adjustment B)
2025-07-13 15:26:37 -04:00
Tyler Goodlet 9e5475391c Set `_state._def_tpt_proto` in `tpt_proto` fixture
Such that the global test-session always (and only) runs against the CLI
specified `--tpt-proto=` transport protocol.
2025-07-13 15:26:37 -04:00
Tyler Goodlet ef7ed7ac6f Handle unconsidered fault-edge cases for UDS
In `tests/test_advanced_faults.py` that is.
Since instead of zero-responses like we'd expect from a network-socket
we actually can get a few differences from the OS when "everything IPC
is known"

XD

Namely it's about underlying `trio` exceptions versus how we wrap them
and how we expect to box them. A `TransportClosed` boxing improvement
is coming in follow up btw to make this all work!

B)
2025-07-13 15:26:37 -04:00
Tyler Goodlet d8094f4420 Woops, ensure we use `global` before setting `daemon()` fixture spawn delay.. 2025-07-13 15:26:37 -04:00
Tyler Goodlet d7b12735a8 Support multiple IPC transports in test harness!
Via a new accumulative `--tpt-proto` arg you can select which
`tpt_protos: list[str]`-fixture protocol keys will be delivered to
opting in tests!

B)

Also includes,
- CLI quote handling/stripping.
- default of 'tcp'.
- only support one selection per session at the moment (until we figure
  out how we want to support multiples, either simultaneously or
  sequentially).
- draft a (masked) dynamic-`metafunc` parametrization in the
  `pytest_generate_tests()` hook.
- first proven and working use in the `test_advanced_faults`-suite (and
  thus its underlying
  `examples/advanced_faults/ipc_failure_during_stream.py` script)!
 |_ actually needed this to prove that the suite only has 2 failures on
    'uds' seemingly due to low-level `trio` error semantics translation
    differences to do with with calling `socket.close()`..

On a very nearly related topic,
- draft an (also commented out) `set_script_runtime_args()` fixture idea
  for a std way of `partial`-ling in runtime args to `examples/`
  scripts-as-modules defining a `main()` which would proxy to
  `tractor.open_nursery()`.
2025-07-13 15:26:37 -04:00
Tyler Goodlet 47107e44ed Start protoyping multi-transport testing
Such that we can run (opting-in) tests on both TCP and UDS backends and
ensure the `reg_addr` fixture and various timeouts are adjusted
accordingly.

Impl deats,
- add a new `tpc_proto` CLI option and fixture to allow choosing which
  "transport protocol" will be used in the test suites (either globally
  or contextually).
- rm `_reg_addr` instead opting for a `_rando_port` which will only be
  used for `reg_addr`s which are net-tpt-protos.
- rejig `reg_addr` fixture to set a ideally session-unique `testrun_reg_addr`
  based on the `tpt_proto` setting making appropriate calls to `._addr`
  APIs as needed.
- refine `daemon` fixture a bit with typing, `tpt_proto` timings, and
  stderr capture.
- in `test_discovery` do a ton of type-annots, add `debug_mode` fixture
  opt ins, augment `spawn_and_check_registry()` with `psutil.Process`
  passing for introspection (when things go wrong..).
2025-07-13 15:26:37 -04:00
Bd ba384ca83d
Merge pull request #375 from goodboy/structural_dynamics_of_flow
The Structural Dynamics of Flow: finally a formalized modular transport layer -> `tractor.ipc` 😎
2025-07-13 15:11:00 -04:00
Tyler Goodlet ad9833a73a Update actions badge links in readme 2025-07-13 14:56:57 -04:00
Tyler Goodlet 161884fbf1 Adjust back `.devx._debug` import
Bc this history is pre `.devx.debug` subpkg creation..
2025-07-13 13:56:37 -04:00
Tyler Goodlet c2e7dc7407 Avoid silent `stackscope`-test fail due to dep
Oddly my env was borked bc a missing sub-dep (`typing-extensions`
apparently not added by `uv` for `stackscope`?) and then `stackscope`
was silently failing import and caused the shield-pause test to also
fail (since it couldn't match the expected `log.devx()` on console). The
import failure is not very explanatory due to the `log.warning()`;
change it to `.error()` level.

Also, explicitly import `_sync_pause_from_builtin` in
`examples/debugging/restore_builtin_breakpoint.py` to ensure the ref is
exported properly from `.devx.debug` (which it wasn't during dev of the
prior commit Bp).
2025-07-13 13:45:15 -04:00
Tyler Goodlet 309360daa2 Add latest `typing-extension`, needed by `stackscope` 2025-07-13 13:43:48 -04:00
Tyler Goodlet cbfb0d0144 Don't use `uv sync --locked` for now 2025-07-13 13:26:22 -04:00
Tyler Goodlet c0eef3bac3 Bump GH CI to use `uv` throughout!
See docs: https://docs.astral.sh/uv/guides/integration/github/

Summary,
- drop `mypy` job for now since I'd like to move to trying `ty`.
- convert sdist built to `uv build`
- just run test suite on py3.13 for now, not sure if 3.12 will break due
  to the eg stuff or not?
2025-07-13 13:23:37 -04:00
Tyler Goodlet 27e6ad18ee Mk `.ipc._tcp.TCPAddress` validate with `ipaddress`
Both via a post-init method to validate the original input `._host: str`
and in `.is_valid` to ensure the host-part isn't something, esoteric..
2025-07-10 18:12:50 -04:00
Tyler Goodlet 28e32b8f85 Use `enable_transports: list[str]` parameter
Actually applying the input it in the root as well as all sub-actors by
passing it down to sub-actors through runtime-vars as delivered by the
initial `SpawnSpec` msg during child runtime init.

Impl deats,
- add a new `_state._runtime_vars['_enable_tpts']: list[str]` field set
  by the input param (if provided) to `.open_root_actor()`.
- mk `current_ipc_protos()` return the runtime-var entry with instead
  the default in the `_runtime_vars: dict` set to `[_def_tpt_proto]`.
- in `.open_root_actor()`, still error on this being a >1 `list[str]`
  until we have more testing infra/suites to audit multi-protos per
  actor.
- return the new value (as 3rd element) from `Actor._from_parent()` as
  per the todo note; means `_runtime.async_main()` will allocate
  `accept_addrs` as tpt-specific `Address` entries and pass them to
  `IPCServer.listen_on()`.

Also,
- also add a new `_state._runtime_vars['_root_addrs']: list = []` field
  with the intent of fully replacing the `'_root_mailbox'` field since,
  * it will need to be a collection to support multi-tpt,
  * it's a more cohesive field name alongside `_registry_addrs`,
  * the root actor of every tree needs to have a dedicated addr set
    (separate from any host-singleton registry actor) so that all its
    subs can contact it for capabilities mgmt including debugger
    access/locking.
- in the root, populate the field in `._runtime.async_main()` and for
  now just set '_root_mailbox' to the first entry in that list in
  anticipation of future multi-homing/transport support.
2025-07-10 18:12:28 -04:00
Tyler Goodlet 05df634d62 Use `Channel.aid: Aid` throughout `.ipc._server` 2025-07-10 17:48:13 -04:00
Tyler Goodlet 6d2f4d108d Detail the docs on `Context._maybe_raise_remote_err()` 2025-07-10 17:48:02 -04:00
Tyler Goodlet ae2687b381 Bump lock file for new 3.13 wheels/schema
Buncha either new AOTc lib whls and they added an `upload-time` field.
2025-07-10 17:47:53 -04:00
Tyler Goodlet a331f6dab3 Return `Path` from `_get_mod_abspath()` helper fn 2025-07-10 17:47:42 -04:00
Tyler Goodlet 9c0de24899 Be explicit with `SpawnSpec` processing in subs
As per the outstanding TODO just above the redic `setattr()` loop in
`Actor._from_parent()`!!

Instead of all that risk-ay monkeying, add detailed comment-sections
around each explicit assignment of each `SpawnSpec` field, including
those that were already being explicitly set.

Those and other deats,
- ONLY enable the `.devx._debug` (CHERRY-CONFLICT: later changed to
  `.debug._tty_lock`) module from `Actor.__init__()` in the root actor.
- ONLY enable the `.devx.debug._tty_lock` module from `Actor.__init__()`
  in the root actor.
- add a new `get_mod_nsps2fps()` to replace the loop in init and assign
  the initial `.enable_modules: dict[str, str]` from it.
- do `self.enable_modules.update(spawnspec.enable_modules)` instead of
  an overwrite and assert the table is by default empty in all
  subs.
2025-07-10 17:42:36 -04:00
Tyler Goodlet 1f3cef5ed6 Fix now invalid `Actor._peers` ref.. 2025-07-09 23:09:41 -04:00
Tyler Goodlet 8538a9c591 Decouple actor-state from low-level ipc-server
As much as is possible given we currently do some graceful
cancellation join-waiting on any connected sub-actors whenever an active
`local_nursery: AcrtorNursery` in the post-rpc teardown sequence of
`handle_stream_from_peer()` is detected. In such cases we try to allow
the higher level inter-actor (task) context(s) to fully cancelled-ack
before conducting IPC machinery shutdown.

The main immediate motivation for all this is to support unit testing
the `.ipc._server` APIs but in the future may be useful for anyone
wanting to use our modular IPC transport layer sin-"actors".

Impl deats,
- drop passing an `actor: Actor` ref from as many routines in
  `.ipc._server` as possible instead opting to use
  `._state.current_actor()` where abs needed; thus the fns dropping an
  `actor` input param are:
  - `open_ipc_server()`
  - `IPCServer.listen_on()`
  - `._serve_ipc_eps()`
  - `.handle_stream_from_peer()`
- factor the above mentioned graceful remote-cancel-ack waiting into
  a new `maybe_wait_on_canced_subs()` which is called from
  `handle_stream_from_peer()` and delivers a
  maybe-`local_nursery: ActorNursery` for downstream logic; it's this
  new fn which primarily still needs to call `current_actor()`.
- in `handle_stream_from_peer()` also use `current_actor()` to check if
  a handshake is needed (or if it was called as part of some
  actor-runtime-less operation like our unit test suite!).
- also don't pass an `actor` to `._rpc.process_messages()` see how-n-why
  below..

Surrounding ipc-server client/caller adjustments,
- `._rpc.process_messages()` no longer takes an `actor` input and
  now calls `current_actor()` instead.
- `._portal.open_portal()` is adjusted to ^.
- `._runtime.async_main()` is adjusted to the `.ipc._server`'s removal
  of `actor` ref passing.

Also,
- drop some server `log.info()`s to `.runtime()`
2025-07-08 18:05:05 -04:00
Tyler Goodlet 7533e93b0f Log listener bind status for TCP as for UDS 2025-07-08 18:05:05 -04:00
Tyler Goodlet f67b0639b8 Move peer-tracking attrs from `Actor` -> `IPCServer`
Namely transferring the `Actor` peer-`Channel` tracking attrs,
- `._peers` which maps the uids to client channels (with duplicates
  apparently..)
- the `._peer_connected: dict[tuple[str, str], trio.Event]` child-peer
  syncing table mostly used by parent actors to wait on sub's to connect
  back during spawn.
- the `._no_more_peers = trio.Event()` level triggered state signal.

Further we move over with some minor reworks,
- `.wait_for_peer()` verbatim (adjusting all dependants).
- factor the no-more-peers shielded wait branch-block out of
  the end of `async_main()` into 2 new server meths,
  * `.has_peers()` with optional chan-connected checking flag.
  * `.wait_for_no_more_peers()` which *just* does the
    maybe-shielded `._no_more_peers.wait()`
2025-07-08 18:05:05 -04:00
Tyler Goodlet 26fedec6a1 Mv `Actor._stream_handler()` to `.ipc._server` func
Call it `handle_stream_from_peer()` and bind in the `actor: Actor` via
a `handler=partial()` to `trio.serve_listeners()`.

With this (minus the `Actor._peers/._peer_connected/._no_more_peers`
attrs ofc) we get nearly full separation of IPC-connection-processing
(concerns) from `Actor` state. Thus it's a first look at modularizing
the low-level runtime into isolated subsystems which will hopefully
improve the entire code base's grok-ability and ease any new feature
design discussions especially pertaining to introducing and/or
composing-together any new transport protocols.
2025-07-08 18:05:05 -04:00
Tyler Goodlet 0711576678 Passthrough `_pause()` kwargs from `_maybe_enter_pm()` 2025-07-08 18:05:05 -04:00
Tyler Goodlet 0477a62ac3 Never hide non-[msgtype/tpt-closed] error tbs in `Channel.send()` 2025-07-08 18:05:05 -04:00
Tyler Goodlet 01d6f111f6 Use `current_ipc_protos()` as the `enable_transports`-default-when-`None`
Also ensure we assertion-error whenever the list is > 1 entry for now!
2025-07-08 18:05:05 -04:00
Tyler Goodlet 56ef4cba23 Add `_state.current_ipc_protos()`
For now just wrapping wtv the `._def_tpt_proto` per-actor setting is.
2025-07-08 18:05:05 -04:00
Tyler Goodlet 52b5efd78d Another `tn` eg-loosify inside `ActorNursery.cancel()`.. 2025-07-08 18:05:05 -04:00
Tyler Goodlet a7d4bcdfb9 Absorb `TransportClosed` in `Portal.cancel_actor()`
Just like we *were* for the `trio`-resource-errors it normally wraps
since we now also do the same wrapping in `MsgpackTransport.send()`
and we don't normally care to raise tpt-closure-errors on graceful actor
cancel requests.

Also, warn-report any non-tpt-closed low-level `trio` errors we haven't
yet re-wrapped (likely bc they haven't shown up).
2025-07-08 18:05:05 -04:00
Tyler Goodlet 79d0c17f6b Add `TransportClosed.from_src_exc()`
Such that re-wrapping/raising from a low-level `trio` resource error is
simpler and includes the `.src_exc` in the `__repr__()` and
`.message/.args` rendered at higher layers (like from `Channel` and
`._rpc` machinery).

Impl deats,
- mainly leverages packing in a new cls-method `.repr_src_exc() -> str:`
  repr of the underlying error before an optional `body: str` all as
  handled by the previously augmented `.pformat()`'s delegation to
  `pformat_exc()`.
- change `.src_exc` to be a property around a renamed `._src_exc`.

But wait, why?
- use it inside `MsgpackTransport.send()` to rewrap any
  `trio.BrokenResourceError`s so we always see the underlying
  `trio`-src-exc just like in the `.recv()._iter_packets()` handlers.
2025-07-08 18:05:05 -04:00
Tyler Goodlet 98c4614a36 Factor actor-embedded IPC-tpt-server to `ipc` subsys
Primarily moving the `Actor._serve_forever()`-task-as-method and
supporting actor-instance attributes to a new `.ipo._server` sub-mod
which now encapsulates,
- the coupling various `trio.Nursery`s (and their independent lifetime mgmt)
  to different `trio.serve_listener()`s tasks and `SocketStream`
  handler scopes.
- `Address` and `SocketListener` mgmt and tracking through the idea of
  an "IPC endpoint": each "bound-and-active instance" of a served-listener
  for some (varied transport protocol's socket) address.
- start and shutdown of the entire server's lifetime via an `@acm`.
- delegation of starting/stopping tpt-protocol-specific `trio.abc.Listener`s
  to the corresponding `.ipc._<proto_key>` sub-module (newly defined
  mod-top-level instead of `Address` method) `start/close_listener()`
  funcs.

Impl details of the `.ipc._server` sub-sys,
- add new `IPCServer`, allocated with `open_ipc_server()`, and which
  encapsulates starting multiple-transport-proto-`trio.abc.Listener`s
  from an input set of `._addr.Address`s using,
  |_`IPCServer.listen_on()` which internally spawns tasks that delegate to a new
    `_serve_ipc_eps()`, a rework of what was (effectively)
    `Actor._serve_forever()` and which now,
    * allocates a new `IPCEndpoint`-struct (see below) for each
      address-listener pair alongside the specified
      listener-serving/stream-handling `trio.Nursery`s provided by the
      caller.
    * starts and stops each transport (socket's) listener by calling
      `IPCEndpoint.start/close_listener()` which in turn delegates to
      the underlying `inspect.getmodule(IPCEndpoint.addr)` backend tpt
      module's equivalent impl.
    * tracks all created endpoints in a `._endpoints: list[IPCEndpoint]`
      which is further exposed through public properties for
      introspection of served transport-protocols and their addresses.
  |_`IPCServer._[parent/stream_handler]_tn: Nursery`s which are either
     allocated (in which case, as the same instance) or provided by the
     caller of `open_ipc_server()` such that the same nursery-cancel-scope
     controls offered by `trio.serve_listeners(handler_nursery=)` are
     offered where the `._parent_tn` is used to spawn `_serve_ipc_eps()`
     tasks, and `._stream_handler_tn` is passed verbatim as `handler_nursery`.
- a new `IPCEndpoint`-struct (as mentioned) which wraps each
  transport-proto's address + listener + allocated-supervising-nursery
  to encapsulate the "lifetime of a server IPC endpoint" such that
  eventually we can track and managed per-protocol/address/`.listen_on()`-call
  scoped starts/stops/restarts for the purposes of filtering/banning
  peer traffic.
  |_ also included is an unused `.peer_tpts` table which we can
    hopefully use to replace `Actor._peers` in a `Channel`-tracking
    transport-proto-aware way!

Surrounding changes to `.ipc.*` primitives to match,
- make `[TCP|UDS]Address` types `msgspec.Struct(frozen=True)` and thus
  drop any-and-all `addr._host =` style mutation throughout.
  |_ as such also drop their `.__init__()` and `.__eq__()` meths.
  |_ UDS tweaks to field names and thus `.__repr__()`.
- move `[TCP|UDS]Address.[start/close]_listener()` meths to be mod-level
  equiv `start|close_listener()` funcs.
- just hard code the `.ipc._types._key_to_transport/._addr_to_transport`
  table entries instead of all the prior fancy dynamic class property
  reading stuff (remember, "explicit is better then implicit").

Modified in `._runtime.Actor` internals,
- drop the `._serve_forever()` and `.cancel_server()`, methods and
  `._server_down` waiting logic from `.cancel_soon()`
- add `.[_]ipc_server` which is opened just after the `._service_n` and
  delegate to it for any equivalent publicly exposed instance
  attributes/properties.
2025-07-08 18:05:05 -04:00
Tyler Goodlet 61df10b333 Move concrete `Address`es to each tpt module
That is moving from `._addr`,
- `TCPAddress` to `.ipc._tcp`
- `UDSAddress` to `.ipc._uds`

Obviously this requires adjusting a buncha stuff in `._addr` to avoid
import cycles (the original reason the module was not also included in
the new `.ipc` subpkg) including,

- avoiding "unnecessary" imports of `[Unwrapped]Address` in various modules.
  * since `Address` is a protocol and the main point is that it **does
    not need to be inherited** per
    (https://typing.python.org/en/latest/spec/protocol.html#terminology)
    thus I removed the need for it in both transport submods.
  * and `UnwrappedAddress` is a type alias for tuples.. so we don't
    really always need to be importing it since it also kinda obfuscates
    what the underlying pairs are.
- not exporting everything in submods at the `.ipc` top level and
  importing from specific submods by default.
- only importing various types under a `if typing.TYPE_CHECKING:` guard
  as needed.
2025-07-08 18:05:05 -04:00
Tyler Goodlet 094447787e Add API-modernize-todo on `experimental._pubsub.fan_out_to_ctxs` 2025-07-08 18:05:05 -04:00
Tyler Goodlet ba45c03e14 Skip the ringbuf test mod for now since data-gen is a bit "heavy/laggy" atm 2025-07-08 18:05:05 -04:00
Tyler Goodlet 00d8a2a099 Improve `TransportClosed.__repr__()`, add `src_exc`
By borrowing from the implementation of `RemoteActorError.pformat()`
which is now factored into a new `.devx.pformat_exc()` and re-used for
both error types while maintaining the same func-sig. Obviously delegate
`RemoteActorError.pformat()` to the new helper accordingly and keeping
the prior `body` generation from `.devx.pformat_boxed_tb()` as before.

The new helper allows for,
- passing any of a `header|message|body: str` which are all combined in
  that order in the final output.
- getting the `exc.message` as the default `message` part.
- generating an objecty-looking "type-name" header to be rendered by
  default when `header` is not overridden.
- "first-line-of `message`" processing which we split-off and then
  re-inject as a `f'<{type(exc).__name__}( {first} )>'` top line header.
- an optional `tail: str = '>'` to "close the object"-look only added
  when `with_type_header: bool = True`.

Adjustments to `TransportClosed` around this include,
- replacing the init `cause` arg for a `src_exc` which is now always
  assigned to a same named instance var.
- displaying that new `.src_exc` in the `body: str` arg to the
  `.devx.pformat.pformat_exc()` call so you can always see the
  underlying (normally `trio`) source error.
- just make it inherit from `Exception` not `trio.BrokenResourceError`
  to avoid handlers catching `TransportClosed` as the former
  particularly in testing when we want to sometimes to distinguish them.
2025-07-08 18:05:05 -04:00
Tyler Goodlet bedde076d9 Unwrap `UDSAddress` as `tuple[str, str]`, i.e. sin pid
Since in hindsight the real analog of a net-proto's "bindspace"
(normally its routing layer's addresses-port-set) is more akin to the
"location in the file-system" for a UDS socket file (aka the file's
parent directory) determines whether or not the "port" (aka it's
file-name) collides with any other.

So the `._filedir: Path` is like the allocated "address" and,
the `._filename: Path|str` is basically the "port",

at least in my mind.. Bp

Thinking about fs dirs like a "host address" means you can get
essentially the same benefits/behaviour of say an (ip)
addresses-port-space but using the (current process-namespace's)
filesys-tree. Note that for UDS sockets in particular the
network-namespace is what would normally isolate so called "abstract
sockets" (i.e. UDS sockets that do NOT use file-paths by setting `struct
sockaddr_un.sun_path = 'abstract', see `man unix`); using directories is
even easier and definitely more explicit/readable/immediately-obvious as
a human-user.

As such this reworks all the necessary `UDSAddress` meths,
- `.unwrap()` now returns a `tuple(str(._filedir, str(._filename))`,
- `wrap_address()` now matches UDS on a 2nd tuple `str()` element,
- `.get_root()` no longer passes `maybe_pid`.

AND adjusts `MsgpackUDSStream` to,
- use the new `unwrap_sockpath()` on the `socket.get[sock/peer]name()`
  output before passing directly as `UDSAddress.__init__(filedir, filename)`
  instead of via `.from_addr()`.
- also pass `maybe_pid`s to init since no longer included in the
  unwrapped-type form.
2025-07-08 18:05:05 -04:00
Tyler Goodlet be1d8bf6fa s/`._addr.preferred_transport`/`_state._def_tpt_proto`
Such that the "global-ish" setting (actor-local) is managed with the
others per actor-process and type it as a `Literal['tcp', 'uds']` of the
currently support protocol keys.

Here obvi `_tpt` is some kinda shorthand for "transport" and `_proto` is
for "protocol" Bp

Change imports and refs in all dependent modules.

Oh right, and disable UDS in `wrap_address()` for the moment while
i figure out how to avoid the unwrapped type collision..
2025-07-08 18:05:04 -04:00
Tyler Goodlet d9aee98db2 Add `Arbiter.is_registry()` in prep for proper `.discovery._registry` 2025-07-08 18:05:04 -04:00
Tyler Goodlet 708ce4a051 Repair weird spawn test, start `test_root_runtime`
There was a very strange legacy test
`test_spawning.test_local_arbiter_subactor_global_state` which was
causing unforseen hangs/errors on the UDS tpt and looking deeper this
test was already doing root-actor things that should never have been
valid XD

So rework that test to properly demonstrate something of value
(i guess..) and add a new suite which start more rigorously auditing our
`open_root_actor()` permitted usage.

For the old test,
- since the main point of this test seemed to be the ability to invoke
  the same function in both the parent and child actor (using the very
  legacy `ActorNursery.run_in_actor()`.. due to be deprecated) rename it
  to `test_run_in_actor_same_func_in_child`,
- don't re-enter `.open_root_actor()` since that's invalid usage (tested
  in new suite see below),
- adjust some `spawn()` arg/var naming and ensure we only return in the
  child.

For the new suite add tests for,
- ensuring the implicit `open_root_actor()` call under `open_nursery()`.
- double open of `open_root_actor()` from within the same process tree
  both from a root and sub.

Intro some new `_exceptions` used in the new suite,
- a top level `RuntimeFailure` for generically expressing faults not of
  our own doing that prevent successful operation; this is what we now
  (changed in this commit) raise on attempts to open a 2nd root.
- mk `ActorFailure` derive from the former; it's already used from
  `._spawn` when subprocs fail to boot.
2025-07-08 18:05:04 -04:00
Tyler Goodlet d6d0112d95 Some more log message tweaks
- aggregate the `MsgStream.aclose()` "reader tasks" stats content into a
  common `message: str` before emit.
- tweak an `_rpc.process_messages()` emit per new `Channel.__repr__()`.
2025-07-08 18:05:04 -04:00
Tyler Goodlet 0fcbedd2be Change some low-hanging `.uid`s to `.aid`
Throughout `_context` and `_spawn` where it causes no big disruption.
Still lots to work out for things like how to pass `--uid
<tuple-as-str>` to spawned subactors and whether we want a diff name for
the minimum `tuple` required to distinguish a subactor pre-process-ID
allocation by the OS.
2025-07-08 18:05:04 -04:00
Tyler Goodlet 412c66d000 Mv to `Channel._do_handshake()` in `open_portal()`
As per the method migration in the last commit. Also adjust all `.uid`
usage to the new `.aid`.
2025-07-08 18:05:04 -04:00
Tyler Goodlet 3cc835c215 Mv `Actor._do_handshake()` to `Channel`, add `.aid`
Finally.. i've been meaning todo this for ages since the
actor-id-swap-as-handshake is better layered as part of the IPC msg-ing
machinery and then let's us encapsulate the connection-time-assignment
of a remote peer's `Aid` as a new `Channel.aid: Aid`. For now we
continue to offer the `.uid: tuple[str, str]` attr (by delegating to the
`.uid` field) since there's still a few things relying on it in the
runtime and ctx layers

Nice bonuses from this,
- it's very easy to get the peer's `Aid.pid: int` from anywhere in an
  IPC ctx by just reading it from the chan.
- we aren't saving more then the wire struct-msg received.

Also add deprecation warnings around usage to get us moving on porting
the rest of consuming runtime code to the new attr!
2025-07-08 18:05:04 -04:00
Tyler Goodlet f15bbb30cc UDS: translate file dne to connection-error
For the case where there's clearly no socket file created/bound
obviously the `trio.socket.connect()` call will raise
`FileNotFoundError`, so just translate this to
a builtin-`ConnectionError` at the transport layer so we can report the
guilty `UDSAddress`.
2025-07-08 18:05:04 -04:00
Tyler Goodlet ad211f8c2c More `._addr` boxing refinements
The more I think about it, it seems @guille's orig approach of
unwrapping UDS socket-file addresses to strings (or `Path`) is making
the most sense. I had originally thought that pairing it with the
listening side's pid would add clarity (and it definitely does for
introspection/debug/logging) but since we don't end up passing that pid
to the eventual `.connect()` call on the client side, it doesn't make
much sense to wrap it for the wire just to discard.. Further, the
`tuple[str, int]` makes `wrap_address()` break for TCP since it will
always match on uds first.

So, on that note this patch refines a few things in prep for going back
to that original `UnwrappedAddress` as `str` type though longer run
i think the more "builtin approach" would be to add `msgspec` codec
hooks for these types to avoid all the `.wrap()`/`.unwrap()` calls
throughout the runtime.

Down-low deats,
- add `wrap_address()` doc string, detailed (todo) comments and handle
  the `[None, None]` case that can come directly from
  `._state._runtime_vars['_root_mailbox']`.
- buncha adjustments to `UDSAddress`,
  - add a `filedir`, chng `filepath` -> `filename` and mk `maybe_pid` optional.
  - the intent `filedir` is act as the equivalent of the host part in a network proto's
    socket address and when it's null use the `.def_bindspace = get_rt_dir()`.
  - always ensure the `filedir / filename` is an absolute path and
    expose it as a new `.sockpath: Path` property.
  - mk `.is_valid` actually verify the `.sockpath` is in the valid
    `.bindspace: namely just checking it's in the expected dir.
  - add pedantic `match:`ing to `.from_addr()` such that we error on
    unexpected `type(addr)` inputs and otherwise parse any `sockpath:
    Path` inputs using a new `unwrap_sockpath()` which simply splits an
    abs file path to dir, file-name parts.
  - `.unwrap()` now just `str`-ifies the `.sockpath: Path`
  - adjust `.open/close_listener()` to use `.sockpath`.
2025-07-08 18:05:04 -04:00
Tyler Goodlet acac605c37 Move `DebugRequestError` to `._exceptions` 2025-07-08 18:05:04 -04:00
Tyler Goodlet 078e507774 Add `psutil` to `--dev` / testing deps 2025-07-08 12:57:29 -04:00
Tyler Goodlet 81bf810fbb Factor `breakpoint()` blocking into `@acm`
Call it `maybe_block_bp()` can wrap the `open_root_actor()` body with
it. Main reason is to guarantee we can bp inside actor runtime bootup as
needed when debugging internals! Prolly should factor this to another
module tho?

ALSO, ensure we RTE on recurrent entries to `open_root_actor()` from
within an existing tree! There was actually `test_spawning` test somehow
getting away with this!? Should never be possible or allowed!
2025-07-08 12:57:29 -04:00
Tyler Goodlet 7d1512e03a Add an `Actor.pformat()`
And map `.__repr__/__str__` to it and add various new fields to fill it
out,
- drop `self.uid` as var and instead add `Actor._aid: Aid` and proxy to
  it for the various `.name/.uid/.pid` properties as well as a new
  `.aid` field.
 |_ the `Aid.pid` addition is also included.

Other improvements,
- flip to a sync call to `Address.close_listener()`.
- track the `async_main()` parent task as `Actor._task`.
- add exception logging around failure to bind due to already-in-use
  when calling `add.open_listener()` in `._stream_forever()`; sometimes
  the error might be overridden by something else during the
  runtime-failure unwind..
2025-07-08 12:57:29 -04:00
Tyler Goodlet 1c85338ff8 Add a `MsgpackTransport.pformat()`
And map `.__repr__/__str__` to it. Also adjust to new
`Address.proto_key` and add a #TODO for a `.get_peers()`.
2025-07-08 12:57:29 -04:00
Tyler Goodlet 7a3c9d0458 Even more `tractor._addr.Address` simplifying
Namely reducing the duplication of class-fields and `TypeVar`s used
for parametrizing the `Address` protocol type,
- drop all of the `TypeVar` types and just stick with all concrete addrs
  types inheriting from `Address` only.
- rename `Address.name_key` -> `.proto_key`.
- rename `Address.address_type` -> `.unwrapped_type`
- rename `.namespace` -> `.bindspace` to better reflect that this "part"
  of the address represents the possible "space for binding endpoints".
 |_ also linux already uses "namespace" to mean the `netns` and i'd
   prefer to stick with their semantics for that.
- add `TCPAddress/UDSAddress.def_bindspace` values.
- drop commented `.open_stream()` method; never used.
- simplify `UnwrappedAdress` to just a `tuple` of union types.
- add logging to `USDAddress.open_listener()` for now.
- adjust `tractor.ipc/_uds/tcp` transport to use new addr field names.
2025-07-08 12:57:29 -04:00
Tyler Goodlet 31196b9cb4 Handle broken-pipes from `MsgpackTransport.send()`
Much like we already do in the `._iter_packets()` async-generator which
delivers to `.recv()` and `async for`, handle the `''[Errno 32] Broken
pipe'` case that can show up with unix-domain-socket usage.

Seems like the cause is due to how fast the socket can be torn down
during a registry addr channel ping where,
- the sending side can break the connection faster then the pong side
  can prep its handshake msg,
- the pong side tries to send it's handshake pkt via
  `.SocketStream.send_all()` after the breakage and then raises
  `trio.BrokenResourceError`.
2025-07-08 12:57:28 -04:00
Tyler Goodlet 44c9da1c91 Emphasize internal error block header-comment a bit 2025-07-08 12:57:28 -04:00
Tyler Goodlet b4ce618e33 Bit of multi-line styling for `LocalPortal` 2025-07-08 12:57:28 -04:00
Tyler Goodlet a504d92536 Adjust `._child` instantiation of `Actor` to use newly named `uuid` arg 2025-07-08 12:57:28 -04:00
Tyler Goodlet 8c0d9614bc Add `bidict` pkg as dep since used in `._addr` for now 2025-07-08 12:57:28 -04:00
Tyler Goodlet a6fefcc2a8 Adjust lowlevel-tb hiding logic for `MsgStream`
Such that whenev the `self._ctx.chan._exc is trans_err` we suppress.
I.e. when the `Channel._exc: Exception|None` error **is the same as**
set by the `._rpc.process_messages()` loop (that is, set to the
underlying transport layer error), we suppress the lowlevel tb,
otherwise we deliver the full tb since likely something at the lowlevel
that we aren't detecting changed/signalled/is-relevant!
2025-07-08 12:57:28 -04:00
Tyler Goodlet abdaf7bf1f Slight typing and multi-line styling tweaks in `.ipc` sugpkg 2025-07-08 12:57:28 -04:00
Tyler Goodlet 7b3324b240 Add a big boi `Channel.pformat()/__repr__()`
Much like how `Context` has been implemented, try to give tons of high
level details on all the lower level encapsulated primitives, namely the
`.msgstream/.transport` and any useful runtime state.

B)

Impl deats,
- adjust `.from_addr()` to only call `._addr.wrap_address()` when we
  detect `addr` is unwrapped.
- add another `log.runtime()` using the new `.__repr__()` in
  `Channel.from_addr()`.
- change to `UnwrappedAddress` as in prior commits.
2025-07-08 12:57:28 -04:00
Tyler Goodlet bbae2c91fd Allocate bind-addrs in subactors
Previously whenever an `ActorNursery.start_actor()` call did not receive
a `bind_addrs` arg we would allocate the default `(localhost, 0)` pairs
in the parent, for UDS this obviously won't work nor is it ideal bc it's
nicer to have the actor to be a socket server (who calls
`Address.open_listener()`) define the socket-file-name containing their
unique ID info such as pid, actor-uuid etc.

As such this moves "random" generation of server addresses to the
child-side of a subactor's spawn-sequence when it's sin-`bind_addrs`;
i.e. we do the allocation of the `Address.get_random()` addrs inside
`._runtime.async_main()` instead of `Portal.start_actor()` and **only
when** `accept_addrs`/`bind_addrs` was **not provided by the spawning
parent**.

Further this patch get's way more rigorous about the `SpawnSpec`
processing in the child inside `Actor._from_parent()` such that we
handle any invalid msgs **very loudly and pedantically!**

Impl deats,
- do the "random addr generation" in an explicit `for` loop (instead of
  prior comprehension) to allow for more detailed typing of the layered
  calls to the new `._addr` mod.
- use a `match:/case:` for process any invalid `SpawnSpec` payload case
  where we can instead receive a `MsgTypeError` from the `chan.recv()`
  call in `Actor._from_parent()` to raise it immediately instead of
  triggering downstream type-errors XD
  |_ as per the big `#TODO` we prolly want to take from other callers
     of `Channel.recv()` (like in the `._rpc.process_messages()` loop).
  |_ always raise `InternalError` on non-match/fall-through case!
  |_ add a note about not being able to use `breakpoint()` in this
     section due to causality of `SpawnSpec._runtime_vars` not having
     been processed yet..
  |_ always return a third element from `._from_rent()` eventually to be
     the `preferred_transports: list[str]` from the spawning rent.
- use new `._addr.mk_uuid()` and pass to new `Actor.__init__(uuid: str)`
  for all actor creation (including in all the mods tweaked here).
- Move to new type-alias-name `UnwrappedAddress` throughout.
2025-07-08 12:57:28 -04:00
Tyler Goodlet 2540d1f9e0 Adjust imports to use new `UnwrappedAddress`
For those mods where it's just a type-alias (name) import change.
2025-07-08 12:57:28 -04:00
Tyler Goodlet 63fac5a809 Implement peer-info tracking for UDS streams
Such that any UDS socket pair is represented (and with the recent
updates to) a `USDAddress` via a similar pair-`tuple[str, int]` as TCP
sockets, a pair of the `.filepath: Path` & the peer proc's `.pid: int`
which we read from the underlying `socket.socket` using
`.set/getsockopt()` calls

Impl deats,
- using the Linux specific APIs, we add a `get_peer_info()` which reads
  the `(pid, uid, gid)` using the `SOL_SOCKET` and `SOL_PEECRED` opts to
  `sock.getsockopt()`.
  |_ this presumes the client has been correspondingly configured to
     deliver the creds via a `sock.setsockopt(SOL_SOCKET, SO_PASSCRED,
     1)` call - this required us to override `trio.open_unix_socket()`.
- override `trio.open_unix_socket()` as per the above bullet to ensure
  connecting peers always transmit "credentials" options info to the
  listener.
- update `.get_stream_addrs()` to always call `get_peer_info()` and
  extract the peer's pid for the `raddr` and use `os.getpid()` for
  `laddr` (obvi).
  |_ as part of the new impl also `log.info()` the creds-info deats and
    socket-file path.
  |_ handle the oddity where it depends which of `.getpeername()` or
    `.getsockname()` will return the file-path; i think it's to do with
    who is client vs. server?

Related refinements,
- set `.layer_key: int = 4` for the "transport layer" ;)
- tweak some typing and multi-line unpacking in `.ipc/_tcp`.
2025-07-08 12:57:28 -04:00
Tyler Goodlet 568fb18d01 Rework/simplify transport addressing
A few things that can fundamentally change,

- UDS addresses now always encapsulate the local and remote pid such
  that it denotes each side's process much like a TCP *port*.
  |_ `.__init__()` takes a new `maybe_pid: int`.
  |_ this required changes to the `.ipc._uds` backend which will come in
     an subsequent commit!
  |_ `UDSAddress.address_type` becomes a `tuple[str, int]` just like the
      TCP case.
  |_ adjust `wrap_address()` to match.
- use a new `_state.get_rt_dir() -> Path` as the default location for
  UDS socket file: now under `XDG_RUNTIME_DIR'/tractor/` subdir by
  default.
- re-implement `USDAddress.get_random()` to use both the local
  `Actor.uid` (if available) and at least the pid for its socket file
  name.

Removals,
- drop the loop generated `_default_addrs`, simplify to just
  `_default_lo_addrs` for per-transport default registry addresses.
  |_ change to `_address_types: dict[str, Type[Address]]` instead of
     separate types `list`.
  |_ adjust `is_wrapped_addr()` to just check `in _addr_types.values()`.
- comment out `Address.open_stream()` it's unused and i think the wrong
  place for this API.

Renames,
- from `AddressTypes` -> `UnwrappedAddress`, since it's a simple type
  union and all this type set is, is the simple python data-structures
  we encode to for the wire.
  |_ see note about possibly implementing the `.[un]wrap()` stuff as
     `msgspec` codec `enc/dec_hook()`s instead!

Additions,
- add a `mk_uuid()` to be used throughout the runtime including for
  generating the `Aid.uuid` part.
- tons of notes around follow up refinements!
2025-07-08 12:57:28 -04:00
Guillermo Rodriguez f67e19a852 Trying to make full suite pass with uds 2025-07-08 12:57:28 -04:00
Guillermo Rodriguez 0be9f5f907 Finally switch to using address protocol in all runtime 2025-07-08 12:57:28 -04:00
Guillermo Rodriguez 5e2d456029 Add root and random addr getters on MsgTransport type 2025-07-08 12:57:28 -04:00
Guillermo Rodriguez c7d5b021db Starting to make `.ipc.Channel` work with multiple MsgTransports 2025-07-08 12:57:28 -04:00
Guillermo Rodriguez 6f1f198fb1 Break out transport protocol and tcp specifics into their own submodules under tractor.ipc 2025-07-08 12:57:28 -04:00
Guillermo Rodriguez 26fef82d33 Add buf_size to RBToken and add sender cancel test, move disable_mantracker to its own _mp_bs module 2025-07-08 12:57:28 -04:00
Guillermo Rodriguez 84d25b5727 Make ring buf api use pickle-able RBToken 2025-07-08 12:57:28 -04:00
Guillermo Rodriguez 1ed0c861b5 Address some of fomo\'s comments 2025-07-08 12:57:28 -04:00
Guillermo Rodriguez 2dd3a682c8 Handle cancelation on EventFD.read 2025-07-08 12:57:28 -04:00
Guillermo Rodriguez 881813e61e Add module headers and fix spacing on tractor._ipc._linux 2025-07-08 12:57:28 -04:00
Guillermo Rodriguez 566a11c00d Move RingBuffSender|Receiver to its own tractor.ipc._ringbuf module 2025-07-08 12:57:28 -04:00
Guillermo Rodriguez af69272d16 Move linux specifics from tractor.ipc._shm into tractor.ipc._linux 2025-07-08 12:57:28 -04:00
Guillermo Rodriguez 8e3f581d3f Move tractor._shm to tractor.ipc._shm 2025-07-08 12:57:28 -04:00
Guillermo Rodriguez eceb292415 move tractor._ipc.py into tractor.ipc._chan.py 2025-07-08 12:57:28 -04:00
Guillermo Rodriguez 9921ea3cae General improvements
EventFD class now expects the fd to already be init with open_eventfd
RingBuff Sender and Receiver fully manage SharedMemory and EventFD lifecycles, no aditional ctx mngrs needed
Separate ring buf tests into its own test bed
Add parametrization to test and cancellation
Add docstrings
Add simple testing data gen module .samples
2025-07-08 12:57:28 -04:00
Guillermo Rodriguez 414a8c5b75 IPC ring bug impl with async read 2025-07-08 12:57:28 -04:00
Tyler Goodlet eeb0516017 Merge branch 'gitea/main' into 'github/main'
Synchronizing the "main", previously (and less woke-ly/succinctly)
called "master", branch between our `gitea` remote and the current
`github` tip.

* main:
  Only set shield flag when trio nursery mode is used
  Disable parent channel append on get_peer_by_name to_scan
2025-06-19 19:51:03 -04:00
goodboy d6eeddef4e
Merge pull request #338 from goodboy/shm_apis
Shared memory array API and optional tight integration with `numpy`

Landing this so many downstream major feature branches depend on it namely,
- https://github.com/goodboy/tractor/pull/375
- https://github.com/goodboy/tractor/pull/376
- and the eventual #10
2025-04-25 23:20:46 -04:00
guille d478dbfcfe Merge pull request 'Fix to trionics helper `maybe_open_nursery`' (#26) from maybe_open_nursery_fix into main
Reviewed-on: #26
2025-04-13 20:58:47 +00:00
Guillermo Rodriguez ef6094a650
Only set shield flag when trio nursery mode is used 2025-04-13 17:57:37 -03:00
guille 4e8404bb09 Merge pull request 'Duplicated channel on `Actor._peers` causes hang on `portal.cancel_actor()`' (#25) from discovery_dedup into main
Reviewed-on: #25
2025-04-13 20:53:23 +00:00
Guillermo Rodriguez bbb3484ae9
Disable parent channel append on get_peer_by_name to_scan 2025-04-13 14:06:03 -03:00
17 changed files with 336 additions and 127 deletions

View File

@ -8,46 +8,70 @@ on:
workflow_dispatch: workflow_dispatch:
jobs: jobs:
# ------ sdist ------
mypy:
name: 'MyPy'
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup python
uses: actions/setup-python@v2
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -U . --upgrade-strategy eager -r requirements-test.txt
- name: Run MyPy check
run: mypy tractor/ --ignore-missing-imports --show-traceback
# test that we can generate a software distribution and install it # test that we can generate a software distribution and install it
# thus avoid missing file issues after packaging. # thus avoid missing file issues after packaging.
#
# -[x] produce sdist with uv
# ------ - ------
sdist-linux: sdist-linux:
name: 'sdist' name: 'sdist'
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v2 uses: actions/checkout@v4
- name: Setup python - name: Install latest uv
uses: actions/setup-python@v2 uses: astral-sh/setup-uv@v6
with:
python-version: '3.11'
- name: Build sdist - name: Build sdist as tar.gz
run: python setup.py sdist --formats=zip run: uv build --sdist --python=3.13
- name: Install sdist from .zips - name: Install sdist from .tar.gz
run: python -m pip install dist/*.zip run: python -m pip install dist/*.tar.gz
# ------ type-check ------
# mypy:
# name: 'MyPy'
# runs-on: ubuntu-latest
# steps:
# - name: Checkout
# uses: actions/checkout@v4
# - name: Install latest uv
# uses: astral-sh/setup-uv@v6
# # faster due to server caching?
# # https://docs.astral.sh/uv/guides/integration/github/#setting-up-python
# - name: "Set up Python"
# uses: actions/setup-python@v6
# with:
# python-version-file: "pyproject.toml"
# # w uv
# # - name: Set up Python
# # run: uv python install
# - name: Setup uv venv
# run: uv venv .venv --python=3.13
# - name: Install
# run: uv sync --dev
# # TODO, ty cmd over repo
# # - name: type check with ty
# # run: ty ./tractor/
# # - uses: actions/cache@v3
# # name: Cache uv virtenv as default .venv
# # with:
# # path: ./.venv
# # key: venv-${{ hashFiles('uv.lock') }}
# - name: Run MyPy check
# run: mypy tractor/ --ignore-missing-imports --show-traceback
testing-linux: testing-linux:
@ -59,32 +83,45 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
os: [ubuntu-latest] os: [ubuntu-latest]
python: ['3.11'] python-version: ['3.13']
spawn_backend: [ spawn_backend: [
'trio', 'trio',
'mp_spawn', # 'mp_spawn',
'mp_forkserver', # 'mp_forkserver',
] ]
steps: steps:
- name: Checkout - uses: actions/checkout@v4
uses: actions/checkout@v2
- name: Setup python - name: 'Install uv + py-${{ matrix.python-version }}'
uses: actions/setup-python@v2 uses: astral-sh/setup-uv@v6
with: with:
python-version: '${{ matrix.python }}' python-version: ${{ matrix.python-version }}
- name: Install dependencies # GH way.. faster?
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager # - name: setup-python@v6
# uses: actions/setup-python@v6
# with:
# python-version: '${{ matrix.python-version }}'
- name: List dependencies # consider caching for speedups?
run: pip list # https://docs.astral.sh/uv/guides/integration/github/#caching
- name: Install the project w uv
run: uv sync --all-extras --dev
# - name: Install dependencies
# run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
- name: List deps tree
run: uv tree
- name: Run tests - name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx run: uv run pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
# XXX legacy NOTE XXX
#
# We skip 3.10 on windows for now due to not having any collabs to # We skip 3.10 on windows for now due to not having any collabs to
# debug the CI failures. Anyone wanting to hack and solve them is very # debug the CI failures. Anyone wanting to hack and solve them is very
# welcome, but our primary user base is not using that OS. # welcome, but our primary user base is not using that OS.

View File

@ -1,8 +1,5 @@
|logo| ``tractor``: distributed structurred concurrency |logo| ``tractor``: distributed structurred concurrency
|gh_actions|
|docs|
``tractor`` is a `structured concurrency`_ (SC), multi-processing_ runtime built on trio_. ``tractor`` is a `structured concurrency`_ (SC), multi-processing_ runtime built on trio_.
Fundamentally, ``tractor`` provides parallelism via Fundamentally, ``tractor`` provides parallelism via
@ -66,6 +63,13 @@ Features
- (WIP) a ``TaskMngr``: one-cancels-one style nursery supervisor. - (WIP) a ``TaskMngr``: one-cancels-one style nursery supervisor.
Status of `main` / infra
------------------------
- |gh_actions|
- |docs|
Install Install
------- -------
``tractor`` is still in a *alpha-near-beta-stage* for many ``tractor`` is still in a *alpha-near-beta-stage* for many
@ -689,9 +693,11 @@ channel`_!
.. _msgspec: https://jcristharif.com/msgspec/ .. _msgspec: https://jcristharif.com/msgspec/
.. _guest: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops .. _guest: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops
..
.. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fgoodboy%2Ftractor%2Fbadge&style=popout-square NOTE, on generating badge links from the UI
:target: https://actions-badge.atrox.dev/goodboy/tractor/goto https://docs.github.com/en/actions/how-tos/monitoring-and-troubleshooting-workflows/monitoring-workflows/adding-a-workflow-status-badge?ref=gitguardian-blog-automated-secrets-detection#using-the-ui
.. |gh_actions| image:: https://github.com/goodboy/tractor/actions/workflows/ci.yml/badge.svg?branch=main
:target: https://github.com/goodboy/tractor/actions/workflows/ci.yml
.. |docs| image:: https://readthedocs.org/projects/tractor/badge/?version=latest .. |docs| image:: https://readthedocs.org/projects/tractor/badge/?version=latest
:target: https://tractor.readthedocs.io/en/latest/?badge=latest :target: https://tractor.readthedocs.io/en/latest/?badge=latest

View File

@ -24,10 +24,9 @@ async def spawn_until(depth=0):
async def main(): async def main():
"""The main ``tractor`` routine. '''
The process tree should look as approximately as follows when the
The process tree should look as approximately as follows when the debugger debugger first engages:
first engages:
python examples/debugging/multi_nested_subactors_bp_forever.py python examples/debugging/multi_nested_subactors_bp_forever.py
python -m tractor._child --uid ('spawner1', '7eab8462 ...) python -m tractor._child --uid ('spawner1', '7eab8462 ...)
@ -37,10 +36,11 @@ async def main():
python -m tractor._child --uid ('spawner0', '1d42012b ...) python -m tractor._child --uid ('spawner0', '1d42012b ...)
python -m tractor._child --uid ('name_error', '6c2733b8 ...) python -m tractor._child --uid ('name_error', '6c2733b8 ...)
""" '''
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
loglevel='warning' loglevel='devx',
enable_transports=['uds'],
) as n: ) as n:
# spawn both actors # spawn both actors

View File

@ -37,6 +37,7 @@ async def main(
enable_stack_on_sig=True, enable_stack_on_sig=True,
# maybe_enable_greenback=False, # maybe_enable_greenback=False,
loglevel='devx', loglevel='devx',
enable_transports=['uds'],
) as an, ) as an,
): ):
ptl: tractor.Portal = await an.start_actor( ptl: tractor.Portal = await an.start_actor(

View File

@ -61,7 +61,9 @@ dev = [
# `tractor.devx` tooling # `tractor.devx` tooling
"greenback>=1.2.1,<2", "greenback>=1.2.1,<2",
"stackscope>=0.2.2,<0.3", "stackscope>=0.2.2,<0.3",
"typing-extensions>=4.13.2", # needed for stackscope # ^ requires this?
"typing-extensions>=4.14.1",
"pyperclip>=1.9.0", "pyperclip>=1.9.0",
"prompt-toolkit>=3.0.50", "prompt-toolkit>=3.0.50",
"xonsh>=0.19.2", "xonsh>=0.19.2",

View File

@ -0,0 +1,95 @@
'''
Verify the `enable_transports` param drives various
per-root/sub-actor IPC endpoint/server settings.
'''
from __future__ import annotations
import pytest
import trio
import tractor
from tractor import (
Actor,
Portal,
ipc,
msg,
_state,
_addr,
)
@tractor.context
async def chk_tpts(
ctx: tractor.Context,
tpt_proto_key: str,
):
rtvars = _state._runtime_vars
assert (
tpt_proto_key
in
rtvars['_enable_tpts']
)
actor: Actor = tractor.current_actor()
spec: msg.types.SpawnSpec = actor._spawn_spec
assert spec._runtime_vars == rtvars
# ensure individual IPC ep-addr types
serv: ipc._server.Server = actor.ipc_server
addr: ipc._types.Address
for addr in serv.addrs:
assert addr.proto_key == tpt_proto_key
# Actor delegate-props enforcement
assert (
actor.accept_addrs
==
serv.accept_addrs
)
await ctx.started(serv.accept_addrs)
# TODO, parametrize over mis-matched-proto-typed `registry_addrs`
# since i seems to work in `piker` but not exactly sure if both tcp
# & uds are being deployed then?
#
@pytest.mark.parametrize(
'tpt_proto_key',
['tcp', 'uds'],
ids=lambda item: f'ipc_tpt={item!r}'
)
def test_root_passes_tpt_to_sub(
tpt_proto_key: str,
reg_addr: tuple,
debug_mode: bool,
):
async def main():
async with tractor.open_nursery(
enable_transports=[tpt_proto_key],
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as an:
assert (
tpt_proto_key
in
_state._runtime_vars['_enable_tpts']
)
ptl: Portal = await an.start_actor(
name='sub',
enable_modules=[__name__],
)
async with ptl.open_context(
chk_tpts,
tpt_proto_key=tpt_proto_key,
) as (ctx, accept_addrs):
uw_addr: tuple
for uw_addr in accept_addrs:
addr = _addr.wrap_address(uw_addr)
assert addr.is_valid
# shudown sub-actor(s)
await an.cancel()
trio.run(main)

View File

@ -49,7 +49,7 @@ def test_basic_ipc_server(
) )
assert server._no_more_peers.is_set() assert server._no_more_peers.is_set()
eps: list[ipc.IPCEndpoint] = await server.listen_on( eps: list[ipc._server.Endpoint] = await server.listen_on(
accept_addrs=[rando_addr], accept_addrs=[rando_addr],
stream_handler_nursery=None, stream_handler_nursery=None,
) )

View File

@ -1069,9 +1069,25 @@ class Context:
|RemoteActorError # stream overrun caused and ignored by us |RemoteActorError # stream overrun caused and ignored by us
): ):
''' '''
Maybe raise a remote error depending on the type of error Maybe raise a remote error depending on the type of error and
and *who* (i.e. which task from which actor) requested *who*, i.e. which side of the task pair across actors,
a cancellation (if any). requested a cancellation (if any).
Depending on the input config-params suppress raising
certain remote excs:
- if `remote_error: ContextCancelled` (ctxc) AND this side's
task is the "requester", it at somem point called
`Context.cancel()`, then the peer's ctxc is treated
as a "cancel ack".
|_ this behaves exactly like how `trio.Nursery.cancel_scope`
absorbs any `BaseExceptionGroup[trio.Cancelled]` wherein the
owning parent task never will raise a `trio.Cancelled`
if `CancelScope.cancel_called == True`.
- `remote_error: StreamOverrrun` (overrun) AND
`raise_overrun_from_self` is set.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
@ -1113,18 +1129,19 @@ class Context:
# for this ^, NO right? # for this ^, NO right?
) or ( ) or (
# NOTE: whenever this context is the cause of an # NOTE: whenever this side is the cause of an
# overrun on the remote side (aka we sent msgs too # overrun on the peer side, i.e. we sent msgs too
# fast that the remote task was overrun according # fast and the peer task was overrun according
# to `MsgStream` buffer settings) AND the caller # to `MsgStream` buffer settings, AND this was
# has requested to not raise overruns this side # called with `raise_overrun_from_self=True` (the
# caused, we also silently absorb any remotely # default), silently absorb any `StreamOverrun`.
# boxed `StreamOverrun`. This is mostly useful for #
# supressing such faults during # XXX, this is namely useful for supressing such faults
# cancellation/error/final-result handling inside # during cancellation/error/final-result handling inside
# `msg._ops.drain_to_final_msg()` such that we do not # `.msg._ops.drain_to_final_msg()` such that we do not
# raise such errors particularly in the case where # raise during a cancellation-request, i.e. when
# `._cancel_called == True`. # `._cancel_called == True`.
#
not raise_overrun_from_self not raise_overrun_from_self
and isinstance(remote_error, RemoteActorError) and isinstance(remote_error, RemoteActorError)
and remote_error.boxed_type is StreamOverrun and remote_error.boxed_type is StreamOverrun

View File

@ -48,7 +48,6 @@ from ._state import (
if TYPE_CHECKING: if TYPE_CHECKING:
from ._runtime import Actor from ._runtime import Actor
from .ipc._server import IPCServer
log = get_logger(__name__) log = get_logger(__name__)
@ -80,7 +79,7 @@ async def get_registry(
) )
else: else:
# TODO: try to look pre-existing connection from # TODO: try to look pre-existing connection from
# `IPCServer._peers` and use it instead? # `Server._peers` and use it instead?
async with ( async with (
_connect_chan(addr) as chan, _connect_chan(addr) as chan,
open_portal(chan) as regstr_ptl, open_portal(chan) as regstr_ptl,
@ -112,18 +111,23 @@ def get_peer_by_name(
) -> list[Channel]|None: # at least 1 ) -> list[Channel]|None: # at least 1
''' '''
Scan for an existing connection (set) to a named actor Scan for an existing connection (set) to a named actor
and return any channels from `IPCServer._peers: dict`. and return any channels from `Server._peers: dict`.
This is an optimization method over querying the registrar for This is an optimization method over querying the registrar for
the same info. the same info.
''' '''
actor: Actor = current_actor() actor: Actor = current_actor()
server: IPCServer = actor.ipc_server to_scan: dict[tuple, list[Channel]] = actor.ipc_server._peers.copy()
to_scan: dict[tuple, list[Channel]] = server._peers.copy()
pchan: Channel|None = actor._parent_chan # TODO: is this ever needed? creates a duplicate channel on actor._peers
if pchan: # when multiple find_actor calls are made to same actor from a single ctx
to_scan[pchan.uid].append(pchan) # which causes actor exit to hang waiting forever on
# `actor._no_more_peers.wait()` in `_runtime.async_main`
# pchan: Channel|None = actor._parent_chan
# if pchan and pchan.uid not in to_scan:
# to_scan[pchan.uid].append(pchan)
for aid, chans in to_scan.items(): for aid, chans in to_scan.items():
_, peer_name = aid _, peer_name = aid

View File

@ -222,11 +222,16 @@ async def open_root_actor(
): ):
if enable_transports is None: if enable_transports is None:
enable_transports: list[str] = _state.current_ipc_protos() enable_transports: list[str] = _state.current_ipc_protos()
else:
_state._runtime_vars['_enable_tpts'] = enable_transports
# TODO! support multi-tpts per actor! Bo # TODO! support multi-tpts per actor!
assert ( # Bo
len(enable_transports) == 1 if not len(enable_transports) == 1:
), 'No multi-tpt support yet!' raise RuntimeError(
f'No multi-tpt support yet!\n'
f'enable_transports={enable_transports!r}\n'
)
_frame_stack.hide_runtime_frames() _frame_stack.hide_runtime_frames()
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb

View File

@ -877,9 +877,7 @@ class Actor:
return ( return (
chan, chan,
accept_addrs, accept_addrs,
None, _state._runtime_vars['_enable_tpts']
# ^TODO, preferred tpts list from rent!
# -[ ] need to extend the `SpawnSpec` tho!
) )
# failed to connect back? # failed to connect back?
@ -1453,10 +1451,12 @@ async def async_main(
# all sub-actors should be able to speak to # all sub-actors should be able to speak to
# their root actor over that channel. # their root actor over that channel.
if _state._runtime_vars['_is_root']: if _state._runtime_vars['_is_root']:
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
for addr in accept_addrs: for addr in accept_addrs:
waddr = wrap_address(addr) waddr: Address = wrap_address(addr)
if waddr == waddr.get_root(): raddrs.append(addr)
_state._runtime_vars['_root_mailbox'] = addr else:
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# Register with the arbiter if we're told its addr # Register with the arbiter if we're told its addr
log.runtime( log.runtime(

View File

@ -37,6 +37,13 @@ if TYPE_CHECKING:
from ._context import Context from ._context import Context
# default IPC transport protocol settings
TransportProtocolKey = Literal[
'tcp',
'uds',
]
_def_tpt_proto: TransportProtocolKey = 'tcp'
_current_actor: Actor|None = None # type: ignore # noqa _current_actor: Actor|None = None # type: ignore # noqa
_last_actor_terminated: Actor|None = None _last_actor_terminated: Actor|None = None
@ -47,6 +54,10 @@ _runtime_vars: dict[str, Any] = {
# root of actor-process tree info # root of actor-process tree info
'_is_root': False, # bool '_is_root': False, # bool
'_root_mailbox': (None, None), # tuple[str|None, str|None] '_root_mailbox': (None, None), # tuple[str|None, str|None]
'_root_addrs': [], # tuple[str|None, str|None]
# parent->chld ipc protocol caps
'_enable_tpts': [_def_tpt_proto],
# registrar info # registrar info
'_registry_addrs': [], '_registry_addrs': [],
@ -180,14 +191,6 @@ def get_rt_dir(
return rtdir return rtdir
# default IPC transport protocol settings
TransportProtocolKey = Literal[
'tcp',
'uds',
]
_def_tpt_proto: TransportProtocolKey = 'tcp'
def current_ipc_protos() -> list[str]: def current_ipc_protos() -> list[str]:
''' '''
Return the list of IPC transport protocol keys currently Return the list of IPC transport protocol keys currently
@ -197,4 +200,4 @@ def current_ipc_protos() -> list[str]:
concrete-backend sub-types defined throughout `tractor.ipc`. concrete-backend sub-types defined throughout `tractor.ipc`.
''' '''
return [_def_tpt_proto] return _runtime_vars['_enable_tpts']

View File

@ -143,7 +143,7 @@ async def maybe_wait_on_canced_subs(
log.cancel( log.cancel(
'Waiting on cancel request to peer..\n' 'Waiting on cancel request to peer..\n'
f'c)=>\n' f'c)=>\n'
f' |_{chan.uid}\n' f' |_{chan.aid}\n'
) )
# XXX: this is a soft wait on the channel (and its # XXX: this is a soft wait on the channel (and its
@ -156,7 +156,7 @@ async def maybe_wait_on_canced_subs(
# local runtime here is now cancelled while # local runtime here is now cancelled while
# (presumably) in the middle of msg loop processing. # (presumably) in the middle of msg loop processing.
chan_info: str = ( chan_info: str = (
f'{chan.uid}\n' f'{chan.aid}\n'
f'|_{chan}\n' f'|_{chan}\n'
f' |_{chan.transport}\n\n' f' |_{chan.transport}\n\n'
) )
@ -279,7 +279,7 @@ async def maybe_wait_on_canced_subs(
log.runtime( log.runtime(
f'Peer IPC broke but subproc is alive?\n\n' f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.uid}@{chan.raddr}\n' f'<=x {chan.aid}@{chan.raddr}\n'
f' |_{proc}\n' f' |_{proc}\n'
) )
@ -289,7 +289,7 @@ async def maybe_wait_on_canced_subs(
# #
# -[x] maybe change to mod-func and rename for implied # -[x] maybe change to mod-func and rename for implied
# multi-transport semantics? # multi-transport semantics?
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint` # -[ ] register each stream/tpt/chan with the owning `Endpoint`
# so that we can query per tpt all peer contact infos? # so that we can query per tpt all peer contact infos?
# |_[ ] possibly provide a global viewing via a # |_[ ] possibly provide a global viewing via a
# `collections.ChainMap`? # `collections.ChainMap`?
@ -309,7 +309,7 @@ async def handle_stream_from_peer(
any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery` any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery`
such that it is invoked as, such that it is invoked as,
IPCEndpoint.stream_handler_tn.start_soon( Endpoint.stream_handler_tn.start_soon(
handle_stream, handle_stream,
stream, stream,
) )
@ -460,7 +460,7 @@ async def handle_stream_from_peer(
# drop ref to channel so it can be gc-ed and disconnected # drop ref to channel so it can be gc-ed and disconnected
con_teardown_status: str = ( con_teardown_status: str = (
f'IPC channel disconnected:\n' f'IPC channel disconnected:\n'
f'<=x uid: {chan.uid}\n' f'<=x uid: {chan.aid}\n'
f' |_{pformat(chan)}\n\n' f' |_{pformat(chan)}\n\n'
) )
chans.remove(chan) chans.remove(chan)
@ -468,7 +468,7 @@ async def handle_stream_from_peer(
# TODO: do we need to be this pedantic? # TODO: do we need to be this pedantic?
if not chans: if not chans:
con_teardown_status += ( con_teardown_status += (
f'-> No more channels with {chan.uid}' f'-> No more channels with {chan.aid}'
) )
server._peers.pop(uid, None) server._peers.pop(uid, None)
@ -519,7 +519,7 @@ async def handle_stream_from_peer(
and and
(ctx_in_debug := pdb_lock.ctx_in_debug) (ctx_in_debug := pdb_lock.ctx_in_debug)
and and
(pdb_user_uid := ctx_in_debug.chan.uid) (pdb_user_uid := ctx_in_debug.chan.aid)
): ):
entry: tuple|None = local_nursery._children.get( entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid) tuple(pdb_user_uid)
@ -577,7 +577,7 @@ async def handle_stream_from_peer(
# finally block closure # finally block closure
class IPCEndpoint(Struct): class Endpoint(Struct):
''' '''
An instance of an IPC "bound" address where the lifetime of the An instance of an IPC "bound" address where the lifetime of the
"ability to accept connections" (from clients) and then handle "ability to accept connections" (from clients) and then handle
@ -636,7 +636,7 @@ class IPCEndpoint(Struct):
) )
class IPCServer(Struct): class Server(Struct):
_parent_tn: Nursery _parent_tn: Nursery
_stream_handler_tn: Nursery _stream_handler_tn: Nursery
# level-triggered sig for whether "no peers are currently # level-triggered sig for whether "no peers are currently
@ -644,7 +644,7 @@ class IPCServer(Struct):
# initialized with `.is_set() == True`. # initialized with `.is_set() == True`.
_no_more_peers: trio.Event _no_more_peers: trio.Event
_endpoints: list[IPCEndpoint] = [] _endpoints: list[Endpoint] = []
# connection tracking & mgmt # connection tracking & mgmt
_peers: defaultdict[ _peers: defaultdict[
@ -659,10 +659,10 @@ class IPCServer(Struct):
# syncs for setup/teardown sequences # syncs for setup/teardown sequences
_shutdown: trio.Event|None = None _shutdown: trio.Event|None = None
# TODO, maybe just make `._endpoints: list[IPCEndpoint]` and # TODO, maybe just make `._endpoints: list[Endpoint]` and
# provide dict-views onto it? # provide dict-views onto it?
# @property # @property
# def addrs2eps(self) -> dict[Address, IPCEndpoint]: # def addrs2eps(self) -> dict[Address, Endpoint]:
# ... # ...
@property @property
@ -708,7 +708,7 @@ class IPCServer(Struct):
await self._shutdown.wait() await self._shutdown.wait()
else: else:
tpt_protos: list[str] = [] tpt_protos: list[str] = []
ep: IPCEndpoint ep: Endpoint
for ep in self._endpoints: for ep in self._endpoints:
tpt_protos.append(ep.addr.proto_key) tpt_protos.append(ep.addr.proto_key)
@ -790,7 +790,7 @@ class IPCServer(Struct):
def epsdict(self) -> dict[ def epsdict(self) -> dict[
Address, Address,
IPCEndpoint, Endpoint,
]: ]:
return { return {
ep.addr: ep ep.addr: ep
@ -804,7 +804,7 @@ class IPCServer(Struct):
return ev.is_set() return ev.is_set()
def pformat(self) -> str: def pformat(self) -> str:
eps: list[IPCEndpoint] = self._endpoints eps: list[Endpoint] = self._endpoints
state_repr: str = ( state_repr: str = (
f'{len(eps)!r} IPC-endpoints active' f'{len(eps)!r} IPC-endpoints active'
@ -835,13 +835,13 @@ class IPCServer(Struct):
# TODO? maybe allow shutting down a `.listen_on()`s worth of # TODO? maybe allow shutting down a `.listen_on()`s worth of
# listeners by cancelling the corresponding # listeners by cancelling the corresponding
# `IPCEndpoint._listen_tn` only ? # `Endpoint._listen_tn` only ?
# -[ ] in theory you could use this to # -[ ] in theory you could use this to
# "boot-and-wait-for-reconnect" of all current and connecting # "boot-and-wait-for-reconnect" of all current and connecting
# peers? # peers?
# |_ would require that the stream-handler is intercepted so we # |_ would require that the stream-handler is intercepted so we
# can intercept every `MsgTransport` (stream) and track per # can intercept every `MsgTransport` (stream) and track per
# `IPCEndpoint` likely? # `Endpoint` likely?
# #
# async def unlisten( # async def unlisten(
# self, # self,
@ -854,7 +854,7 @@ class IPCServer(Struct):
*, *,
accept_addrs: list[tuple[str, int|str]]|None = None, accept_addrs: list[tuple[str, int|str]]|None = None,
stream_handler_nursery: Nursery|None = None, stream_handler_nursery: Nursery|None = None,
) -> list[IPCEndpoint]: ) -> list[Endpoint]:
''' '''
Start `SocketListeners` (i.e. bind and call `socket.listen()`) Start `SocketListeners` (i.e. bind and call `socket.listen()`)
for all IPC-transport-protocol specific `Address`-types for all IPC-transport-protocol specific `Address`-types
@ -888,7 +888,7 @@ class IPCServer(Struct):
f'Binding to endpoints for,\n' f'Binding to endpoints for,\n'
f'{accept_addrs}\n' f'{accept_addrs}\n'
) )
eps: list[IPCEndpoint] = await self._parent_tn.start( eps: list[Endpoint] = await self._parent_tn.start(
partial( partial(
_serve_ipc_eps, _serve_ipc_eps,
server=self, server=self,
@ -904,7 +904,7 @@ class IPCServer(Struct):
self._endpoints.extend(eps) self._endpoints.extend(eps)
# XXX, just a little bit of sanity # XXX, just a little bit of sanity
group_tn: Nursery|None = None group_tn: Nursery|None = None
ep: IPCEndpoint ep: Endpoint
for ep in eps: for ep in eps:
if ep.addr not in self.addrs: if ep.addr not in self.addrs:
breakpoint() breakpoint()
@ -917,6 +917,10 @@ class IPCServer(Struct):
return eps return eps
# alias until we decide on final naming
IPCServer = Server
async def _serve_ipc_eps( async def _serve_ipc_eps(
*, *,
server: IPCServer, server: IPCServer,
@ -941,12 +945,12 @@ async def _serve_ipc_eps(
listen_tn: Nursery listen_tn: Nursery
async with trio.open_nursery() as listen_tn: async with trio.open_nursery() as listen_tn:
eps: list[IPCEndpoint] = [] eps: list[Endpoint] = []
# XXX NOTE, required to call `serve_listeners()` below. # XXX NOTE, required to call `serve_listeners()` below.
# ?TODO, maybe just pass `list(eps.values()` tho? # ?TODO, maybe just pass `list(eps.values()` tho?
listeners: list[trio.abc.Listener] = [] listeners: list[trio.abc.Listener] = []
for addr in listen_addrs: for addr in listen_addrs:
ep = IPCEndpoint( ep = Endpoint(
addr=addr, addr=addr,
listen_tn=listen_tn, listen_tn=listen_tn,
stream_handler_tn=stream_handler_tn, stream_handler_tn=stream_handler_tn,
@ -1010,7 +1014,7 @@ async def _serve_ipc_eps(
finally: finally:
if eps: if eps:
addr: Address addr: Address
ep: IPCEndpoint ep: Endpoint
for addr, ep in server.epsdict().items(): for addr, ep in server.epsdict().items():
ep.close_listener() ep.close_listener()
server._endpoints.remove(ep) server._endpoints.remove(ep)

View File

@ -18,6 +18,7 @@ TCP implementation of tractor.ipc._transport.MsgTransport protocol
''' '''
from __future__ import annotations from __future__ import annotations
import ipaddress
from typing import ( from typing import (
ClassVar, ClassVar,
) )
@ -50,13 +51,45 @@ class TCPAddress(
_host: str _host: str
_port: int _port: int
def __post_init__(self):
try:
ipaddress.ip_address(self._host)
except ValueError as valerr:
raise ValueError(
'Invalid {type(self).__name__}._host = {self._host!r}\n'
) from valerr
proto_key: ClassVar[str] = 'tcp' proto_key: ClassVar[str] = 'tcp'
unwrapped_type: ClassVar[type] = tuple[str, int] unwrapped_type: ClassVar[type] = tuple[str, int]
def_bindspace: ClassVar[str] = '127.0.0.1' def_bindspace: ClassVar[str] = '127.0.0.1'
# ?TODO, actually validate ipv4/6 with stdlib's `ipaddress`
@property @property
def is_valid(self) -> bool: def is_valid(self) -> bool:
return self._port != 0 '''
Predicate to ensure a valid socket-address pair.
'''
return (
self._port != 0
and
(ipaddr := ipaddress.ip_address(self._host))
and not (
ipaddr.is_reserved
or
ipaddr.is_unspecified
or
ipaddr.is_link_local
or
ipaddr.is_link_local
or
ipaddr.is_multicast
or
ipaddr.is_global
)
)
# ^XXX^ see various properties of invalid addrs here,
# https://docs.python.org/3/library/ipaddress.html#ipaddress.IPv4Address
@property @property
def bindspace(self) -> str: def bindspace(self) -> str:

View File

@ -170,6 +170,7 @@ class SpawnSpec(
# a hard `Struct` def for all of these fields! # a hard `Struct` def for all of these fields!
_parent_main_data: dict _parent_main_data: dict
_runtime_vars: dict[str, Any] _runtime_vars: dict[str, Any]
# ^NOTE see `._state._runtime_vars: dict`
# module import capability # module import capability
enable_modules: dict[str, str] enable_modules: dict[str, str]

View File

@ -70,7 +70,8 @@ async def maybe_open_nursery(
yield nursery yield nursery
else: else:
async with lib.open_nursery(**kwargs) as nursery: async with lib.open_nursery(**kwargs) as nursery:
nursery.cancel_scope.shield = shield if lib == trio:
nursery.cancel_scope.shield = shield
yield nursery yield nursery

View File

@ -417,7 +417,7 @@ dev = [
{ name = "pyperclip", specifier = ">=1.9.0" }, { name = "pyperclip", specifier = ">=1.9.0" },
{ name = "pytest", specifier = ">=8.3.5" }, { name = "pytest", specifier = ">=8.3.5" },
{ name = "stackscope", specifier = ">=0.2.2,<0.3" }, { name = "stackscope", specifier = ">=0.2.2,<0.3" },
{ name = "typing-extensions", specifier = ">=4.13.2" }, { name = "typing-extensions", specifier = ">=4.14.1" },
{ name = "xonsh", specifier = ">=0.19.2" }, { name = "xonsh", specifier = ">=0.19.2" },
] ]
@ -452,11 +452,11 @@ wheels = [
[[package]] [[package]]
name = "typing-extensions" name = "typing-extensions"
version = "4.13.2" version = "4.14.1"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f6/37/23083fcd6e35492953e8d2aaaa68b860eb422b34627b13f2ce3eb6106061/typing_extensions-4.13.2.tar.gz", hash = "sha256:e6c81219bd689f51865d9e372991c540bda33a0379d5573cddb9a3a23f7caaef", size = 106967, upload-time = "2025-04-10T14:19:05.416Z" } sdist = { url = "https://files.pythonhosted.org/packages/98/5a/da40306b885cc8c09109dc2e1abd358d5684b1425678151cdaed4731c822/typing_extensions-4.14.1.tar.gz", hash = "sha256:38b39f4aeeab64884ce9f74c94263ef78f3c22467c8724005483154c26648d36", size = 107673, upload-time = "2025-07-04T13:28:34.16Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/8b/54/b1ae86c0973cc6f0210b53d508ca3641fb6d0c56823f288d108bc7ab3cc8/typing_extensions-4.13.2-py3-none-any.whl", hash = "sha256:a439e7c04b49fec3e5d3e2beaa21755cadbbdc391694e28ccdd36ca4a1408f8c", size = 45806, upload-time = "2025-04-10T14:19:03.967Z" }, { url = "https://files.pythonhosted.org/packages/b5/00/d631e67a838026495268c2f6884f3711a15a9a2a96cd244fdaea53b823fb/typing_extensions-4.14.1-py3-none-any.whl", hash = "sha256:d1e1e3b58374dc93031d6eda2420a48ea44a36c2b4766a4fdeb3710755731d76", size = 43906, upload-time = "2025-07-04T13:28:32.743Z" },
] ]
[[package]] [[package]]