Compare commits

...

50 Commits

Author SHA1 Message Date
Tyler Goodlet 243b9cfe3b Bump "task-manager(-nursery)" naming, add logging
Namely just renaming any `trio.Nursery` instances to `tn`, the primary
`@acm`-API to `.trionics.open_taskman()` and change out all `print()`s
for logger instances with 'info' level enabled by the mod-script usage.
2026-04-05 13:42:09 -04:00
Tyler Goodlet 4145ff9098 Add a new `.trionics._tn` for "task nursery stuff"
Since I'd like to decouple the new "task-manager-nursery" lowlevel
primitives/abstractions from the higher-level
`TaskManagerNursery`-supporting API(s) and default per-task
supervision-strat and because `._mngr` is already purposed for
higher-level "on-top-of-nursery" patterns as it is.

Deats,
- move `maybe_open_nursery()` into the new mod.
- adjust the pkg-mod's import to the new sub-mod.
- also draft up this idea for an API which stacks `._beg.collapse_eg()`
  onto a nursery with the WIP name `open_loose_tn()` but more than
  likely i'll just discard this idea bc i think the explicit `@acm`
  stacking is more pythonic/up-front-grokable despite the extra
  LoC.
2026-04-05 13:42:09 -04:00
Tyler Goodlet 4e931df685 Add `debug_mode: bool` control to task mngr
Allows dynamically importing `pdbp` when enabled and a way for
eventually linking with `tractor`'s own debug mode flag.
2026-04-05 13:42:09 -04:00
Tyler Goodlet 2e3a554ca9 Go all in on "task manager" naming 2026-04-05 13:42:09 -04:00
Tyler Goodlet 5ec2d45413 More refinements and proper typing
- drop unneeded (and commented) internal cs allocating bits.
- bypass all task manager stuff if no generator is provided by the
  caller; i.e. just call `.start_soon()` as normal.
- fix `Generator` typing.
- add some prints around task manager.
- wrap in `TaskOutcome.lowlevel_task: Task`.
2026-04-05 13:42:09 -04:00
Tyler Goodlet 47f790035e Ensure user-allocated cancel scope just works!
Turns out the nursery doesn't have to care about allocating a per task
`CancelScope` since the user can just do that in the
`@task_scope_manager` if desired B) So just mask all the nursery cs
allocating with the intention of removal.

Also add a test for per-task-cancellation by starting the crash task as
a `trio.sleep_forever()` but then cancel it via the user allocated cs
and ensure the crash propagates as expected 💥
2026-04-05 13:42:09 -04:00
Tyler Goodlet 81b02a1193 Facepalm, don't pass in unnecessary cancel scope 2026-04-05 13:42:09 -04:00
Tyler Goodlet 8ab8586ae2 Do renaming, implement lowlevel `Outcome` sending
As was listed in the many todos, this changes the `.start_soon()` impl
to instead (manually) `.send()` an `Outcome` from the spawned task into
the user defined `@task_scope_manager`. In this case
the task manager wraps that in a user defined (and renamed)
`TaskOutcome` and delivers that + a containing `trio.CancelScope` to the
`.start_soon()` caller. Here the user defined `TaskOutcome` defines
a `.wait_for_result()` method that can be used to await the task's exit
and handle its underlying returned value or raised error; the
implementation could be different and subject to the user's own whims.

Note that if this were added to `trio`'s core, the
`@task_scope_manager` would by default either be implemented as a
`None`-yielding single-yield-generator, or more likely be entirely
ignored by the runtime (as in no manual task outcome collecting,
generator calling or sending done at all) whenever the user does not
provide a `task_scope_manager` to the nursery at open time.
2026-04-05 13:42:09 -04:00
Tyler Goodlet 6ea17b714e Alias to `@acm` in broadcaster mod 2026-04-05 13:42:09 -04:00
Tyler Goodlet 1a7e5042fa Initial prototype for a one-cancels-one style supervisor, nursery thing.. 2026-04-05 13:42:09 -04:00
Tyler Goodlet 0a05ef9fa7 Use shorthand nursery var-names per convention in codebase 2026-04-05 13:42:09 -04:00
Tyler Goodlet 21c18c915f Better separate service tasks vs. ctxs via methods
Namely splitting the handles for each in 2 separate tables and adding
a `.cancel_service_task()`.

Also,
- move `_open_and_supervise_service_ctx()` to mod level.
- rename `target` -> `ctx_fn` params throughout.
- fill out method doc strings.
2026-04-05 13:42:09 -04:00
Tyler Goodlet de84927779 Mv over `ServiceMngr` from `piker` with mods
Namely distinguishing service "IPC contexts" (opened in a
subactor via a `Portal`) from just local `trio.Task`s started
and managed under the `.service_n` (more or less wrapping in the
interface of a "task-manager" style nursery - aka a one-cancels-one
supervision strat).

API changes from original (`piker`) impl,
- mk `.start_service_task()` do ONLY that, start a task with a wrapping
  cancel-scope and completion event.
  |_ ideally this gets factored-out/re-implemented using the
    task-manager/OCO-style-nursery from GH #363.
- change what was the impl of `.start_service_task()` to `.start_service_ctx()`
  since it more explicitly defines the functionality of entering
  `Portal.open_context()` with a wrapping cs and completion event inside
  a bg task (which syncs the ctx's lifetime with termination of the
  remote actor runtime).
- factor out what was a `.start_service_ctx()` closure to a new
  `_open_and_supervise_service_ctx()` mod-func holding the meat of
  the supervision logic.

`ServiceMngr` API brief,
- use `open_service_mngr()` and `get_service_mngr()` to acquire the
  actor-global singleton.
- `ServiceMngr.start_service()` and `.cancel_service()` which allow for
  straightforward mgmt of "service subactor daemons".
2026-04-05 13:42:09 -04:00
Tyler Goodlet 6cc66fac72 Initial idea-notes dump and @singleton factory idea from `trio`-gitter 2026-04-05 13:42:09 -04:00
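The "@singleton factory" idea referenced above can be sketched as an acm-factory wrapper; `asyncio` is used only as the demo runner (real code would be `trio`-based) and this ignores concurrent-entry races:

```python
# Hypothetical sketch: wrap an async-context-manager factory so every
# entry after the first yields the same "actor-global" instance, with
# teardown deferred until the last user exits.
import asyncio
from contextlib import asynccontextmanager


def singleton(acm_factory):
    instance = None
    users: int = 0
    cm = None

    @asynccontextmanager
    async def wrapper(*args, **kwargs):
        nonlocal instance, users, cm
        if instance is None:
            cm = acm_factory(*args, **kwargs)
            instance = await cm.__aenter__()
        users += 1
        try:
            yield instance
        finally:
            users -= 1
            if users == 0:
                await cm.__aexit__(None, None, None)
                instance = None

    return wrapper


@singleton
@asynccontextmanager
async def open_service():
    # stand-in for some expensive, shared service setup
    yield {'state': 'up'}
```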
Bd bbf01d5161
Merge pull request #430 from goodboy/dependabot/uv/pygments-2.20.0
Bump pygments from 2.19.2 to 2.20.0
2026-04-05 13:33:33 -04:00
Bd ec8e8a2786
Merge pull request #426 from goodboy/remote_exc_type_registry
Fix remote exc relay + add `reg_err_types()` tests
2026-04-02 22:44:36 -04:00
Gud Boi c3d1ec22eb Fix `Type[BaseException]` annots, guard `.src_type` resolve
- Use `Type[BaseException]` (not bare `BaseException`)
  for all err-type references: `get_err_type()` return,
  `._src_type`, `boxed_type` in `unpack_error()`.
- Add `|None` where types can be unresolvable
  (`get_err_type()`, `.boxed_type` property).
- Add `._src_type_resolved` flag to prevent repeated
  lookups and guard against `._ipc_msg is None`.
- Fix `recevier` and `exeptions` typos.

Review: PR #426 (Copilot)
https://github.com/goodboy/tractor/pull/426

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 18:21:19 -04:00
Gud Boi 8f44efa327 Drop stale `.cancel()`, fix docstring typo in tests
- Remove leftover `await an.cancel()` in
  `test_registered_custom_err_relayed`; the
  nursery already cancels on scope exit.
- Fix `This document` -> `This documents` typo in
  `test_unregistered_err_still_relayed` docstring.

Review: PR #426 (Copilot)
https://github.com/goodboy/tractor/pull/426

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 18:21:19 -04:00
Gud Boi 5968a3c773 Use `'<unknown>'` for unresolvable `.boxed_type_str`
Add a teensie unit test to match.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 18:21:19 -04:00
Gud Boi 80597b80bf Add passing test for unregistered err relay
Drop the `xfail` test and instead add a new one that ensures the
`tractor._exceptions` fixes enable graceful relay of
remote-but-unregistered error types via the unboxing of just the
`rae.src_type_str/boxed_type_str` content. The test also ensures
a warning is included with remote error content indicating the user
should register their error type for effective cross-actor re-raising.

Deats,
- add `test_unregistered_err_still_relayed`: verify the
  `RemoteActorError` IS raised with `.boxed_type`
  as `None` but `.src_type_str`, `.boxed_type_str`,
  and `.tb_str` all preserved from the IPC msg.
- drop `test_unregistered_boxed_type_resolution_xfail`
  since the new above case covers it and we don't need to have
  an effectively entirely repeated test just with an inverse assert
  as its last line..

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 18:21:19 -04:00
Gud Boi a41c6d5c70 Fix unregistered-remote-error-type relay crash
Make `RemoteActorError` resilient to unresolved
custom error types so that errors from remote actors
always relay back to the caller - even when the user
hasn't called `reg_err_types()` to register the exc type.

Deats,
- `.src_type`: log warning + return `None` instead
  of raising `TypeError` which was crashing the
  entire `_deliver_msg()` -> `pformat()` chain
  before the error could be relayed.
- `.boxed_type_str`: fallback to `_ipc_msg.boxed_type_str`
  when the type obj can't be resolved so the type *name* is always
  available.
- `unwrap_src_err()`: fallback to `RuntimeError` preserving
  original type name + traceback.
- `unpack_error()`: log warning when `get_err_type()` returns
  `None` telling the user to call `reg_err_types()`.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 18:21:19 -04:00
Gud Boi 9c37b3f956 Add `reg_err_types()` test suite for remote exc relay
Verify registered custom error types round-trip correctly over IPC via
`reg_err_types()` + `get_err_type()`.

Deats,
- `TestRegErrTypesPlumbing`: 5 unit tests for the type-registry plumbing
  (register, lookup, builtins, tractor-native types, unregistered
  returns `None`)
- `test_registered_custom_err_relayed`: IPC end-to-end for a registered
  `CustomAppError` checking `.boxed_type`, `.src_type`, and `.tb_str`
- `test_registered_another_err_relayed`: same for `AnotherAppError`
  (multi-type coverage)
- `test_unregistered_custom_err_fails_lookup`: `xfail` documenting that
  `.boxed_type` can't resolve without `reg_err_types()` registration

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 18:21:19 -04:00
Bd 8f6bc56174
Merge pull request #427 from goodboy/subsys_reorg
Mv core mods to `runtime/`, `spawn/`, `discovery/` subpkgs
2026-04-02 18:21:00 -04:00
Gud Boi b14dbde77b Skip `test_empty_mngrs_input_raises` on UDS tpt
The `open_actor_cluster()` teardown hangs
intermittently on UDS when `gather_contexts(mngrs=())`
raises `ValueError` mid-setup; likely a race in the
actor-nursery cleanup vs UDS socket shutdown. TCP
passes reliably (5/5 runs).

- Add `tpt_proto` fixture param to the test
- `pytest.skip()` on UDS with a TODO for deeper
  investigation of `._clustering`/`._supervise`
  teardown paths

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 17:59:13 -04:00
Gud Boi cd6509b724 Fix `tractor_test` kwarg check and Windows `start_method` default
- Use `kw in kwargs` membership test instead of
  `kwargs[kw]` to avoid `KeyError` on missing params.
- Restructure Windows `start_method` logic to properly
  default to `'trio'` when unset; only raise on an
  explicit non-trio value.

Review: PR #427 (Copilot)
https://github.com/goodboy/tractor/pull/427#pullrequestreview-4009934142

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 17:59:13 -04:00
Gud Boi 93d99ed2eb Move `get_cpu_state()` to `conftest` as shared latency headroom
Factor the CPU-freq-scaling helper out of
`test_legacy_one_way_streaming` into `conftest.py`
alongside a new `cpu_scaling_factor()` convenience fn
that returns a latency-headroom multiplier (>= 1.0).

Apply it to the two other flaky-timeout tests,
- `test_cancel_via_SIGINT_other_task`: 2s -> scaled
- `test_example[we_are_processes.py]`: 16s -> scaled

Deats,
- add `get_cpu_state()` + `cpu_scaling_factor()` to
  `conftest.py` so all test mods can share the logic.
- catch `IndexError` (empty glob) in addition to
  `FileNotFoundError`.
- rename `factor` var -> `headroom` at call sites for
  clarity on intent.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 17:59:13 -04:00
Gud Boi 6215e3b2dd Adjust `test_a_quadruple_example` time-limit for CPU scaling
Add `get_cpu_state()` helper to read CPU freq settings
from `/sys/devices/system/cpu/` and use it to compensate
the perf time-limit when `auto-cpufreq` (or similar)
scales down the max frequency.

Deats,
- read `*_pstate_max_freq` and `scaling_max_freq`
  to compute a `cpu_scaled` ratio.
- when `cpu_scaled != 1.`, increase `this_fast` limit
  proportionally (factoring dual-threaded cores).
- log a warning via `test_log` when compensating.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 17:59:13 -04:00
Gud Boi be5d8da8c0 Just alias `Arbiter` via assignment 2026-04-02 17:59:13 -04:00
Gud Boi 21ed181835 Filter `__pycache__` from example discovery in tests
(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 17:59:13 -04:00
Gud Boi 9ec2749ab7 Rename `Arbiter` -> `Registrar`, mv to `discovery._registry`
Move the `Arbiter` class out of `runtime._runtime` into its
logical home at `discovery._registry` as `Registrar(Actor)`.
This completes the long-standing terminology migration from
"arbiter" to "registrar/registry" throughout the codebase.

Deats,
- add new `discovery/_registry.py` mod with `Registrar`
  class + backward-compat `Arbiter = Registrar` alias.
- rename `Actor.is_arbiter` attr -> `.is_registrar`;
  old attr now a `@property` with `DeprecationWarning`.
- `_root.py` imports `Registrar` directly for
  root-actor instantiation.
- export `Registrar` + `Arbiter` from `tractor.__init__`.
- `_runtime.py` re-imports from `discovery._registry`
  for backward compat.

Also,
- update all test files to use `.is_registrar`
  (`test_local`, `test_rpc`, `test_spawning`,
  `test_discovery`, `test_multi_program`).
- update "arbiter" -> "registrar" in comments/docstrings
  across `_discovery.py`, `_server.py`, `_transport.py`,
  `_testing/pytest.py`, and examples.
- drop resolved TODOs from `_runtime.py` and `_root.py`.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 17:59:13 -04:00
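The back-compat rename pattern (assignment alias + deprecated `@property`) looks roughly like the following; the class bodies are minimal stand-ins for the real `Actor`/`Registrar`:

```python
# Sketch of the `Arbiter` -> `Registrar` migration shims.
import warnings


class Actor:
    is_registrar: bool = False

    @property
    def is_arbiter(self) -> bool:
        # old attr name keeps working but warns on use
        warnings.warn(
            '`.is_arbiter` is deprecated, use `.is_registrar`',
            DeprecationWarning,
            stacklevel=2,
        )
        return self.is_registrar


class Registrar(Actor):
    is_registrar: bool = True


# backward-compat alias via plain assignment
Arbiter = Registrar
```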
Gud Boi f3441a6790 Update tests+examples imports for new subpkgs
Adjust all `tractor._state`, `tractor._addr`,
`tractor._supervise`, etc. refs in tests and examples
to use the new `runtime/`, `discovery/`, `spawn/` paths.

Also,
- use `tractor.debug_mode()` pub API instead of
  `tractor._state.debug_mode()` in a few test mods
- add explicit `timeout=20` to `test_respawn_consumer_task`
  `@tractor_test` deco call

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 17:59:13 -04:00
Gud Boi cc42d38284 Mv core mods to `runtime/`, `spawn/`, `discovery/` subpkgs
Restructure the flat `tractor/` top-level private mods
into (more nested) subpackages:

- `runtime/`: `_runtime`, `_portal`, `_rpc`, `_state`,
  `_supervise`
- `spawn/`: `_spawn`, `_entry`, `_forkserver_override`,
  `_mp_fixup_main`
- `discovery/`: `_addr`, `_discovery`, `_multiaddr`

Each subpkg `__init__.py` is kept lazy (no eager
imports) to avoid circular import issues.

Also,
- update all intra-pkg imports across ~35 mods to use
  the new subpkg paths (e.g. `from .runtime._state`
  instead of `from ._state`)

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 17:59:13 -04:00
Gud Boi 6827ceba12 Use `wrapt` for `tractor_test()` decorator
Refactor the test-fn deco to use `wrapt.decorator`
instead of `functools.wraps` for better fn-sig
preservation and optional-args support via
`PartialCallableObjectProxy`.

Deats,
- add `timeout` and `hide_tb` deco params
- wrap test-fn body with `trio.fail_after(timeout)`
- consolidate per-fixture `if` checks into a loop
- add `iscoroutinefunction()` type-check on wrapped fn
- set `__tracebackhide__` at each wrapper level

Also,
- update imports for new subpkg paths:
  `tractor.spawn._spawn`, `tractor.discovery._addr`,
  `tractor.runtime._state`
  (see upcoming, likely large patch commit ;)

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 17:59:13 -04:00
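The optional-args deco shape that `wrapt`'s `PartialCallableObjectProxy` enables (bare `@tractor_test` vs `@tractor_test(timeout=20)`) can be approximated in pure stdlib; this sketch uses `asyncio` as a stand-in runner since the real deco drives `trio` + `tractor` fixtures:

```python
# Hypothetical stdlib-only reduction of the `tractor_test()` deco.
import asyncio
import functools
import inspect


def tractor_test(fn=None, *, timeout: float = 10):
    # support both `@tractor_test` and `@tractor_test(timeout=20)`
    if fn is None:
        return functools.partial(tractor_test, timeout=timeout)

    if not inspect.iscoroutinefunction(fn):
        raise TypeError(f'{fn!r} must be an async-func!')

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        __tracebackhide__ = True  # hide this frame in test tracebacks
        return asyncio.run(
            asyncio.wait_for(fn(*args, **kwargs), timeout)
        )

    return wrapper
```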
Gud Boi 94458807ce Expose `RuntimeVars` + `get_runtime_vars()` from pkg
Export the new `RuntimeVars` struct and `get_runtime_vars()`
from `tractor.__init__` and improve the accessor to
optionally return the struct form.

Deats,
- add `RuntimeVars` and `get_runtime_vars` to
  `__init__.py` exports; alphabetize `_state` imports.
- move `get_runtime_vars()` up in `_state.py` to sit
  right below `_runtime_vars` dict definition.
- add `as_dict: bool = True` param so callers can get
  either the legacy `dict` or the new `RuntimeVars`
  struct.
- drop the old stub fn at bottom of `_state.py`.
- rm stale `from .msg.pretty_struct import Struct` comment.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 17:59:13 -04:00
Gud Boi be5e7e446b Proto a `._state.RuntimeVars` struct
So we can start the transition from the runtime-vars `dict` to a typed
struct for better clarity and wire-ready monitoring potential, as well
as better traceability of mutations.

Deats,
- add a new `RuntimeVars(Struct)` with all fields from `_runtime_vars`
  dict typed out
- include `__setattr__()` with `breakpoint()` for debugging
  any unexpected mutations.
- add `.update()` method for batch-updating compat with `dict`.
- keep old `_runtime_vars: dict` in place (we need to port a ton of
  stuff to adjust..).

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 17:59:13 -04:00
Gud Boi 571b2b320e Add `reg_err_types()` for custom remote exc lookup
Allow external app code to register custom exception types
on `._exceptions` so they can be re-raised on the receiver
side of an IPC dialog via `get_err_type()`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 17:59:13 -04:00
Gud Boi c7b5d00f19 Add `get_runtime_vars()` accessor to `._state`
Expose a copy of the current actor's `_runtime_vars` dict
via a public fn; TODO to convert to `RuntimeVars` struct.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-02 17:59:13 -04:00
dependabot[bot] 1049f7bf38
Bump pygments from 2.19.2 to 2.20.0
Bumps [pygments](https://github.com/pygments/pygments) from 2.19.2 to 2.20.0.
- [Release notes](https://github.com/pygments/pygments/releases)
- [Changelog](https://github.com/pygments/pygments/blob/master/CHANGES)
- [Commits](https://github.com/pygments/pygments/compare/2.19.2...2.20.0)

---
updated-dependencies:
- dependency-name: pygments
  dependency-version: 2.20.0
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-30 20:28:44 +00:00
Bd cc3bfac741
Merge pull request #366 from goodboy/dereg_on_oserror
Make `find_actor()` delete stale sockaddr entries from registrar on `OSError`
2026-03-25 03:27:27 -04:00
Gud Boi e71eec07de Refine type annots in `_discovery` and `_runtime`
- Add `LocalPortal` union to `query_actor()` return
  type and `reg_portal` var annotation since the
  registrar yields a `LocalPortal` instance.
- Update docstring to note the `LocalPortal` case.
- Widen `.delete_addr()` `addr` param to accept
  `list[str|int]` bc msgpack deserializes tuples as
  lists over IPC.
- Tighten `uid` annotation to `tuple[str, str]|None`.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-25 02:16:48 -04:00
Gud Boi b557ec20a7 Coerce IPC `addr` to `tuple` in `.delete_addr()`
`msgpack` deserializes tuples as lists over IPC so
the `bidict.inverse.pop()` needs a `tuple`-cast to
match registry keys.

Regressed-by: 85457cb (`registry_addrs` change)
Found-via: `/run-tests` test_stale_entry_is_deleted

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-25 01:36:58 -04:00
Gud Boi 85457cb839 Address Copilot review suggestions on PR #366
- Use `bidict.forceput()` in `register_actor()` to handle
  duplicate addr values from stale entries or actor restarts.
- Fix `uid` annotation to `tuple[str, str]|None` in
  `maybe_open_portal()` and handle the `None` return from
  `delete_addr()` in log output.
- Pass explicit `registry_addrs=[reg_addr]` to `open_nursery()`
  and `find_actor()` in `test_stale_entry_is_deleted` to ensure
  the test uses the remote registrar.
- Update `query_actor()` docstring to document the new
  `(addr, reg_portal)` yield shape.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-25 00:21:09 -04:00
Gud Boi 850219f60c Guard `reg_portal` for `None` in `maybe_open_portal()`
Fix potential `AttributeError` when `query_actor()` yields
a `None` portal (peer-found-locally path) and an `OSError`
is raised during transport connect.

Also,
- fix `Arbiter.delete_addr()` return type to
  `tuple[str, str]|None` bc it can return `None`.
- fix "registar" typo -> "registrar" in comment.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 17:26:57 -04:00
Tyler Goodlet d929fb75b5 Rename `.delete_sockaddr()` -> `.delete_addr()` 2026-03-13 21:51:15 -04:00
Tyler Goodlet 403c2174a1 Always no-raise try-to-pop registry addrs 2026-03-13 21:51:15 -04:00
Tyler Goodlet 528012f35f Add stale entry deleted from registrar test
By spawning an actor task that immediately shuts down the transport
server and then sleeps, verify that attempting to connect via the
`._discovery.find_actor()` helper delivers `None` for the `Portal`
value.

Relates to #184 and #216
2026-03-13 21:51:15 -04:00
Tyler Goodlet 0dfa6f4a8a Don't unwrap an unwrapped addr, just warn on delete XD 2026-03-13 21:51:15 -04:00
Tyler Goodlet a0d3741fac Ensure `._registry` values are hashable, since `bidict`! 2026-03-13 21:51:15 -04:00
Tyler Goodlet 149b800c9f Handle stale registrar entries; detect and delete
In cases where an actor's transport server task (by default handling new
TCP connections) terminates early but does not de-register from the
pertaining registry (aka the registrar) actor's address table, the
trying-to-connect client actor will get a connection error on that
address. In the case where client handles a (local) `OSError` (meaning
the target actor address is likely being contacted over `localhost`)
exception, make a further call to the registrar to delete the stale
entry and `yield None` gracefully indicating to calling code that no
`Portal` can be delivered to the target address.

This issue was originally discovered in `piker` where the `emsd`
(clearing engine) actor would sometimes crash on rapid client
re-connects and then leave a `pikerd` stale entry. With this fix new
clients will attempt connect via an endpoint which will re-spawn the
`emsd` when a `None` portal is delivered (via `maybe_spawn_em()`).
2026-03-13 21:51:15 -04:00
Tyler Goodlet 03f458a45c Add `Arbiter.delete_sockaddr()` to remove addrs
Since stale addrs can be leaked where the actor transport server task
crashes but doesn't (successfully) unregister from the registrar, we
need a remote way to remove such entries; hence this new (registrar)
method.

To implement this make use of the `bidict` lib for the `._registry`
table thus making it super simple to do reverse uuid lookups from an
input socket-address.
2026-03-13 21:51:15 -04:00
68 changed files with 2515 additions and 508 deletions

View File

@@ -20,7 +20,7 @@ async def sleep(
 async def open_ctx(
-    n: tractor._supervise.ActorNursery
+    n: tractor.runtime._supervise.ActorNursery
 ):
     # spawn both actors

View File

@@ -10,7 +10,7 @@ async def main(service_name):
         await an.start_actor(service_name)
     async with tractor.get_registry() as portal:
-        print(f"Arbiter is listening on {portal.channel}")
+        print(f"Registrar is listening on {portal.channel}")
     async with tractor.wait_for_actor(service_name) as sockaddr:
         print(f"my_service is found at {sockaddr}")

View File

@@ -9,6 +9,8 @@ import os
 import signal
 import platform
 import time
+from pathlib import Path
+from typing import Literal

 import pytest
 import tractor
@@ -52,6 +54,76 @@ no_macos = pytest.mark.skipif(
 )


+def get_cpu_state(
+    icpu: int = 0,
+    setting: Literal[
+        'scaling_governor',
+        '*_pstate_max_freq',
+        'scaling_max_freq',
+        # 'scaling_cur_freq',
+    ] = '*_pstate_max_freq',
+) -> tuple[
+    Path,
+    str|int,
+]|None:
+    '''
+    Attempt to read the (first) CPU's setting according
+    to the set `setting` from under the file-sys,
+
+      /sys/devices/system/cpu/cpu0/cpufreq/{setting}
+
+    Useful to determine latency headroom for various perf affected
+    test suites.
+
+    '''
+    try:
+        # Read governor for core 0 (usually same for all)
+        setting_path: Path = list(
+            Path(f'/sys/devices/system/cpu/cpu{icpu}/cpufreq/')
+            .glob(f'{setting}')
+        )[0]  # <- XXX must be single match!
+        with open(
+            setting_path,
+            'r',
+        ) as f:
+            return (
+                setting_path,
+                f.read().strip(),
+            )
+    except (FileNotFoundError, IndexError):
+        return None
+
+
+def cpu_scaling_factor() -> float:
+    '''
+    Return a latency-headroom multiplier (>= 1.0) reflecting how
+    much to inflate time-limits when CPU-freq scaling is active on
+    linux.
+
+    When no scaling info is available (non-linux, missing sysfs),
+    returns 1.0 (i.e. no headroom adjustment needed).
+
+    '''
+    if _non_linux:
+        return 1.
+
+    mx = get_cpu_state()
+    cur = get_cpu_state(setting='scaling_max_freq')
+    if mx is None or cur is None:
+        return 1.
+
+    _mx_pth, max_freq = mx
+    _cur_pth, cur_freq = cur
+    cpu_scaled: float = int(cur_freq) / int(max_freq)
+    if cpu_scaled != 1.:
+        return 1. / (
+            cpu_scaled * 2  # <- bc likely "dual threaded"
+        )
+
+    return 1.
+
+
 def pytest_addoption(
     parser: pytest.Parser,
 ):

View File

@@ -126,7 +126,7 @@ def test_shield_pause(
         child.pid,
         signal.SIGINT,
     )
-    from tractor._supervise import _shutdown_msg
+    from tractor.runtime._supervise import _shutdown_msg
     expect(
         child,
         # 'Shutting down actor runtime',

View File

@@ -8,17 +8,16 @@ from pathlib import Path
 import pytest
 import trio
 import tractor
-from tractor import (
-    Actor,
-    _state,
-    _addr,
-)
+from tractor import Actor
+from tractor.runtime import _state
+from tractor.discovery import _addr


 @pytest.fixture
 def bindspace_dir_str() -> str:
-    rt_dir: Path = tractor._state.get_rt_dir()
+    from tractor.runtime._state import get_rt_dir
+    rt_dir: Path = get_rt_dir()
     bs_dir: Path = rt_dir / 'doggy'
     bs_dir_str: str = str(bs_dir)
     assert not bs_dir.is_dir()

View File

@@ -13,9 +13,9 @@ from tractor import (
     Portal,
     ipc,
     msg,
-    _state,
-    _addr,
 )
+from tractor.runtime import _state
+from tractor.discovery import _addr


 @tractor.context
 async def chk_tpts(

View File

@@ -61,7 +61,7 @@ async def maybe_expect_raises(
     Async wrapper for ensuring errors propagate from the inner scope.

     '''
-    if tractor._state.debug_mode():
+    if tractor.debug_mode():
         timeout += 999

     with trio.fail_after(timeout):

View File

@@ -490,7 +490,7 @@ def test_cancel_via_SIGINT(
     """Ensure that a control-C (SIGINT) signal cancels both the parent and
     child processes in trionic fashion
     """
-    pid = os.getpid()
+    pid: int = os.getpid()

     async def main():
         with trio.fail_after(2):
@@ -517,6 +517,8 @@ def test_cancel_via_SIGINT_other_task(
     started from a seperate ``trio`` child task.

     '''
+    from .conftest import cpu_scaling_factor
+
     pid: int = os.getpid()
     timeout: float = (
         4 if _non_linux
@@ -525,6 +527,11 @@ def test_cancel_via_SIGINT_other_task(
     if _friggin_windows:  # smh
         timeout += 1

+    # add latency headroom for CPU freq scaling (auto-cpufreq et al.)
+    headroom: float = cpu_scaling_factor()
+    if headroom != 1.:
+        timeout *= headroom
+
     async def spawn_and_sleep_forever(
         task_status=trio.TASK_STATUS_IGNORED
     ):

View File

@@ -10,7 +10,20 @@ from tractor._testing import tractor_test

 MESSAGE = 'tractoring at full speed'


-def test_empty_mngrs_input_raises() -> None:
+def test_empty_mngrs_input_raises(
+    tpt_proto: str,
+) -> None:
+    # TODO, the `open_actor_cluster()` teardown hangs
+    # intermittently on UDS when `gather_contexts(mngrs=())`
+    # raises `ValueError` mid-setup; likely a race in the
+    # actor-nursery cleanup vs UDS socket shutdown. Needs
+    # a deeper look at `._clustering`/`._supervise` teardown
+    # paths with the UDS transport.
+    if tpt_proto == 'uds':
+        pytest.skip(
+            'actor-cluster teardown hangs intermittently on UDS'
+        )

     async def main():
         with trio.fail_after(3):
             async with (

View File

@@ -26,7 +26,7 @@ from tractor._exceptions import (
     StreamOverrun,
     ContextCancelled,
 )
-from tractor._state import current_ipc_ctx
+from tractor.runtime._state import current_ipc_ctx

 from tractor._testing import (
     tractor_test,
@@ -939,7 +939,7 @@ def test_one_end_stream_not_opened(

     '''
     overrunner, buf_size_increase, entrypoint = overrun_by

-    from tractor._runtime import Actor
+    from tractor.runtime._runtime import Actor
     buf_size = buf_size_increase + Actor.msg_buffer_size

     timeout: float = (

View File

@@ -1,7 +1,7 @@
-"""
-Discovery subsys.
-"""
+'''
+Discovery subsystem via a "registrar" actor scenarios.
+
+'''
 import os
 import signal
 import platform

@@ -24,7 +24,7 @@ async def test_reg_then_unreg(
     reg_addr: tuple,
 ):
     actor = tractor.current_actor()
-    assert actor.is_arbiter
+    assert actor.is_registrar
     assert len(actor._registry) == 1  # only self is registered

     async with tractor.open_nursery(

@@ -35,7 +35,7 @@ async def test_reg_then_unreg(
     uid = portal.channel.aid.uid

     async with tractor.get_registry(reg_addr) as aportal:
-        # this local actor should be the arbiter
+        # this local actor should be the registrar
         assert actor is aportal.actor

         async with tractor.wait_for_actor('actor'):

@@ -154,7 +154,7 @@ async def unpack_reg(
     actor_or_portal: tractor.Portal|tractor.Actor,
 ):
     '''
-    Get and unpack a "registry" RPC request from the "arbiter" registry
+    Get and unpack a "registry" RPC request from the registrar
     system.

     '''

@@ -163,7 +163,10 @@ async def unpack_reg(
     else:
         msg = await actor_or_portal.run_from_ns('self', 'get_registry')

-    return {tuple(key.split('.')): val for key, val in msg.items()}
+    return {
+        tuple(key.split('.')): val
+        for key, val in msg.items()
+    }
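The comprehension above turns the registrar's flat str-keyed reply into tuple-keyed entries; a standalone sketch of the same transform (the `'<name>.<uuid>'` key shape and the sample addrs below are assumptions inferred from the `key.split('.')` usage, not values from the suite):

```python
# hypothetical registrar reply: flat str keys of the
# (assumed) form '<name>.<uuid>' mapped to socket addrs
msg: dict[str, tuple] = {
    'root.deadbeef': ('127.0.0.1', 1616),
    'worker.cafef00d': ('127.0.0.1', 51234),
}

# same comprehension as `unpack_reg()`: split each key
# into a (name, uuid) tuple for structured lookup
registry: dict[tuple, tuple] = {
    tuple(key.split('.')): val
    for key, val in msg.items()
}

assert registry[('worker', 'cafef00d')] == ('127.0.0.1', 51234)
assert ('root', 'deadbeef') in registry
```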
 async def spawn_and_check_registry(

@@ -194,15 +197,15 @@ async def spawn_and_check_registry(
     actor = tractor.current_actor()
     if remote_arbiter:
-        assert not actor.is_arbiter
+        assert not actor.is_registrar

-    if actor.is_arbiter:
-        extra = 1  # arbiter is local root actor
+    if actor.is_registrar:
+        extra = 1  # registrar is local root actor
         get_reg = partial(unpack_reg, actor)
     else:
         get_reg = partial(unpack_reg, portal)
-        extra = 2  # local root actor + remote arbiter
+        extra = 2  # local root actor + remote registrar

     # ensure current actor is registered
     registry: dict = await get_reg()

@@ -282,7 +285,7 @@ def test_subactors_unregister_on_cancel(
 ):
     '''
     Verify that cancelling a nursery results in all subactors
-    deregistering themselves with the arbiter.
+    deregistering themselves with the registrar.

     '''
     with pytest.raises(KeyboardInterrupt):

@@ -311,7 +314,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
     '''
     Verify that cancelling a nursery results in all subactors
     deregistering themselves with a **remote** (not in the local
-    process tree) arbiter.
+    process tree) registrar.

     '''
     with pytest.raises(KeyboardInterrupt):

@@ -356,20 +359,24 @@ async def close_chans_before_nursery(
     try:
         get_reg = partial(unpack_reg, aportal)

-        async with tractor.open_nursery() as tn:
-            portal1 = await tn.start_actor(
-                name='consumer1', enable_modules=[__name__])
-            portal2 = await tn.start_actor(
-                'consumer2', enable_modules=[__name__])
+        async with tractor.open_nursery() as an:
+            portal1 = await an.start_actor(
+                name='consumer1',
+                enable_modules=[__name__],
+            )
+            portal2 = await an.start_actor(
+                'consumer2',
+                enable_modules=[__name__],
+            )

-            # TODO: compact this back as was in last commit once
-            # 3.9+, see https://github.com/goodboy/tractor/issues/207
-            async with portal1.open_stream_from(
-                stream_forever
-            ) as agen1:
-                async with portal2.open_stream_from(
-                    stream_forever
-                ) as agen2:
+            async with (
+                portal1.open_stream_from(
+                    stream_forever
+                ) as agen1,
+                portal2.open_stream_from(
+                    stream_forever
+                ) as agen2,
+            ):
                 async with (
                     collapse_eg(),
                     trio.open_nursery() as tn,

@@ -380,7 +387,7 @@ async def close_chans_before_nursery(
                     await streamer(agen2)
                 finally:
                     # Kill the root nursery thus resulting in
-                    # normal arbiter channel ops to fail during
+                    # normal registrar channel ops to fail during
                     # teardown. It doesn't seem like this is
                     # reliably triggered by an external SIGINT.
                     # tractor.current_actor()._root_nursery.cancel_scope.cancel()

@@ -392,6 +399,7 @@ async def close_chans_before_nursery(
                 # also kill off channels cuz why not
                 await agen1.aclose()
                 await agen2.aclose()
+
     finally:
         with trio.CancelScope(shield=True):
             await trio.sleep(1)

@@ -412,7 +420,7 @@ def test_close_channel_explicit(
     '''
     Verify that closing a stream explicitly and killing the actor's
     "root nursery" **before** the containing nursery tears down also
-    results in subactor(s) deregistering from the arbiter.
+    results in subactor(s) deregistering from the registrar.

     '''
     with pytest.raises(KeyboardInterrupt):

@@ -427,7 +435,7 @@ def test_close_channel_explicit(
 @pytest.mark.parametrize('use_signal', [False, True])
-def test_close_channel_explicit_remote_arbiter(
+def test_close_channel_explicit_remote_registrar(
     daemon: subprocess.Popen,
     start_method: str,
     use_signal: bool,

@@ -436,7 +444,7 @@ def test_close_channel_explicit_remote_registrar(
     '''
     Verify that closing a stream explicitly and killing the actor's
     "root nursery" **before** the containing nursery tears down also
-    results in subactor(s) deregistering from the arbiter.
+    results in subactor(s) deregistering from the registrar.

     '''
     with pytest.raises(KeyboardInterrupt):

@@ -448,3 +456,65 @@ def test_close_channel_explicit_remote_registrar(
             remote_arbiter=True,
         ),
     )
@tractor.context
async def kill_transport(
    ctx: tractor.Context,
) -> None:
    await ctx.started()
    actor: tractor.Actor = tractor.current_actor()
    actor.ipc_server.cancel()
    await trio.sleep_forever()


# @pytest.mark.parametrize('use_signal', [False, True])
def test_stale_entry_is_deleted(
    debug_mode: bool,
    daemon: subprocess.Popen,
    start_method: str,
    reg_addr: tuple,
):
    '''
    Ensure that when a stale entry is detected in the registrar's
    table that the `find_actor()` API takes care of deleting the
    stale entry and not delivering a bad portal.

    '''
    async def main():
        name: str = 'transport_fails_actor'
        _reg_ptl: tractor.Portal
        an: tractor.ActorNursery
        async with (
            tractor.open_nursery(
                debug_mode=debug_mode,
                registry_addrs=[reg_addr],
            ) as an,
            tractor.get_registry(reg_addr) as _reg_ptl,
        ):
            ptl: tractor.Portal = await an.start_actor(
                name,
                enable_modules=[__name__],
            )
            async with ptl.open_context(
                kill_transport,
            ) as (first, ctx):
                async with tractor.find_actor(
                    name,
                    registry_addrs=[reg_addr],
                ) as maybe_portal:
                    # because the transitive
                    # `._discovery.maybe_open_portal()` call should
                    # fail and implicitly call `.delete_addr()`
                    assert maybe_portal is None

                registry: dict = await unpack_reg(_reg_ptl)
                assert ptl.chan.aid.uid not in registry

                # should fail since we knocked out the IPC tpt XD
                await ptl.cancel_actor()

            await an.cancel()

    trio.run(main)


@@ -94,8 +94,10 @@ def run_example_in_subproc(
         for f in p[2]
         if (
-            '__' not in f
-            and f[0] != '_'
+            '__' not in f  # ignore any pkg-mods
+            # ignore any `__pycache__` subdir
+            and '__pycache__' not in str(p[0])
+            and f[0] != '_'  # ignore any WIP "example mods"
             and 'debugging' not in p[0]
             and 'integration' not in p[0]
             and 'advanced_faults' not in p[0]

@@ -143,12 +145,19 @@ def test_example(
             'This test does run just fine "in person" however..'
         )

+    from .conftest import cpu_scaling_factor
     timeout: float = (
         60
         if ci_env and _non_linux
         else 16
     )
+    # add latency headroom for CPU freq scaling (auto-cpufreq et al.)
+    headroom: float = cpu_scaling_factor()
+    if headroom != 1.:
+        timeout *= headroom

     with open(ex_file, 'r') as ex:
         code = ex.read()
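The `cpu_scaling_factor()` conftest helper itself isn't shown in this diff; a minimal sketch of such a helper (the sysfs path, the governor names, and the `1.5` headroom constant are all assumptions, not the suite's actual values) might read:

```python
from pathlib import Path


def cpu_scaling_factor(
    governor_path: str = (
        '/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor'
    ),
) -> float:
    '''
    Return a latency-headroom multiplier when the CPU
    governor is in a power-saving mode (which slows
    timing-sensitive tests), else 1.

    '''
    try:
        governor: str = Path(governor_path).read_text().strip()
    except OSError:
        # no cpufreq info: non-linux host, container, etc.
        return 1.

    if governor in ('powersave', 'conservative', 'schedutil'):
        return 1.5  # assumed headroom constant

    return 1.
```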


@@ -26,8 +26,8 @@ from tractor import (
     to_asyncio,
     RemoteActorError,
     ContextCancelled,
-    _state,
 )
+from tractor.runtime import _state
 from tractor.trionics import BroadcastReceiver
 from tractor._testing import expect_ctxc


@@ -201,7 +201,7 @@ async def stream_from_peer(
 ) -> None:

     # sanity
-    assert tractor._state.debug_mode() == debug_mode
+    assert tractor.debug_mode() == debug_mode

     peer: Portal
     try:

@@ -841,7 +841,7 @@ async def serve_subactors(
     async with open_nursery() as an:

         # sanity
-        assert tractor._state.debug_mode() == debug_mode
+        assert tractor.debug_mode() == debug_mode

         await ctx.started(peer_name)
         async with ctx.open_stream() as ipc:

@@ -880,7 +880,7 @@ async def client_req_subactor(
 ) -> None:
     # sanity
     if debug_mode:
-        assert tractor._state.debug_mode()
+        assert tractor.debug_mode()

     # TODO: other cases to do with sub lifetimes:
     # -[ ] test that we can have the server spawn a sub


@@ -300,19 +300,43 @@ def test_a_quadruple_example(
     time_quad_ex: tuple[list[int], float],
     ci_env: bool,
     spawn_backend: str,
+    test_log: tractor.log.StackLevelAdapter,
 ):
     '''
-    This also serves as a kind of "we'd like to be this fast test".
+    This also serves as a "we'd like to be this fast" smoke test
+    given past empirical eval of this suite.

     '''
     non_linux: bool = (_sys := platform.system()) != 'Linux'

-    results, diff = time_quad_ex
-    assert results
+    this_fast_on_linux: float = 3
     this_fast = (
         6 if non_linux
-        else 3
+        else this_fast_on_linux
     )
+    # ^ XXX NOTE,
+    # i've noticed that tweaking the CPU governor setting
+    # to not "always" enable "turbo" mode can result in latency
+    # which causes this limit to be too little. Not sure if it'd
+    # be worth it to adjust the linux value based on reading the
+    # CPU conf from the sys?
+    #
+    # For ex, see the `auto-cpufreq` docs on such settings,
+    # https://github.com/AdnanHodzic/auto-cpufreq?tab=readme-ov-file#example-config-file-contents
+    #
+    # HENCE this below latency-headroom compensation logic..
+    from .conftest import cpu_scaling_factor
+    headroom: float = cpu_scaling_factor()
+    if headroom != 1.:
+        this_fast = this_fast_on_linux * headroom
+        test_log.warning(
+            f'Adding latency headroom on linux bc CPU scaling,\n'
+            f'headroom: {headroom}\n'
+            f'this_fast_on_linux: {this_fast_on_linux} -> {this_fast}\n'
+        )

+    results, diff = time_quad_ex
+    assert results
     assert diff < this_fast

@@ -353,7 +377,7 @@ def test_not_fast_enough_quad(
     assert results is None


-@tractor_test
+@tractor_test(timeout=20)
 async def test_respawn_consumer_task(
     reg_addr: tuple,
     spawn_backend: str,


@@ -1,5 +1,5 @@
 """
-Arbiter and "local" actor api
+Registrar and "local" actor api
 """
 import time

@@ -12,11 +12,11 @@ from tractor._testing import tractor_test

 @pytest.mark.trio
 async def test_no_runtime():
-    """An arbitter must be established before any nurseries
+    """A registrar must be established before any nurseries
     can be created.

-    (In other words ``tractor.open_root_actor()`` must be engaged at
-    some point?)
+    (In other words ``tractor.open_root_actor()`` must be
+    engaged at some point?)
     """
     with pytest.raises(RuntimeError) :
         async with tractor.find_actor('doggy'):

@@ -25,9 +25,9 @@ async def test_no_runtime():

 @tractor_test
 async def test_self_is_registered(reg_addr):
-    "Verify waiting on the arbiter to register itself using the standard api."
+    "Verify waiting on the registrar to register itself using the standard api."
     actor = tractor.current_actor()
-    assert actor.is_arbiter
+    assert actor.is_registrar
     with trio.fail_after(0.2):
         async with tractor.wait_for_actor('root') as portal:
             assert portal.channel.uid[0] == 'root'

@@ -35,11 +35,11 @@ async def test_self_is_registered(reg_addr):

 @tractor_test
 async def test_self_is_registered_localportal(reg_addr):
-    "Verify waiting on the arbiter to register itself using a local portal."
+    "Verify waiting on the registrar to register itself using a local portal."
     actor = tractor.current_actor()
-    assert actor.is_arbiter
+    assert actor.is_registrar
     async with tractor.get_registry(reg_addr) as portal:
-        assert isinstance(portal, tractor._portal.LocalPortal)
+        assert isinstance(portal, tractor.runtime._portal.LocalPortal)

         with trio.fail_after(0.2):
             sockaddr = await portal.run_from_ns(

@@ -57,8 +57,8 @@ def test_local_actor_async_func(reg_addr):
         async with tractor.open_root_actor(
             registry_addrs=[reg_addr],
         ):
-            # arbiter is started in-proc if dne
-            assert tractor.current_actor().is_arbiter
+            # registrar is started in-proc if dne
+            assert tractor.current_actor().is_registrar

             for i in range(10):
                 nums.append(i)


@@ -17,11 +17,11 @@ from tractor._testing import (
 )
 from tractor import (
     current_actor,
-    _state,
     Actor,
     Context,
     Portal,
 )
+from tractor.runtime import _state
 from .conftest import (
     sig_prog,
     _INT_SIGNAL,

@@ -30,7 +30,7 @@ from .conftest import (
 if TYPE_CHECKING:
     from tractor.msg import Aid
-    from tractor._addr import (
+    from tractor.discovery._addr import (
         UnwrappedAddress,
     )

@@ -53,19 +53,19 @@ def test_abort_on_sigint(

 @tractor_test
-async def test_cancel_remote_arbiter(
+async def test_cancel_remote_registrar(
     daemon: subprocess.Popen,
     reg_addr: UnwrappedAddress,
 ):
-    assert not current_actor().is_arbiter
+    assert not current_actor().is_registrar
     async with tractor.get_registry(reg_addr) as portal:
         await portal.cancel_actor()

     time.sleep(0.1)
-    # the arbiter channel server is cancelled but not its main task
+    # the registrar channel server is cancelled but not its main task
     assert daemon.returncode is None

-    # no arbiter socket should exist
+    # no registrar socket should exist
     with pytest.raises(OSError):
         async with tractor.get_registry(reg_addr) as portal:
             pass

@@ -80,7 +80,7 @@ def test_register_duplicate_name(
         registry_addrs=[reg_addr],
     ) as an:

-        assert not current_actor().is_arbiter
+        assert not current_actor().is_registrar

         p1 = await an.start_actor('doggy')
         p2 = await an.start_actor('doggy')

@@ -122,7 +122,7 @@ async def get_root_portal(
     # connect back to our immediate parent which should also
     # be the actor-tree's root.
-    from tractor._discovery import get_root
+    from tractor.discovery._discovery import get_root
     ptl: Portal
     async with get_root() as ptl:
         root_aid: Aid = ptl.chan.aid


@@ -0,0 +1,333 @@
'''
Verify that externally registered remote actor error
types are correctly relayed, boxed, and re-raised across
IPC actor hops via `reg_err_types()`.

Also ensure that when custom error types are NOT registered
the framework indicates the lookup failure to the user.

'''
import pytest
import trio

import tractor
from tractor import (
    Context,
    Portal,
    RemoteActorError,
)
from tractor._exceptions import (
    get_err_type,
    reg_err_types,
)


# -- custom app-level errors for testing --

class CustomAppError(Exception):
    '''
    A hypothetical user-app error that should be
    boxed+relayed by `tractor` IPC when registered.

    '''


class AnotherAppError(Exception):
    '''
    A second custom error for multi-type registration.

    '''


class UnregisteredAppError(Exception):
    '''
    A custom error that is intentionally NEVER
    registered via `reg_err_types()` so we can
    verify the framework's failure indication.

    '''


# -- remote-task endpoints --

@tractor.context
async def raise_custom_err(
    ctx: Context,
) -> None:
    '''
    Remote ep that raises a `CustomAppError`
    after sync-ing with the caller.

    '''
    await ctx.started()
    raise CustomAppError(
        'the app exploded remotely'
    )


@tractor.context
async def raise_another_err(
    ctx: Context,
) -> None:
    '''
    Remote ep that raises `AnotherAppError`.

    '''
    await ctx.started()
    raise AnotherAppError(
        'another app-level kaboom'
    )


@tractor.context
async def raise_unreg_err(
    ctx: Context,
) -> None:
    '''
    Remote ep that raises an `UnregisteredAppError`
    which has NOT been `reg_err_types()`-registered.

    '''
    await ctx.started()
    raise UnregisteredAppError(
        'this error type is unknown to tractor'
    )
# -- unit tests for the type-registry plumbing --

class TestRegErrTypesPlumbing:
    '''
    Low-level checks on `reg_err_types()` and
    `get_err_type()` without requiring IPC.

    '''
    def test_unregistered_type_returns_none(self):
        '''
        An unregistered custom error name should yield
        `None` from `get_err_type()`.

        '''
        result = get_err_type('CustomAppError')
        assert result is None

    def test_register_and_lookup(self):
        '''
        After `reg_err_types()`, the custom type should
        be discoverable via `get_err_type()`.

        '''
        reg_err_types([CustomAppError])
        result = get_err_type('CustomAppError')
        assert result is CustomAppError

    def test_register_multiple_types(self):
        '''
        Registering a list of types should make each
        one individually resolvable.

        '''
        reg_err_types([
            CustomAppError,
            AnotherAppError,
        ])
        assert (
            get_err_type('CustomAppError')
            is CustomAppError
        )
        assert (
            get_err_type('AnotherAppError')
            is AnotherAppError
        )

    def test_builtin_types_always_resolve(self):
        '''
        Builtin error types like `RuntimeError` and
        `ValueError` should always be found without
        any prior registration.

        '''
        assert (
            get_err_type('RuntimeError')
            is RuntimeError
        )
        assert (
            get_err_type('ValueError')
            is ValueError
        )

    def test_tractor_native_types_resolve(self):
        '''
        `tractor`-internal exc types (e.g.
        `ContextCancelled`) should always resolve.

        '''
        assert (
            get_err_type('ContextCancelled')
            is tractor.ContextCancelled
        )

    def test_boxed_type_str_without_ipc_msg(self):
        '''
        When a `RemoteActorError` is constructed
        without an IPC msg (and no resolvable type),
        `.boxed_type_str` should return `'<unknown>'`.

        '''
        rae = RemoteActorError('test')
        assert rae.boxed_type_str == '<unknown>'
# -- IPC-level integration tests --

def test_registered_custom_err_relayed(
    debug_mode: bool,
    tpt_proto: str,
):
    '''
    When a custom error type is registered via
    `reg_err_types()` on BOTH sides of an IPC dialog,
    the parent should receive a `RemoteActorError`
    whose `.boxed_type` matches the original custom
    error type.

    '''
    reg_err_types([CustomAppError])

    async def main():
        async with tractor.open_nursery(
            debug_mode=debug_mode,
            enable_transports=[tpt_proto],
        ) as an:
            ptl: Portal = await an.start_actor(
                'custom-err-raiser',
                enable_modules=[__name__],
            )
            async with ptl.open_context(
                raise_custom_err,
            ) as (ctx, sent):
                assert not sent
                try:
                    await ctx.wait_for_result()
                except RemoteActorError as rae:
                    assert rae.boxed_type is CustomAppError
                    assert rae.src_type is CustomAppError
                    assert 'the app exploded remotely' in str(
                        rae.tb_str
                    )
                    raise

    with pytest.raises(RemoteActorError) as excinfo:
        trio.run(main)

    rae = excinfo.value
    assert rae.boxed_type is CustomAppError


def test_registered_another_err_relayed(
    debug_mode: bool,
    tpt_proto: str,
):
    '''
    Same as above but for a different custom error
    type to verify multi-type registration works
    end-to-end over IPC.

    '''
    reg_err_types([AnotherAppError])

    async def main():
        async with tractor.open_nursery(
            debug_mode=debug_mode,
            enable_transports=[tpt_proto],
        ) as an:
            ptl: Portal = await an.start_actor(
                'another-err-raiser',
                enable_modules=[__name__],
            )
            async with ptl.open_context(
                raise_another_err,
            ) as (ctx, sent):
                assert not sent
                try:
                    await ctx.wait_for_result()
                except RemoteActorError as rae:
                    assert (
                        rae.boxed_type
                        is AnotherAppError
                    )
                    raise

            await an.cancel()

    with pytest.raises(RemoteActorError) as excinfo:
        trio.run(main)

    rae = excinfo.value
    assert rae.boxed_type is AnotherAppError


def test_unregistered_err_still_relayed(
    debug_mode: bool,
    tpt_proto: str,
):
    '''
    Verify that even when a custom error type is NOT registered via
    `reg_err_types()`, the remote error is still relayed as
    a `RemoteActorError` with all string-level info preserved
    (traceback, type name, source actor uid).

    The `.boxed_type` will be `None` (type obj can't be resolved) but
    `.boxed_type_str` and `.src_type_str` still report the original
    type name from the IPC msg.

    This documents the expected limitation: without `reg_err_types()`
    the `.boxed_type` property can NOT resolve to the original Python
    type.

    '''
    # NOTE: intentionally do NOT call
    # `reg_err_types([UnregisteredAppError])`

    async def main():
        async with tractor.open_nursery(
            debug_mode=debug_mode,
            enable_transports=[tpt_proto],
        ) as an:
            ptl: Portal = await an.start_actor(
                'unreg-err-raiser',
                enable_modules=[__name__],
            )
            async with ptl.open_context(
                raise_unreg_err,
            ) as (ctx, sent):
                assert not sent
                await ctx.wait_for_result()

            await an.cancel()

    with pytest.raises(RemoteActorError) as excinfo:
        trio.run(main)

    rae = excinfo.value

    # the error IS relayed even without
    # registration; type obj is unresolvable but
    # all string-level info is preserved.
    assert rae.boxed_type is None  # NOT `UnregisteredAppError`
    assert rae.src_type is None

    # string names survive the IPC round-trip
    # via the `Error` msg fields.
    assert (
        rae.src_type_str
        ==
        'UnregisteredAppError'
    )
    assert (
        rae.boxed_type_str
        ==
        'UnregisteredAppError'
    )

    # original traceback content is preserved
    assert 'this error type is unknown' in rae.tb_str
    assert 'UnregisteredAppError' in rae.tb_str


@@ -94,15 +94,15 @@ def test_runtime_vars_unset(
     after the root actor-runtime exits!

     '''
-    assert not tractor._state._runtime_vars['_debug_mode']
+    assert not tractor.runtime._state._runtime_vars['_debug_mode']

     async def main():
-        assert not tractor._state._runtime_vars['_debug_mode']
+        assert not tractor.runtime._state._runtime_vars['_debug_mode']
         async with tractor.open_nursery(
             debug_mode=True,
         ):
-            assert tractor._state._runtime_vars['_debug_mode']
+            assert tractor.runtime._state._runtime_vars['_debug_mode']

         # after runtime closure, should be reverted!
-        assert not tractor._state._runtime_vars['_debug_mode']
+        assert not tractor.runtime._state._runtime_vars['_debug_mode']

     trio.run(main)


@@ -110,7 +110,7 @@ def test_rpc_errors(
     ) as n:

         actor = tractor.current_actor()
-        assert actor.is_arbiter
+        assert actor.is_registrar
         await n.run_in_actor(
             sleep_back_actor,
             actor_name=subactor_requests_to,


@@ -39,7 +39,7 @@ async def spawn(
 ):
     # now runtime exists
     actor: tractor.Actor = tractor.current_actor()
-    assert actor.is_arbiter == should_be_root
+    assert actor.is_registrar == should_be_root

     # spawns subproc here
     portal: tractor.Portal = await an.run_in_actor(

@@ -68,7 +68,7 @@ async def spawn(
         assert result == 10
         return result
     else:
-        assert actor.is_arbiter == should_be_root
+        assert actor.is_registrar == should_be_root
         return 10

@@ -181,7 +181,7 @@ def test_loglevel_propagated_to_subactor(
     async def main():
         async with tractor.open_nursery(
-            name='arbiter',
+            name='registrar',
             start_method=start_method,
             arbiter_addr=reg_addr,


@@ -30,21 +30,23 @@ from ._streaming import (
     MsgStream as MsgStream,
     stream as stream,
 )
-from ._discovery import (
+from .discovery._discovery import (
     get_registry as get_registry,
     find_actor as find_actor,
     wait_for_actor as wait_for_actor,
     query_actor as query_actor,
 )
-from ._supervise import (
+from .runtime._supervise import (
     open_nursery as open_nursery,
     ActorNursery as ActorNursery,
 )
-from ._state import (
+from .runtime._state import (
+    RuntimeVars as RuntimeVars,
     current_actor as current_actor,
-    is_root_process as is_root_process,
     current_ipc_ctx as current_ipc_ctx,
-    debug_mode as debug_mode
+    debug_mode as debug_mode,
+    get_runtime_vars as get_runtime_vars,
+    is_root_process as is_root_process,
 )
 from ._exceptions import (
     ContextCancelled as ContextCancelled,

@@ -65,6 +67,10 @@ from ._root import (
     open_root_actor as open_root_actor,
 )
 from .ipc import Channel as Channel
-from ._portal import Portal as Portal
-from ._runtime import Actor as Actor
+from .runtime._portal import Portal as Portal
+from .runtime._runtime import Actor as Actor
+from .discovery._registry import (
+    Registrar as Registrar,
+    Arbiter as Arbiter,
+)
 # from . import hilevel as hilevel


@@ -22,8 +22,8 @@ import argparse
 from ast import literal_eval

-from ._runtime import Actor
-from ._entry import _trio_main
+from .runtime._runtime import Actor
+from .spawn._entry import _trio_main


 def parse_uid(arg):


@@ -97,7 +97,7 @@ from ._streaming import (
     MsgStream,
     open_stream_from_ctx,
 )
-from ._state import (
+from .runtime._state import (
     current_actor,
     debug_mode,
     _ctxvar_Context,

@@ -107,8 +107,8 @@ from .trionics import (
 )
 # ------ - ------
 if TYPE_CHECKING:
-    from ._portal import Portal
-    from ._runtime import Actor
+    from .runtime._portal import Portal
+    from .runtime._runtime import Actor
     from .ipc._transport import MsgTransport
     from .devx._frame_stack import (
         CallerInfo,


@@ -43,7 +43,7 @@ from msgspec import (
     ValidationError,
 )

-from tractor._state import current_actor
+from tractor.runtime._state import current_actor
 from tractor.log import get_logger
 from tractor.msg import (
     Error,

@@ -187,7 +187,31 @@ _body_fields: list[str] = list(
 )


-def get_err_type(type_name: str) -> BaseException|None:
+def reg_err_types(
+    exc_types: list[Type[Exception]],
+) -> None:
+    '''
+    Register custom exception types for local lookup.
+
+    Such that error types can be registered by an external
+    `tractor`-use-app code base which are expected to be raised
+    remotely; enables them being re-raised on the receiver side of
+    some inter-actor IPC dialog.
+
+    '''
+    for exc_type in exc_types:
+        log.debug(
+            f'Register custom exception,\n'
+            f'{exc_type!r}\n'
+        )
+        setattr(
+            _this_mod,
+            exc_type.__name__,
+            exc_type,
+        )
+def get_err_type(type_name: str) -> Type[BaseException]|None:
     '''
     Look up an exception type by name from the set of locally known
     namespaces:
@@ -301,7 +325,8 @@ class RemoteActorError(Exception):
         # also pertains to our long long oustanding issue XD
         # https://github.com/goodboy/tractor/issues/5
         self._boxed_type: BaseException = boxed_type
-        self._src_type: BaseException|None = None
+        self._src_type: Type[BaseException]|None = None
+        self._src_type_resolved: bool = False
         self._ipc_msg: Error|None = ipc_msg
         self._extra_msgdata = extra_msgdata
@@ -410,24 +435,41 @@ class RemoteActorError(Exception):
         return self._ipc_msg.src_type_str

     @property
-    def src_type(self) -> str:
+    def src_type(self) -> Type[BaseException]|None:
         '''
-        Error type raised by original remote faulting actor.
+        Error type raised by original remote faulting
+        actor.

-        When the error has only been relayed a single actor-hop
-        this will be the same as the `.boxed_type`.
+        When the error has only been relayed a single
+        actor-hop this will be the same as
+        `.boxed_type`.
+
+        If the type can not be resolved locally (i.e.
+        it was not registered via `reg_err_types()`)
+        a warning is logged and `None` is returned;
+        all string-level error info (`.src_type_str`,
+        `.tb_str`, etc.) remains available.

         '''
-        if self._src_type is None:
+        if not self._src_type_resolved:
+            self._src_type_resolved = True
+            if self._ipc_msg is None:
+                return None
+
             self._src_type = get_err_type(
                 self._ipc_msg.src_type_str
             )
             if not self._src_type:
-                raise TypeError(
-                    f'Failed to lookup src error type with '
-                    f'`tractor._exceptions.get_err_type()` :\n'
-                    f'{self.src_type_str}'
+                log.warning(
+                    f'Failed to lookup src error type via\n'
+                    f'`tractor._exceptions.get_err_type()`:\n'
+                    f'\n'
+                    f'`{self._ipc_msg.src_type_str}`'
+                    f' is not registered!\n'
+                    f'\n'
+                    f'Call `reg_err_types()` to enable'
+                    f' full type reconstruction.\n'
                 )

         return self._src_type
@@ -435,20 +477,30 @@ class RemoteActorError(Exception):

     @property
     def boxed_type_str(self) -> str:
         '''
-        String-name of the (last hop's) boxed error type.
+        String-name of the (last hop's) boxed error
+        type.
+
+        Falls back to the IPC-msg-encoded type-name
+        str when the type can not be resolved locally
+        (e.g. unregistered custom errors).

         '''
         # TODO, maybe support also serializing the
-        # `ExceptionGroup.exeptions: list[BaseException]` set under
-        # certain conditions?
+        # `ExceptionGroup.exceptions: list[BaseException]`
+        # set under certain conditions?
         bt: Type[BaseException] = self.boxed_type
         if bt:
             return str(bt.__name__)

-        return ''
+        # fallback to the str name from the IPC msg
+        # when the type obj can't be resolved.
+        if self._ipc_msg:
+            return self._ipc_msg.boxed_type_str

+        return '<unknown>'

     @property
-    def boxed_type(self) -> Type[BaseException]:
+    def boxed_type(self) -> Type[BaseException]|None:
         '''
         Error type boxed by last actor IPC hop.
@@ -677,10 +729,22 @@ class RemoteActorError(Exception):
         failing actor's remote env.

         '''
-        # TODO: better tb insertion and all the fancier dunder
-        # metadata stuff as per `.__context__` etc. and friends:
+        # TODO: better tb insertion and all the fancier
+        # dunder metadata stuff as per `.__context__`
+        # etc. and friends:
         # https://github.com/python-trio/trio/issues/611
-        src_type_ref: Type[BaseException] = self.src_type
+        src_type_ref: Type[BaseException]|None = (
+            self.src_type
+        )
+        if src_type_ref is None:
+            # unresolvable type: fall back to
+            # a `RuntimeError` preserving original
+            # traceback + type name.
+            return RuntimeError(
+                f'{self.src_type_str}: '
+                f'{self.tb_str}'
+            )
         return src_type_ref(self.tb_str)

     # TODO: local recontruction of nested inception for a given
@@ -1209,14 +1273,31 @@ def unpack_error(
     if not isinstance(msg, Error):
         return None

-    # try to lookup a suitable error type from the local runtime
-    # env then use it to construct a local instance.
-    # boxed_type_str: str = error_dict['boxed_type_str']
+    # try to lookup a suitable error type from the
+    # local runtime env then use it to construct a
+    # local instance.
     boxed_type_str: str = msg.boxed_type_str
-    boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
+    boxed_type: Type[BaseException]|None = get_err_type(
+        boxed_type_str
+    )

-    # retrieve the error's msg-encoded remotoe-env info
-    message: str = f'remote task raised a {msg.boxed_type_str!r}\n'
+    if boxed_type is None:
+        log.warning(
+            f'Failed to resolve remote error type\n'
+            f'`{boxed_type_str}` - boxing as\n'
+            f'`RemoteActorError` with original\n'
+            f'traceback preserved.\n'
+            f'\n'
+            f'Call `reg_err_types()` to enable\n'
+            f'full type reconstruction.\n'
+        )
+
+    # retrieve the error's msg-encoded remote-env
+    # info
+    message: str = (
+        f'remote task raised a '
+        f'{msg.boxed_type_str!r}\n'
+    )

     # TODO: do we even really need these checks for RAEs?
     if boxed_type_str in [
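The fallback path above (resolve a boxed error's type by its string name, degrading to a `RuntimeError` that preserves the remote traceback text when unregistered) can be sketched standalone; all names here (`reg_err_types`, `reconstruct`, `MyAppError`) are illustrative, not `tractor`'s actual API surface:

```python
# illustrative sketch of name->type error reconstruction with
# a `RuntimeError` fallback; hypothetical helper names.
_err_types: dict[str, type[BaseException]] = {}

def reg_err_types(*etypes: type[BaseException]) -> None:
    # register custom error types for remote reconstruction
    for etype in etypes:
        _err_types[etype.__name__] = etype

def reconstruct(type_str: str, tb_str: str) -> BaseException:
    etype = _err_types.get(type_str)
    if etype is None:
        # unresolvable: keep type-name + remote tb as plain text
        return RuntimeError(f'{type_str}: {tb_str}')
    return etype(tb_str)

class MyAppError(Exception):
    pass

reg_err_types(MyAppError)
assert type(reconstruct('MyAppError', 'tb..')) is MyAppError
assert type(reconstruct('NotRegistered', 'tb..')) is RuntimeError
```

Either way the string-level info survives; only isinstance-checks against the concrete type require prior registration.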
View File
@@ -37,19 +37,20 @@ import warnings

 import trio

-from . import _runtime
+from .runtime import _runtime
+from .discovery._registry import Registrar
 from .devx import (
     debug,
     _frame_stack,
     pformat as _pformat,
 )
-from . import _spawn
-from . import _state
+from .spawn import _spawn
+from .runtime import _state
 from . import log
 from .ipc import (
     _connect_chan,
 )
-from ._addr import (
+from .discovery._addr import (
     Address,
     UnwrappedAddress,
     default_lo_addrs,
@@ -267,7 +268,6 @@ async def open_root_actor(
     if start_method is not None:
         _spawn.try_set_start_method(start_method)

-    # TODO! remove this ASAP!
     if arbiter_addr is not None:
         warnings.warn(
             '`arbiter_addr` is now deprecated\n'
@@ -400,7 +400,7 @@ async def open_root_actor(
                 'registry socket(s) already bound'
             )

-        # we were able to connect to an arbiter
+        # we were able to connect to a registrar
         logger.info(
             f'Registry(s) seem(s) to exist @ {ponged_addrs}'
         )
@@ -453,8 +453,7 @@ async def open_root_actor(
             # https://github.com/goodboy/tractor/pull/348
             # https://github.com/goodboy/tractor/issues/296

-            # TODO: rename as `RootActor` or is that even necessary?
-            actor = _runtime.Arbiter(
+            actor = Registrar(
                 name=name or 'registrar',
                 uuid=mk_uuid(),
                 registry_addrs=registry_addrs,
View File
@@ -55,7 +55,7 @@ from tractor.msg import (
 )

 if TYPE_CHECKING:
-    from ._runtime import Actor
+    from .runtime._runtime import Actor
     from ._context import Context
     from .ipc import Channel
View File
@@ -26,9 +26,7 @@ import random
 from typing import (
     Type,
 )

-from tractor import (
-    _addr,
-)
+from tractor.discovery import _addr


 def get_rando_addr(
View File
@@ -21,17 +21,27 @@ and applications.

 '''
 from functools import (
     partial,
-    wraps,
 )
 import inspect
 import platform
+from typing import (
+    Callable,
+    Type,
+)

 import pytest
 import tractor
 import trio
+import wrapt


-def tractor_test(fn):
+def tractor_test(
+    wrapped: Callable|None = None,
+    *,
+    # @tractor_test(<deco-params>)
+    timeout: float = 30,
+    hide_tb: bool = True,
+):
     '''
     Decorator for async test fns to decorator-wrap them as "native"
     looking sync funcs runnable by `pytest` and auto invoked with
@@ -45,8 +55,18 @@ def tractor_test(fn):
     Basic deco use:
     ---------------

-    @tractor_test
-    async def test_whatever():
+    @tractor_test(
+        timeout=10,
+    )
+    async def test_whatever(
+        # fixture param declarations
+        loglevel: str,
+        start_method: str,
+        reg_addr: tuple,
+        tpt_proto: str,
+        debug_mode: bool,
+    ):
+        # already inside a root-actor runtime `trio.Task`
         await ...
@@ -55,7 +75,7 @@ def tractor_test(fn):
     If any of the following fixture are requested by the wrapped test
     fn (via normal func-args declaration),

-    - `reg_addr` (a socket addr tuple where arbiter is listening)
+    - `reg_addr` (a socket addr tuple where registrar is listening)
     - `loglevel` (logging level passed to tractor internals)
     - `start_method` (subprocess spawning backend)
@@ -67,52 +87,69 @@ def tractor_test(fn):
     `tractor.open_root_actor()` funcargs.

     '''
-    @wraps(fn)
+    __tracebackhide__: bool = hide_tb
+
+    # handle the decorator not called with () case.
+    # i.e. in `wrapt` support a deco-with-optional-args,
+    # https://wrapt.readthedocs.io/en/master/decorators.html#decorators-with-optional-arguments
+    if wrapped is None:
+        return wrapt.PartialCallableObjectProxy(
+            tractor_test,
+            timeout=timeout,
+            hide_tb=hide_tb
+        )
+
+    @wrapt.decorator
     def wrapper(
+        wrapped: Callable,
+        instance: object|Type|None,
+        args: tuple,
+        kwargs: dict,
+    ):
+        __tracebackhide__: bool = hide_tb
+
+        # NOTE, ensure we inject any test-fn declared fixture names.
+        for kw in [
+            'reg_addr',
+            'loglevel',
+            'start_method',
+            'debug_mode',
+            'tpt_proto',
+            'timeout',
+        ]:
+            if kw in inspect.signature(wrapped).parameters:
+                assert kw in kwargs
+
+        start_method = kwargs.get('start_method')
+        if platform.system() == "Windows":
+            if start_method is None:
+                kwargs['start_method'] = 'trio'
+            elif start_method != 'trio':
+                raise ValueError(
+                    'ONLY the `start_method="trio"` is supported on Windows.'
+                )
+
+        # open a root-actor, passing certain
+        # `tractor`-runtime-settings, then invoke the test-fn body as
+        # the root-most task.
+        #
+        # https://wrapt.readthedocs.io/en/master/decorators.html#processing-function-arguments
+        async def _main(
             *args,
-        loglevel=None,
-        reg_addr=None,
+
+            # runtime-settings
+            loglevel: str|None = None,
+            reg_addr: tuple|None = None,
             start_method: str|None = None,
             debug_mode: bool = False,
-        tpt_proto: str|None=None,
-        **kwargs
+            tpt_proto: str|None = None,
+
+            **kwargs,
         ):
-        # __tracebackhide__ = True
-
-        # NOTE: inject ant test func declared fixture
-        # names by manually checking!
-        if 'reg_addr' in inspect.signature(fn).parameters:
-            # injects test suite fixture value to test as well
-            # as `run()`
-            kwargs['reg_addr'] = reg_addr
-
-        if 'loglevel' in inspect.signature(fn).parameters:
-            # allows test suites to define a 'loglevel' fixture
-            # that activates the internal logging
-            kwargs['loglevel'] = loglevel
-
-        if start_method is None:
-            if platform.system() == "Windows":
-                start_method = 'trio'
-
-        if 'start_method' in inspect.signature(fn).parameters:
-            # set of subprocess spawning backends
-            kwargs['start_method'] = start_method
-
-        if 'debug_mode' in inspect.signature(fn).parameters:
-            # set of subprocess spawning backends
-            kwargs['debug_mode'] = debug_mode
-
-        if 'tpt_proto' in inspect.signature(fn).parameters:
-            # set of subprocess spawning backends
-            kwargs['tpt_proto'] = tpt_proto
-
-        if kwargs:
-            # use explicit root actor start
-            async def _main():
+            __tracebackhide__: bool = hide_tb
+            with trio.fail_after(timeout):
                 async with tractor.open_root_actor(
-                    # **kwargs,
                     registry_addrs=[reg_addr] if reg_addr else None,
                     loglevel=loglevel,
                     start_method=start_method,
@@ -121,17 +158,31 @@ def tractor_test(fn):
                     debug_mode=debug_mode,
                 ):
-                    await fn(*args, **kwargs)
+                    # invoke test-fn body IN THIS task
+                    await wrapped(
+                        *args,
+                        **kwargs,
+                    )

-            main = _main
+        funcname = wrapped.__name__
+        if not inspect.iscoroutinefunction(wrapped):
+            raise TypeError(
+                f"Test-fn {funcname!r} must be an async-function !!"
+            )

-        else:
-            # use implicit root actor start
-            main = partial(fn, *args, **kwargs)
+        # invoke runtime via a root task.
+        return trio.run(
+            partial(
+                _main,
+                *args,
+                **kwargs,
+            )
+        )

-        return trio.run(main)
-
-    return wrapper
+    return wrapper(
+        wrapped,
+    )
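The deco-with-optional-args shape adopted above (here via `wrapt.PartialCallableObjectProxy`) can be sketched with only stdlib `functools.partial`; the `timed` decorator and its no-op body are purely illustrative:

```python
import functools

def timed(wrapped=None, *, timeout=30):
    # support BOTH `@timed` and `@timed(timeout=10)` usage:
    # applied without (), `wrapped` is the decorated fn;
    # applied with (), it's None and we return a partial that
    # receives the fn on the immediately-following call.
    if wrapped is None:
        return functools.partial(timed, timeout=timeout)

    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        # a real impl would enforce `timeout` around the call
        return wrapped(*args, **kwargs)

    return wrapper

@timed
def plain() -> int:
    return 1

@timed(timeout=5)
def parametrized() -> int:
    return 2

assert plain() == 1 and parametrized() == 2
```

`wrapt` adds transparent-proxy and bound-method handling on top of this same dispatch-on-`wrapped is None` trick.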
 def pytest_addoption(
@@ -179,7 +230,8 @@ def pytest_addoption(

 def pytest_configure(config):
     backend = config.option.spawn_backend
-    tractor._spawn.try_set_start_method(backend)
+    from tractor.spawn._spawn import try_set_start_method
+    try_set_start_method(backend)

     # register custom marks to avoid warnings see,
     # https://docs.pytest.org/en/stable/how-to/writing_plugins.html#registering-custom-markers

@@ -225,7 +277,8 @@ def tpt_protos(request) -> list[str]:

     # XXX ensure we support the protocol by name via lookup!
     for proto_key in proto_keys:
-        addr_type = tractor._addr._address_types[proto_key]
+        from tractor.discovery import _addr
+        addr_type = _addr._address_types[proto_key]
         assert addr_type.proto_key == proto_key

     yield proto_keys

@@ -256,7 +309,7 @@ def tpt_proto(
     #     f'tpt-proto={proto_key!r}\n'
     # )
-    from tractor import _state
+    from tractor.runtime import _state
     if _state._def_tpt_proto != proto_key:
         _state._def_tpt_proto = proto_key
         _state._runtime_vars['_enable_tpts'] = [
View File
@@ -45,17 +45,15 @@ from typing import (
 )

 import trio

-from tractor import (
-    _state,
-    log as logmod,
-)
+from tractor.runtime import _state
+from tractor import log as logmod
 from tractor.devx import debug

 log = logmod.get_logger()


 if TYPE_CHECKING:
-    from tractor._spawn import ProcessType
+    from tractor.spawn._spawn import ProcessType
     from tractor import (
         Actor,
         ActorNursery,
View File
@@ -53,8 +53,8 @@ import trio
 from tractor._exceptions import (
     NoRuntime,
 )
-from tractor import _state
-from tractor._state import (
+from tractor.runtime import _state
+from tractor.runtime._state import (
     current_actor,
     debug_mode,
 )

@@ -76,7 +76,7 @@ from ._repl import (

 if TYPE_CHECKING:
     from trio.lowlevel import Task
-    from tractor._runtime import (
+    from tractor.runtime._runtime import (
         Actor,
     )
View File
@@ -25,7 +25,7 @@ from functools import (
 import os

 import pdbp

-from tractor._state import (
+from tractor.runtime._state import (
     is_root_process,
 )
View File
@@ -27,7 +27,7 @@ from typing import (
 )

 import trio

 from tractor.log import get_logger
-from tractor._state import (
+from tractor.runtime._state import (
     current_actor,
     is_root_process,
 )

@@ -44,7 +44,7 @@ if TYPE_CHECKING:
     from tractor.ipc import (
         Channel,
     )
-    from tractor._runtime import (
+    from tractor.runtime._runtime import (
         Actor,
     )
View File
@@ -40,7 +40,7 @@ from trio.lowlevel import (
     Task,
 )
 from tractor._context import Context
-from tractor._state import (
+from tractor.runtime._state import (
     current_actor,
     debug_mode,
     is_root_process,
View File
@@ -55,12 +55,12 @@ import tractor
 from tractor.log import get_logger
 from tractor.to_asyncio import run_trio_task_in_future
 from tractor._context import Context
-from tractor import _state
+from tractor.runtime import _state
 from tractor._exceptions import (
     NoRuntime,
     InternalError,
 )
-from tractor._state import (
+from tractor.runtime._state import (
     current_actor,
     current_ipc_ctx,
     is_root_process,

@@ -87,7 +87,7 @@ from ..pformat import (
 if TYPE_CHECKING:
     from trio.lowlevel import Task
     from threading import Thread
-    from tractor._runtime import (
+    from tractor.runtime._runtime import (
         Actor,
     )
     # from ._post_mortem import BoxedMaybeException
View File
@@ -55,12 +55,12 @@ import tractor
 from tractor.to_asyncio import run_trio_task_in_future
 from tractor.log import get_logger
 from tractor._context import Context
-from tractor import _state
+from tractor.runtime import _state
 from tractor._exceptions import (
     DebugRequestError,
     InternalError,
 )
-from tractor._state import (
+from tractor.runtime._state import (
     current_actor,
     is_root_process,
 )

@@ -71,7 +71,7 @@ if TYPE_CHECKING:
     from tractor.ipc import (
         IPCServer,
     )
-    from tractor._runtime import (
+    from tractor.runtime._runtime import (
         Actor,
     )
     from ._repl import (

@@ -1013,7 +1013,7 @@ async def request_root_stdio_lock(
     DebugStatus.req_task = current_task()
     req_err: BaseException|None = None
     try:
-        from tractor._discovery import get_root
+        from tractor.discovery._discovery import get_root

         # NOTE: we need this to ensure that this task exits
         # BEFORE the REPl instance raises an error like
         # `bdb.BdbQuit` directly, OW you get a trio cs stack
View File
@@ -0,0 +1,26 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Discovery (protocols) API for automatic addressing
and location management of (service) actors.
NOTE: to avoid circular imports, this ``__init__``
does NOT eagerly import submodules. Use direct
module paths like ``tractor.discovery._addr`` or
``tractor.discovery._discovery`` instead.
'''
View File
@@ -27,15 +27,15 @@ from trio import (
     SocketListener,
 )

-from .log import get_logger
-from ._state import (
+from ..log import get_logger
+from ..runtime._state import (
     _def_tpt_proto,
 )
-from .ipc._tcp import TCPAddress
-from .ipc._uds import UDSAddress
+from ..ipc._tcp import TCPAddress
+from ..ipc._uds import UDSAddress

 if TYPE_CHECKING:
-    from ._runtime import Actor
+    from ..runtime._runtime import Actor

 log = get_logger()
View File
@@ -28,29 +28,29 @@ from typing import (
 from contextlib import asynccontextmanager as acm

 from tractor.log import get_logger
-from .trionics import (
+from ..trionics import (
     gather_contexts,
     collapse_eg,
 )
-from .ipc import _connect_chan, Channel
+from ..ipc import _connect_chan, Channel
 from ._addr import (
     UnwrappedAddress,
     Address,
     wrap_address
 )
-from ._portal import (
+from ..runtime._portal import (
     Portal,
     open_portal,
     LocalPortal,
 )
-from ._state import (
+from ..runtime._state import (
     current_actor,
     _runtime_vars,
     _def_tpt_proto,
 )

 if TYPE_CHECKING:
-    from ._runtime import Actor
+    from ..runtime._runtime import Actor


 log = get_logger()

@@ -60,7 +60,7 @@ log = get_logger()
 async def get_registry(
     addr: UnwrappedAddress|None = None,
 ) -> AsyncGenerator[
-    Portal | LocalPortal | None,
+    Portal|LocalPortal|None,
     None,
 ]:
     '''

@@ -72,8 +72,8 @@ async def get_registry(
     '''
     actor: Actor = current_actor()
     if actor.is_registrar:
-        # we're already the arbiter
-        # (likely a re-entrant call from the arbiter actor)
+        # we're already the registrar
+        # (likely a re-entrant call from the registrar actor)
         yield LocalPortal(
             actor,
             Channel(transport=None)
@@ -153,21 +153,27 @@ async def query_actor(
     regaddr: UnwrappedAddress|None = None,

 ) -> AsyncGenerator[
-    UnwrappedAddress|None,
+    tuple[UnwrappedAddress|None, Portal|LocalPortal|None],
     None,
 ]:
     '''
     Lookup a transport address (by actor name) via querying a registrar
     listening @ `regaddr`.

-    Returns the transport protocol (socket) address or `None` if no
-    entry under that name exists.
+    Yields a `tuple` of `(addr, reg_portal)` where,
+    - `addr` is the transport protocol (socket) address or `None` if
+      no entry under that name exists,
+    - `reg_portal` is the `Portal` (or `LocalPortal` when the
+      current actor is the registrar) used for the lookup (or
+      `None` when the peer was found locally via
+      `get_peer_by_name()`).

     '''
     actor: Actor = current_actor()
     if (
         name == 'registrar'
-        and actor.is_registrar
+        and
+        actor.is_registrar
     ):
         raise RuntimeError(
             'The current actor IS the registry!?'

@@ -175,10 +181,10 @@ async def query_actor(
     maybe_peers: list[Channel]|None = get_peer_by_name(name)
     if maybe_peers:
-        yield maybe_peers[0].raddr
+        yield maybe_peers[0].raddr, None
         return

-    reg_portal: Portal
+    reg_portal: Portal|LocalPortal
     regaddr: Address = wrap_address(regaddr) or actor.reg_addrs[0]
     async with get_registry(regaddr) as reg_portal:
         # TODO: return portals to all available actors - for now

@@ -188,8 +194,7 @@ async def query_actor(
             'find_actor',
             name=name,
         )
-        yield addr
+        yield addr, reg_portal


 @acm
 async def maybe_open_portal(
@@ -204,14 +209,48 @@ async def maybe_open_portal(
     async with query_actor(
         name=name,
         regaddr=addr,
-    ) as addr:
-        pass
+    ) as (addr, reg_portal):
+        if not addr:
+            yield None
+            return

-    if addr:
-        async with _connect_chan(addr) as chan:
-            async with open_portal(chan) as portal:
-                yield portal
+        try:
+            async with _connect_chan(addr) as chan:
+                async with open_portal(chan) as portal:
+                    yield portal

-    else:
+        # most likely we were unable to connect the
+        # transport and there is likely a stale entry in
+        # the registry actor's table, thus we need to
+        # instruct it to clear that stale entry and then
+        # more silently (pretend there was no reason but
+        # to) indicate that the target actor can't be
+        # contacted at that addr.
+        except OSError:
+            # NOTE: ensure we delete the stale entry
+            # from the registrar actor when available.
+            if reg_portal is not None:
+                uid: tuple[str, str]|None = await reg_portal.run_from_ns(
+                    'self',
+                    'delete_addr',
+                    addr=addr,
+                )
+                if uid:
+                    log.warning(
+                        f'Deleted stale registry entry !\n'
+                        f'addr: {addr!r}\n'
+                        f'uid: {uid!r}\n'
+                    )
+                else:
+                    log.warning(
+                        f'No registry entry found for addr: {addr!r}'
+                    )
+            else:
+                log.warning(
+                    f'Connection to {addr!r} failed'
+                    f' and no registry portal available'
+                    f' to delete stale entry.'
+                )
             yield None
@@ -229,10 +268,10 @@ async def find_actor(
     None,
 ]:
     '''
-    Ask the arbiter to find actor(s) by name.
+    Ask the registrar to find actor(s) by name.

-    Returns a connected portal to the last registered matching actor
-    known to the arbiter.
+    Returns a connected portal to the last registered
+    matching actor known to the registrar.

     '''
     # optimization path, use any pre-existing peer channel

@@ -280,7 +319,7 @@ async def find_actor(
     if not any(portals):
         if raise_on_none:
             raise RuntimeError(
-                f'No actor "{name}" found registered @ {registry_addrs}'
+                f'No actor {name!r} found registered @ {registry_addrs!r}'
             )
         yield None
         return
View File
@@ -0,0 +1,253 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public
# License as published by the Free Software Foundation, either
# version 3 of the License, or (at your option) any later
# version.
# This program is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU Affero General Public License for more
# details.
# You should have received a copy of the GNU Affero General
# Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
'''
Actor-registry for process-tree service discovery.

The `Registrar` is a special `Actor` subtype that serves as
the process-tree's name-registry, tracking actor
name-to-address mappings so peers can discover each other.

'''
from __future__ import annotations

from bidict import bidict
import trio

from ..runtime._runtime import Actor
from ._addr import (
    UnwrappedAddress,
    Address,
    wrap_address,
)
from ..devx import debug
from ..log import get_logger

log = get_logger('tractor')


class Registrar(Actor):
    '''
    A special registrar `Actor` who can contact all other
    actors within its immediate process tree and keeps
    a registry of others meant to be discoverable in
    a distributed application.

    Normally the registrar is also the "root actor" and
    thus always has access to the top-most-level actor
    (process) nursery.

    By default, the registrar is always initialized when
    and if no other registrar socket addrs have been
    specified to runtime init entry-points (such as
    `open_root_actor()` or `open_nursery()`). Any time
    a new main process is launched (and thus a new root
    actor created) and, no existing registrar can be
    contacted at the provided `registry_addr`, then
    a new one is always created; however, if one can be
    reached it is used.

    Normally a distributed app requires at least one
    registrar per logical host where for that given
    "host space" (aka localhost IPC domain of addresses)
    it is responsible for making all other host (local
    address) bound actors *discoverable* to external
    actor trees running on remote hosts.

    '''
    is_registrar = True

    def is_registry(self) -> bool:
        return self.is_registrar

    def __init__(
        self,
        *args,
        **kwargs,
    ) -> None:
        self._registry: bidict[
            tuple[str, str],
            UnwrappedAddress,
        ] = bidict({})
        self._waiters: dict[
            str,
            # either an event to sync to receiving an
            # actor uid (which is filled in once the actor
            # has successfully registered), or that uid
            # after registry is complete.
            list[trio.Event|tuple[str, str]]
        ] = {}
        super().__init__(*args, **kwargs)

    async def find_actor(
        self,
        name: str,
    ) -> UnwrappedAddress|None:
        for uid, addr in self._registry.items():
            if name in uid:
                return addr
        return None

    async def get_registry(
        self
    ) -> dict[str, UnwrappedAddress]:
        '''
        Return current name registry.

        This method is async to allow for cross-actor
        invocation.

        '''
        # NOTE: requires ``strict_map_key=False`` to the
        # msgpack unpacker since we have tuples as keys
        # (note this makes the registrar susceptible to
        # hashdos):
        # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
        return {
            '.'.join(key): val
            for key, val in self._registry.items()
        }
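The msgpack quirks noted in the comments above (non-str map keys need `strict_map_key=False`; tuples round-trip as lists unless tuple-mode unpacking is forced) can be seen directly; this sketch assumes the `msgpack` package is installed:

```python
import msgpack

# tuples serialize as arrays and come back as lists by default,
# hence the `tuple()` coercions on received addrs elsewhere.
wire = msgpack.packb(('name', 'uuid'))
assert msgpack.unpackb(wire) == ['name', 'uuid']

# map keys that aren't str/bytes require `strict_map_key=False`;
# `use_list=False` makes arrays (incl. keys) unpack as tuples.
wire = msgpack.packb({('name', 'uuid'): 'addr'})
assert msgpack.unpackb(
    wire,
    strict_map_key=False,
    use_list=False,
) == {('name', 'uuid'): 'addr'}
```

Joining the uid-tuple into a `'.'`-delimited str, as `get_registry()` does, sidesteps the key restriction entirely for cross-actor calls.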
    async def wait_for_actor(
        self,
        name: str,
    ) -> list[UnwrappedAddress]:
        '''
        Wait for a particular actor to register.

        This is a blocking call if no actor by the
        provided name is currently registered.

        '''
        addrs: list[UnwrappedAddress] = []
        addr: UnwrappedAddress

        mailbox_info: str = (
            'Actor registry contact infos:\n'
        )
        for uid, addr in self._registry.items():
            mailbox_info += (
                f'|_uid: {uid}\n'
                f'|_addr: {addr}\n\n'
            )
            if name == uid[0]:
                addrs.append(addr)

        if not addrs:
            waiter = trio.Event()
            self._waiters.setdefault(
                name, []
            ).append(waiter)
            await waiter.wait()

            for uid in self._waiters[name]:
                if not isinstance(uid, trio.Event):
                    addrs.append(
                        self._registry[uid]
                    )

        log.runtime(mailbox_info)
        return addrs

    async def register_actor(
        self,
        uid: tuple[str, str],
        addr: UnwrappedAddress
    ) -> None:
        uid = name, hash = (
            str(uid[0]),
            str(uid[1]),
        )
        waddr: Address = wrap_address(addr)
        if not waddr.is_valid:
            # should never be 0-dynamic-os-alloc
            await debug.pause()

        # XXX NOTE, value must also be hashable AND since
        # `._registry` is a `bidict` values must be unique;
        # use `.forceput()` to replace any prior (stale)
        # entries that might map a different uid to the same
        # addr (e.g. after an unclean shutdown or
        # actor-restart reusing the same address).
        self._registry.forceput(uid, tuple(addr))

        # pop and signal all waiter events
        events = self._waiters.pop(name, [])
        self._waiters.setdefault(
            name, []
        ).append(uid)
        for event in events:
            if isinstance(event, trio.Event):
                event.set()

    async def unregister_actor(
        self,
        uid: tuple[str, str]
    ) -> None:
        uid = (str(uid[0]), str(uid[1]))
        entry: tuple = self._registry.pop(
            uid, None
        )
        if entry is None:
            log.warning(
                f'Request to de-register'
                f' {uid!r} failed?'
            )

    async def delete_addr(
        self,
        addr: tuple[str, int|str]|list[str|int],
    ) -> tuple[str, str]|None:
        # NOTE: `addr` arrives as a `list` over IPC
        # (msgpack deserializes tuples -> lists) so
        # coerce to `tuple` for the bidict hash lookup.
        uid: tuple[str, str]|None = (
            self._registry.inverse.pop(
                tuple(addr),
                None,
            )
        )
        if uid:
            report: str = (
                'Deleting registry-entry for,\n'
            )
        else:
            report: str = (
                'No registry entry for,\n'
            )
        log.warning(
            report
            +
            f'{addr!r}@{uid!r}'
        )
        return uid


# Backward compat alias
Arbiter = Registrar
View File
@@ -146,7 +146,7 @@ _pubtask2lock: dict[str, trio.StrictFIFOLock] = {}

 def pub(
-    wrapped: typing.Callable | None = None,
+    wrapped: typing.Callable|None = None,
     *,
     tasks: set[str] = set(),
 ):

@@ -244,8 +244,12 @@ def pub(
             task2lock[name] = trio.StrictFIFOLock()

     @wrapt.decorator
-    async def wrapper(agen, instance, args, kwargs):
+    async def wrapper(
+        agen,
+        instance,
+        args,
+        kwargs,
+    ):

         # XXX: this is used to extract arguments properly as per the
         # `wrapt` docs
         async def _execute(
View File
@@ -0,0 +1,26 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
High level design patterns, APIs and runtime extensions built on top
of the `tractor` runtime core.
'''
from ._service import (
    open_service_mngr as open_service_mngr,
    get_service_mngr as get_service_mngr,
    ServiceMngr as ServiceMngr,
)
View File
@@ -0,0 +1,592 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Daemon subactor as service(s) management and supervision primitives
and API.
'''
from __future__ import annotations
from contextlib import (
    asynccontextmanager as acm,
    # contextmanager as cm,
)
from collections import defaultdict
from dataclasses import (
    dataclass,
    field,
)
import functools
import inspect
from typing import (
    Callable,
    Any,
)

import tractor
import trio
from trio import TaskStatus
from tractor import (
    log,
    ActorNursery,
    current_actor,
    ContextCancelled,
    Context,
    Portal,
)

log = log.get_logger('tractor')
# TODO: implement a `@singleton` deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# -[ ] go through the options peeps on SO did?
# * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python
# * including @mikenerone's answer
# |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313
#
# -[ ] put it in `tractor.lowlevel._globals` ?
# * fits with our outstanding actor-local/global feat req?
# |_ https://github.com/goodboy/tractor/issues/55
# * how can it relate to the `Actor.lifetime_stack` that was
# silently patched in?
# |_ we could implicitly call both of these in the same
# spot in the runtime using the lifetime stack?
# - `open_singleton_cm().__exit__()`
# -`del_singleton()`
# |_ gives SC fixture semantics to sync code oriented around
# sub-process lifetime?
# * what about with `trio.RunVar`?
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar
# - which we'll need for no-GIL cpython (right?) presuming
# multiple `trio.run()` calls in process?
#
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoked IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# a deletion API for explicit instance de-allocation?
# @open_service_mngr.deleter
# def del_service_mngr() -> None:
# mngr = open_service_mngr._singleton[0]
# open_service_mngr._singleton[0] = None
# del mngr
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# NOTE; since default values for keyword-args are effectively
# module-vars/globals as per the note from,
# https://docs.python.org/3/tutorial/controlflow.html#default-argument-values
#
# > "The default value is evaluated only once. This makes
# a difference when the default is a mutable object such as
# a list, dictionary, or instances of most classes"
#
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open an actor-global "service-manager" for supervising a tree
of subactors and/or actor-global tasks.
The delivered `ServiceMngr` is a singleton instance per
actor-process, that is, allocated on first open and never
de-allocated unless explicitly deleted by a call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'an': an,
'tn': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.an = an
mngr.tn = tn
else:
assert (mngr.an and mngr.tn)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_ctxs:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
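The mutable-default-argument singleton trick used above (stash state in a kwarg default, which Python evaluates only once at definition time, then read it back via `inspect.signature()`) can be sketched standalone. This is an illustrative analog only, with a plain `dict` standing in for `ServiceMngr`:

```python
import inspect
from contextlib import contextmanager
from typing import Any


@contextmanager
def open_mngr(
    *,
    # evaluated once at definition time, so the list cell persists
    # across calls and acts as process-global storage
    _singleton: list[Any] = [None],
    **kwargs,
):
    if (mngr := _singleton[0]) is None:
        # first open: allocate and stash in the default's cell
        mngr = _singleton[0] = {'services': {}, **kwargs}
    yield mngr


def get_mngr() -> Any:
    # read the default value straight off the function signature
    maybe = inspect.signature(
        open_mngr
    ).parameters['_singleton'].default[0]
    if maybe is None:
        raise RuntimeError('Allocate via `open_mngr()` first!')
    return maybe
```

Note the trade-off: the instance is never de-allocated unless some deleter explicitly resets the default's cell back to `None`.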
async def _open_and_supervise_service_ctx(
serman: ServiceMngr,
name: str,
ctx_fn: Callable, # TODO, type for `@tractor.context` requirement
portal: Portal,
allow_overruns: bool = False,
task_status: TaskStatus[
tuple[
trio.CancelScope,
Context,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
**ctx_kwargs,
) -> Any:
'''
Open a remote IPC-context defined by `ctx_fn` in the
(service) actor accessed via `portal` and supervise the
(local) parent task to termination at which point the remote
actor runtime is cancelled alongside it.
The main application is for allocating long-running
"sub-services" in a main daemon and explicitly controlling
their lifetimes from an actor-global singleton.
'''
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs:
try:
async with portal.open_context(
ctx_fn,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, started):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((
cs,
ctx,
complete,
started,
))
log.info(
f'`pikerd` service {name} started with value {started}'
)
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (
await portal.wait_for_result(),
ctx_res,
)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n'
f'canceller: {canceller}\n'
)
else:
raise
finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor() # terminate (remote) sub-actor
complete.set() # signal caller this task is done
serman.service_ctxs.pop(name) # remove mngr entry
# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the
# library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
@dataclass
class ServiceMngr:
'''
A multi-subactor-as-service manager.
Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
an: ActorNursery
tn: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
trio.Event,
]
] = field(default_factory=dict)
service_ctxs: dict[
str,
tuple[
trio.CancelScope,
Context,
Portal,
trio.Event,
]
] = field(default_factory=dict)
# internal per-service task mutexes
_locks = defaultdict(trio.Lock)
# TODO, unify this interface with our `TaskManager` PR!
#
#
async def start_service_task(
self,
name: str,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
fn: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Any,
trio.Event,
]:
async def _task_manager_start(
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
]
] = trio.TASK_STATUS_IGNORED,
) -> Any:
task_cs = trio.CancelScope()
task_complete = trio.Event()
with task_cs as cs:
task_status.started((
cs,
task_complete,
))
try:
await fn()
except trio.Cancelled as taskc:
log.cancel(
f'Service task for `{name}` was cancelled!\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
)
raise taskc
finally:
task_complete.set()
(
cs,
complete,
) = await self.tn.start(_task_manager_start)
# store the cancel scope and portal for later cancellation or
# restart if needed.
self.service_tasks[name] = (
cs,
complete,
)
return (
cs,
complete,
)
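`start_service_task()` leans on `trio`'s `Nursery.start()` handshake: the child task allocates its own cancel/completion controls and hands them back via `task_status.started()`, so the caller only unblocks once the task is actually running. A rough stdlib `asyncio` analog of that handshake (a `Future` standing in for `task_status`, events in place of a real cancel scope) might look like:

```python
import asyncio


async def service_task(
    task_status: asyncio.Future,
    results: list[str],
) -> None:
    # allocate per-task controls, then signal the starter with them
    cancel_evt = asyncio.Event()
    done_evt = asyncio.Event()
    task_status.set_result((cancel_evt, done_evt))
    try:
        await cancel_evt.wait()  # stand-in for the real service body
        results.append('ran')
    finally:
        done_evt.set()  # signal caller this task is done


async def main() -> str:
    results: list[str] = []
    status: asyncio.Future = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(service_task(status, results))
    # blocks until the child calls `.set_result()`, like `tn.start()`
    cancel_evt, done_evt = await status
    cancel_evt.set()       # request "cancellation"
    await done_evt.wait()  # wait for teardown to complete
    await task
    return results[0]
```

This is a sketch of the control-flow only; `trio`'s version additionally ties the child's lifetime to the nursery scope.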
async def cancel_service_task(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, complete = self.service_tasks[name]
cs.cancel()
await complete.wait()
# TODO, if we use the `TaskMngr` from #346
# we can also get the return value from the task!
if name in self.service_tasks:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service task {name!r} not terminated!?\n'
)
async def start_service_ctx(
self,
name: str,
portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
ctx_fn: Callable,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Context,
Any,
]:
'''
Start a remote IPC-context defined by `ctx_fn` in a background
task and immediately return supervision primitives to manage it:
- a `cs: CancelScope` for the newly allocated bg task
- the `ipc_ctx: Context` to manage the remotely scheduled
`trio.Task`.
- the `started: Any` value returned by the remote endpoint
task's `Context.started(<value>)` call.
The bg task supervises the ctx such that when it terminates the supporting
actor runtime is also cancelled, see `_open_and_supervise_service_ctx()`
for details.
'''
cs, ipc_ctx, complete, started = await self.tn.start(
functools.partial(
_open_and_supervise_service_ctx,
serman=self,
name=name,
ctx_fn=ctx_fn,
portal=portal,
**ctx_kwargs,
)
)
# store the cancel scope and portal for later cancellation or
# restart if needed.
self.service_ctxs[name] = (cs, ipc_ctx, portal, complete)
return (
cs,
ipc_ctx,
started,
)
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
# ^TODO, type for `@tractor.context` deco-ed funcs!
debug_mode: bool = False,
**start_actor_kwargs,
) -> Context:
'''
Start new subactor and schedule a supervising "service task"
in it which explicitly defines the sub's lifetime.
"Service daemon subactors" are cancelled (and thus
terminated) using the paired `.cancel_service()`.
Effectively this API can be used to manage "service daemons"
spawned under a single parent actor with supervision
semantics equivalent to a one-cancels-one style actor-nursery
or "(subactor) task manager" where each subprocess's (and
thus its embedded actor runtime) lifetime is synced to that
of the remotely spawned task defined by `ctx_ep`.
The functionality can be likened to a "daemonized" version of
`.hilevel.worker.run_in_actor()` but with supervision
controls offered by `tractor.Context` where the main/root
remotely scheduled `trio.Task` invoking `ctx_ep` determines
the underlying subactor's lifetime.
'''
entry: tuple|None = self.service_ctxs.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_ctxs:
portal: Portal = await self.an.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**start_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(
cs,
sub_ctx,
started,
) = await self.start_service_ctx(
name=daemon_name,
portal=portal,
ctx_fn=ctx_ep,
**ctx_kwargs,
)
return sub_ctx
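The `functools.partial` unpacking inside `start_service()` (pre-bound kwargs are peeled off the partial before the bare endpoint func is scheduled) reduces to the following standalone sketch, with a hypothetical `ep()` endpoint for illustration:

```python
import functools


def ep(ctx, *, loglevel: str = 'info', name: str = '') -> tuple:
    # hypothetical ctx-endpoint; just echoes its kwargs
    return (loglevel, name)


def unpack_and_call(ctx_ep) -> tuple:
    # mirror the `start_service()` logic: if handed a partial, pull
    # its pre-bound kwargs off and call through the underlying func
    ctx_kwargs: dict = {}
    if isinstance(ctx_ep, functools.partial):
        ctx_kwargs = ctx_ep.keywords
        ctx_ep = ctx_ep.func
    return ctx_ep(None, **ctx_kwargs)
```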
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, sub_ctx, portal, complete = self.service_ctxs[name]
# cs.cancel()
await sub_ctx.cancel()
await complete.wait()
if name in self.service_ctxs:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service actor for {name} not terminated and/or unknown?'
)
# assert name not in self.service_ctxs, \
# f'Service task for {name} not terminated?'

View File

@ -39,7 +39,7 @@ from ._types import (
transport_from_addr, transport_from_addr,
transport_from_stream, transport_from_stream,
) )
from tractor._addr import ( from tractor.discovery._addr import (
is_wrapped_addr, is_wrapped_addr,
wrap_address, wrap_address,
Address, Address,

View File

@ -50,26 +50,24 @@ from ..devx.pformat import (
from .._exceptions import ( from .._exceptions import (
TransportClosed, TransportClosed,
) )
from .. import _rpc from ..runtime import _rpc
from ..msg import ( from ..msg import (
MsgType, MsgType,
Struct, Struct,
types as msgtypes, types as msgtypes,
) )
from ..trionics import maybe_open_nursery from ..trionics import maybe_open_nursery
from .. import ( from ..runtime import _state
_state, from .. import log
log, from ..discovery._addr import Address
)
from .._addr import Address
from ._chan import Channel from ._chan import Channel
from ._transport import MsgTransport from ._transport import MsgTransport
from ._uds import UDSAddress from ._uds import UDSAddress
from ._tcp import TCPAddress from ._tcp import TCPAddress
if TYPE_CHECKING: if TYPE_CHECKING:
from .._runtime import Actor from ..runtime._runtime import Actor
from .._supervise import ActorNursery from ..runtime._supervise import ActorNursery
log = log.get_logger() log = log.get_logger()
@ -357,7 +355,7 @@ async def handle_stream_from_peer(
# and `MsgpackStream._inter_packets()` on a read from the # and `MsgpackStream._inter_packets()` on a read from the
# stream particularly when the runtime is first starting up # stream particularly when the runtime is first starting up
# inside `open_root_actor()` where there is a check for # inside `open_root_actor()` where there is a check for
# a bound listener on the "arbiter" addr. the reset will be # a bound listener on the registrar addr. the reset will be
# because the handshake never actually took place. # because the handshake never actually took place.
log.runtime( log.runtime(
con_status con_status
@ -970,7 +968,7 @@ class Server(Struct):
in `accept_addrs`. in `accept_addrs`.
''' '''
from .._addr import ( from ..discovery._addr import (
default_lo_addrs, default_lo_addrs,
wrap_address, wrap_address,
) )

View File

@ -54,7 +54,7 @@ from tractor.msg import (
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from tractor._addr import Address from tractor.discovery._addr import Address
log = get_logger() log = get_logger()
@ -225,7 +225,7 @@ class MsgpackTransport(MsgTransport):
# not sure entirely why we need this but without it we # not sure entirely why we need this but without it we
# seem to be getting racy failures here on # seem to be getting racy failures here on
# arbiter/registry name subs.. # registrar name subs..
trio.BrokenResourceError, trio.BrokenResourceError,
) as trans_err: ) as trans_err:

View File

@ -53,14 +53,14 @@ from tractor.log import get_logger
from tractor.ipc._transport import ( from tractor.ipc._transport import (
MsgpackTransport, MsgpackTransport,
) )
from tractor._state import ( from tractor.runtime._state import (
get_rt_dir, get_rt_dir,
current_actor, current_actor,
is_root_process, is_root_process,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from ._runtime import Actor from tractor.runtime._runtime import Actor
# Platform-specific credential passing constants # Platform-specific credential passing constants

View File

@ -47,7 +47,7 @@ import colorlog # type: ignore
# import colored_traceback.auto # ?TODO, need better config? # import colored_traceback.auto # ?TODO, need better config?
import trio import trio
from ._state import current_actor from .runtime._state import current_actor
_default_loglevel: str = 'ERROR' _default_loglevel: str = 'ERROR'

View File

@ -50,7 +50,7 @@ from tractor._exceptions import (
_mk_recv_mte, _mk_recv_mte,
pack_error, pack_error,
) )
from tractor._state import ( from tractor.runtime._state import (
current_ipc_ctx, current_ipc_ctx,
) )
from ._codec import ( from ._codec import (

View File

@ -0,0 +1,26 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
The actor runtime: core machinery for the
actor-model implemented on a `trio` task runtime.
NOTE: to avoid circular imports, this ``__init__``
does NOT eagerly import submodules. Use direct
module paths like ``tractor.runtime._state`` or
``tractor.runtime._runtime`` instead.
'''

View File

@ -39,30 +39,30 @@ import warnings
import trio import trio
from .trionics import ( from ..trionics import (
maybe_open_nursery, maybe_open_nursery,
collapse_eg, collapse_eg,
) )
from ._state import ( from ._state import (
current_actor, current_actor,
) )
from .ipc import Channel from ..ipc import Channel
from .log import get_logger from ..log import get_logger
from .msg import ( from ..msg import (
# Error, # Error,
PayloadMsg, PayloadMsg,
NamespacePath, NamespacePath,
Return, Return,
) )
from ._exceptions import ( from .._exceptions import (
NoResult, NoResult,
TransportClosed, TransportClosed,
) )
from ._context import ( from .._context import (
Context, Context,
open_context_from_portal, open_context_from_portal,
) )
from ._streaming import ( from .._streaming import (
MsgStream, MsgStream,
) )

View File

@ -43,11 +43,11 @@ from trio import (
TaskStatus, TaskStatus,
) )
from .ipc import Channel from ..ipc import Channel
from ._context import ( from .._context import (
Context, Context,
) )
from ._exceptions import ( from .._exceptions import (
ContextCancelled, ContextCancelled,
RemoteActorError, RemoteActorError,
ModuleNotExposed, ModuleNotExposed,
@ -56,19 +56,19 @@ from ._exceptions import (
pack_error, pack_error,
unpack_error, unpack_error,
) )
from .trionics import ( from ..trionics import (
collapse_eg, collapse_eg,
is_multi_cancelled, is_multi_cancelled,
maybe_raise_from_masking_exc, maybe_raise_from_masking_exc,
) )
from .devx import ( from ..devx import (
debug, debug,
add_div, add_div,
pformat as _pformat, pformat as _pformat,
) )
from . import _state from . import _state
from .log import get_logger from ..log import get_logger
from .msg import ( from ..msg import (
current_codec, current_codec,
MsgCodec, MsgCodec,
PayloadT, PayloadT,

View File

@ -83,46 +83,46 @@ from tractor.msg import (
pretty_struct, pretty_struct,
types as msgtypes, types as msgtypes,
) )
from .trionics import ( from ..trionics import (
collapse_eg, collapse_eg,
maybe_open_nursery, maybe_open_nursery,
) )
from .ipc import ( from ..ipc import (
Channel, Channel,
# IPCServer, # causes cycles atm.. # IPCServer, # causes cycles atm..
_server, _server,
) )
from ._addr import ( from ..discovery._addr import (
UnwrappedAddress, UnwrappedAddress,
Address, Address,
# default_lo_addrs, # default_lo_addrs,
get_address_cls, get_address_cls,
wrap_address, wrap_address,
) )
from ._context import ( from .._context import (
mk_context, mk_context,
Context, Context,
) )
from .log import get_logger from ..log import get_logger
from ._exceptions import ( from .._exceptions import (
ContextCancelled, ContextCancelled,
InternalError, InternalError,
ModuleNotExposed, ModuleNotExposed,
MsgTypeError, MsgTypeError,
unpack_error, unpack_error,
) )
from .devx import ( from ..devx import (
debug, debug,
pformat as _pformat pformat as _pformat
) )
from ._discovery import get_registry from ..discovery._discovery import get_registry
from ._portal import Portal from ._portal import Portal
from . import _state from . import _state
from . import _mp_fixup_main from ..spawn import _mp_fixup_main
from . import _rpc from . import _rpc
if TYPE_CHECKING: if TYPE_CHECKING:
from ._supervise import ActorNursery from ._supervise import ActorNursery # noqa
from trio._channel import MemoryChannelState from trio._channel import MemoryChannelState
@ -175,13 +175,21 @@ class Actor:
dialog. dialog.
''' '''
# ugh, we need to get rid of this and replace with a "registry" sys is_registrar: bool = False
# https://github.com/goodboy/tractor/issues/216
is_arbiter: bool = False
@property @property
def is_registrar(self) -> bool: def is_arbiter(self) -> bool:
return self.is_arbiter '''
Deprecated, use `.is_registrar`.
'''
warnings.warn(
'`Actor.is_arbiter` is deprecated.\n'
'Use `.is_registrar` instead.',
DeprecationWarning,
stacklevel=2,
)
return self.is_registrar
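The deprecation shim in this hunk (a property that warns, then forwards to the renamed attr) is a reusable pattern; a minimal standalone version:

```python
import warnings


class Actor:
    is_registrar: bool = False

    @property
    def is_arbiter(self) -> bool:
        # deprecated alias kept for backward compat; `stacklevel=2`
        # points the warning at the caller's access site
        warnings.warn(
            '`Actor.is_arbiter` is deprecated.\n'
            'Use `.is_registrar` instead.',
            DeprecationWarning,
            stacklevel=2,
        )
        return self.is_registrar
```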
@property @property
def is_root(self) -> bool: def is_root(self) -> bool:
@ -237,7 +245,6 @@ class Actor:
registry_addrs: list[Address]|None = None, registry_addrs: list[Address]|None = None,
spawn_method: str|None = None, spawn_method: str|None = None,
# TODO: remove!
arbiter_addr: UnwrappedAddress|None = None, arbiter_addr: UnwrappedAddress|None = None,
) -> None: ) -> None:
@ -287,8 +294,8 @@ class Actor:
] ]
# marked by the process spawning backend at startup # marked by the process spawning backend at startup
# will be None for the parent most process started manually # will be None for the parent most process started
# by the user (currently called the "arbiter") # manually by the user (the "registrar")
self._spawn_method: str = spawn_method self._spawn_method: str = spawn_method
# RPC state # RPC state
@ -907,7 +914,7 @@ class Actor:
# TODO! -[ ] another `Struct` for rtvs.. # TODO! -[ ] another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars rvs: dict[str, Any] = spawnspec._runtime_vars
if rvs['_debug_mode']: if rvs['_debug_mode']:
from .devx import ( from ..devx import (
enable_stack_on_sig, enable_stack_on_sig,
maybe_init_greenback, maybe_init_greenback,
) )
@ -1656,7 +1663,7 @@ async def async_main(
# TODO, just read direct from ipc_server? # TODO, just read direct from ipc_server?
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs accept_addrs: list[UnwrappedAddress] = actor.accept_addrs
# Register with the arbiter if we're told its addr # Register with the registrar if we're told its addr
log.runtime( log.runtime(
f'Registering `{actor.name}` => {pformat(accept_addrs)}\n' f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
# ^-TODO-^ we should instead show the maddr here^^ # ^-TODO-^ we should instead show the maddr here^^
@ -1880,153 +1887,8 @@ async def async_main(
log.runtime(teardown_report) log.runtime(teardown_report)
# TODO: rename to `Registry` and move to `.discovery._registry`! # Backward compat: class moved to discovery._registry
class Arbiter(Actor): from ..discovery._registry import (
''' Registrar as Registrar,
A special registrar (and for now..) `Actor` who can contact all )
other actors within its immediate process tree and possibly keeps Arbiter = Registrar
a registry of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor" and
thus always has access to the top-most-level actor (process)
nursery.
By default, the registrar is always initialized when and if no
other registrar socket addrs have been specified to runtime
init entry-points (such as `open_root_actor()` or
`open_nursery()`). Any time a new main process is launched (and
thus thus a new root actor created) and, no existing registrar
can be contacted at the provided `registry_addr`, then a new
one is always created; however, if one can be reached it is
used.
Normally a distributed app requires at least registrar per
logical host where for that given "host space" (aka localhost
IPC domain of addresses) it is responsible for making all other
host (local address) bound actors *discoverable* to external
actor trees running on remote hosts.
'''
is_arbiter = True
# TODO, implement this as a read on there existing a `._state` of
# some sort setup by whenever we impl this all as
# a `.discovery._registry.open_registry()` API
def is_registry(self) -> bool:
return self.is_arbiter
def __init__(
self,
*args,
**kwargs,
) -> None:
self._registry: dict[
tuple[str, str],
UnwrappedAddress,
] = {}
self._waiters: dict[
str,
# either an event to sync to receiving an actor uid (which
# is filled in once the actor has successfully registered),
# or that uid after registry is complete.
list[trio.Event | tuple[str, str]]
] = {}
super().__init__(*args, **kwargs)
async def find_actor(
self,
name: str,
) -> UnwrappedAddress|None:
for uid, addr in self._registry.items():
if name in uid:
return addr
return None
async def get_registry(
self
) -> dict[str, UnwrappedAddress]:
'''
Return current name registry.
This method is async to allow for cross-actor invocation.
'''
# NOTE: requires ``strict_map_key=False`` to the msgpack
# unpacker since we have tuples as keys (note this makes the
# arbiter susceptible to hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
return {
'.'.join(key): val
for key, val in self._registry.items()
}
async def wait_for_actor(
self,
name: str,
) -> list[UnwrappedAddress]:
'''
Wait for a particular actor to register.
This is a blocking call if no actor by the provided name is currently
registered.
'''
addrs: list[UnwrappedAddress] = []
addr: UnwrappedAddress
mailbox_info: str = 'Actor registry contact infos:\n'
for uid, addr in self._registry.items():
mailbox_info += (
f'|_uid: {uid}\n'
f'|_addr: {addr}\n\n'
)
if name == uid[0]:
addrs.append(addr)
if not addrs:
waiter = trio.Event()
self._waiters.setdefault(name, []).append(waiter)
await waiter.wait()
for uid in self._waiters[name]:
if not isinstance(uid, trio.Event):
addrs.append(self._registry[uid])
log.runtime(mailbox_info)
return addrs
async def register_actor(
self,
uid: tuple[str, str],
addr: UnwrappedAddress
) -> None:
uid = name, hash = (str(uid[0]), str(uid[1]))
waddr: Address = wrap_address(addr)
if not waddr.is_valid:
# should never be 0-dynamic-os-alloc
await debug.pause()
self._registry[uid] = addr
# pop and signal all waiter events
events = self._waiters.pop(name, [])
self._waiters.setdefault(name, []).append(uid)
for event in events:
if isinstance(event, trio.Event):
event.set()
async def unregister_actor(
self,
uid: tuple[str, str]
) -> None:
uid = (str(uid[0]), str(uid[1]))
entry: tuple = self._registry.pop(uid, None)
if entry is None:
log.warning(f'Request to de-register {uid} failed?')

View File

@ -25,6 +25,7 @@ from contextvars import (
from pathlib import Path from pathlib import Path
from typing import ( from typing import (
Any, Any,
Callable,
Literal, Literal,
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -32,9 +33,14 @@ from typing import (
import platformdirs import platformdirs
from trio.lowlevel import current_task from trio.lowlevel import current_task
from msgspec import (
field,
Struct,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from ._runtime import Actor from ._runtime import Actor
from ._context import Context from .._context import Context
# default IPC transport protocol settings # default IPC transport protocol settings
@ -47,9 +53,70 @@ _def_tpt_proto: TransportProtocolKey = 'tcp'
_current_actor: Actor|None = None # type: ignore # noqa _current_actor: Actor|None = None # type: ignore # noqa
_last_actor_terminated: Actor|None = None _last_actor_terminated: Actor|None = None
# TODO: mk this a `msgspec.Struct`! # TODO: mk this a `msgspec.Struct`!
# -[ ] type out all fields obvi! # -[x] type out all fields obvi!
# -[ ] (eventually) mk wire-ready for monitoring? # -[ ] (eventually) mk wire-ready for monitoring?
class RuntimeVars(Struct):
'''
Actor-(and thus process)-global runtime state.
This struct is relayed from parent to child during sub-actor
spawning and is a singleton instance per process.
Generally contains,
- root-actor indicator.
- comms-info: addrs for both (public) process/service-discovery
and in-tree contact with other actors.
- transport-layer IPC protocol server(s) settings.
- debug-mode settings for enabling sync breakpointing and any
surrounding REPL-fixture hooking.
- infected-`asyncio` via guest-mode toggle(s)/config.
'''
_is_root: bool = False # bool
_root_mailbox: tuple[str, str|int] = (None, None) # tuple[str|None, str|None]
_root_addrs: list[
tuple[str, str|int],
] = [] # tuple[str|None, str|None]
# parent->chld ipc protocol caps
_enable_tpts: list[TransportProtocolKey] = field(
default_factory=lambda: [_def_tpt_proto],
)
# registrar info
_registry_addrs: list[tuple] = []
# `debug_mode: bool` settings
_debug_mode: bool = False # bool
repl_fixture: bool|Callable = False # |AbstractContextManager[bool]
# for `tractor.pause_from_sync()` & `breakpoint()` support
use_greenback: bool = False
# infected-`asyncio`-mode: `trio` running as guest.
_is_infected_aio: bool = False
def __setattr__(
self,
key,
val,
) -> None:
breakpoint()
super().__setattr__(key, val)
def update(
self,
from_dict: dict|Struct,
) -> None:
for attr, val in from_dict.items():
setattr(
self,
attr,
val,
)
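The new `RuntimeVars` struct uses `field(default_factory=...)` for its list-valued fields to avoid the shared-mutable-default pitfall, plus a `setattr`-driven `.update()` helper. A `dataclasses` analog (assuming the `msgspec.Struct` specifics aren't needed) showing both:

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class RuntimeVars:
    _is_root: bool = False
    # default_factory => each instance gets its OWN fresh list,
    # unlike a bare `= ['tcp']` default which would be shared
    _enable_tpts: list[str] = field(default_factory=lambda: ['tcp'])

    def update(self, from_dict: dict[str, Any]) -> None:
        # bulk-assign fields from a plain dict
        for attr, val in from_dict.items():
            setattr(self, attr, val)
```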
_runtime_vars: dict[str, Any] = { _runtime_vars: dict[str, Any] = {
# root of actor-process tree info # root of actor-process tree info
'_is_root': False, # bool '_is_root': False, # bool
@ -73,6 +140,23 @@ _runtime_vars: dict[str, Any] = {
} }
def get_runtime_vars(
as_dict: bool = True,
) -> dict:
'''
Deliver a **copy** of the current `Actor`'s "runtime variables".
By default, for historical impl reasons, this delivers the `dict`
form, but the `RuntimeVars` struct should be utilized as possible
for future calls.
'''
if as_dict:
return dict(_runtime_vars)
return RuntimeVars(**_runtime_vars)
def last_actor() -> Actor|None: def last_actor() -> Actor|None:
''' '''
Try to return last active `Actor` singleton Try to return last active `Actor` singleton
@ -98,7 +182,7 @@ def current_actor(
_current_actor is None _current_actor is None
): ):
msg: str = 'No local actor has been initialized yet?\n' msg: str = 'No local actor has been initialized yet?\n'
from ._exceptions import NoRuntime from .._exceptions import NoRuntime
if last := last_actor(): if last := last_actor():
msg += ( msg += (
@ -164,7 +248,7 @@ def current_ipc_ctx(
not ctx not ctx
and error_on_not_set and error_on_not_set
): ):
from ._exceptions import InternalError from .._exceptions import InternalError
raise InternalError( raise InternalError(
'No IPC context has been allocated for this task yet?\n' 'No IPC context has been allocated for this task yet?\n'
f'|_{current_task()}\n' f'|_{current_task()}\n'

View File

@@ -30,36 +30,36 @@ import warnings

 import trio

-from .devx import (
+from ..devx import (
     debug,
     pformat as _pformat,
 )
-from ._addr import (
+from ..discovery._addr import (
     UnwrappedAddress,
     mk_uuid,
 )
 from ._state import current_actor, is_main_process
-from .log import get_logger, get_loglevel
+from ..log import get_logger, get_loglevel
 from ._runtime import Actor
 from ._portal import Portal
-from .trionics import (
+from ..trionics import (
     is_multi_cancelled,
     collapse_eg,
 )
-from ._exceptions import (
+from .._exceptions import (
     ContextCancelled,
 )
-from ._root import (
+from .._root import (
     open_root_actor,
 )
 from . import _state
-from . import _spawn
+from ..spawn import _spawn

 if TYPE_CHECKING:
     import multiprocessing as mp
-    # from .ipc._server import IPCServer
-    from .ipc import IPCServer
+    # from ..ipc._server import IPCServer
+    from ..ipc import IPCServer

 log = get_logger()


@@ -0,0 +1,26 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

'''
Actor process spawning machinery using
multiple backends (trio, multiprocessing).

NOTE: to avoid circular imports, this ``__init__``
does NOT eagerly import submodules. Use direct
module paths like ``tractor.spawn._spawn`` or
``tractor.spawn._entry`` instead.

'''


@@ -29,19 +29,19 @@ from typing import (

 import trio  # type: ignore

-from .log import (
+from ..log import (
     get_console_log,
     get_logger,
 )
-from . import _state
-from .devx import (
+from ..runtime import _state
+from ..devx import (
     _frame_stack,
     pformat,
 )
-# from .msg import pretty_struct
-from .to_asyncio import run_as_asyncio_guest
-from ._addr import UnwrappedAddress
-from ._runtime import (
+# from ..msg import pretty_struct
+from ..to_asyncio import run_as_asyncio_guest
+from ..discovery._addr import UnwrappedAddress
+from ..runtime._runtime import (
     async_main,
     Actor,
 )


@@ -125,7 +125,7 @@ class PatchedForkServer(ForkServer):
             self._forkserver_pid = None

             # XXX only thing that changed!
-            cmd = ('from tractor._forkserver_override import main; ' +
+            cmd = ('from tractor.spawn._forkserver_override import main; ' +
                    'main(%d, %d, %r, **%r)')

             if self._preload_modules:


@@ -34,11 +34,11 @@ from typing import (

 import trio
 from trio import TaskStatus

-from .devx import (
+from ..devx import (
     debug,
     pformat as _pformat
 )
-from tractor._state import (
+from tractor.runtime._state import (
     current_actor,
     is_main_process,
     is_root_process,
@@ -46,10 +46,10 @@ from tractor._state import (
     _runtime_vars,
 )
 from tractor.log import get_logger
-from tractor._addr import UnwrappedAddress
-from tractor._portal import Portal
-from tractor._runtime import Actor
-from tractor._entry import _mp_main
+from tractor.discovery._addr import UnwrappedAddress
+from tractor.runtime._portal import Portal
+from tractor.runtime._runtime import Actor
+from ._entry import _mp_main
 from tractor._exceptions import ActorFailure
 from tractor.msg import (
     types as msgtypes,
@@ -58,11 +58,11 @@ from tractor.msg import (

 if TYPE_CHECKING:
-    from ipc import (
+    from tractor.ipc import (
         _server,
         Channel,
     )
-    from ._supervise import ActorNursery
+    from tractor.runtime._supervise import ActorNursery

 ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)


@@ -43,7 +43,7 @@ from tractor._exceptions import (
     AsyncioTaskExited,
     AsyncioCancelled,
 )
-from tractor._state import (
+from tractor.runtime._state import (
     debug_mode,
     _runtime_vars,
 )


@@ -21,7 +21,6 @@ Sugary patterns for trio + tractor designs.
 from ._mngrs import (
     gather_contexts as gather_contexts,
     maybe_open_context as maybe_open_context,
-    maybe_open_nursery as maybe_open_nursery,
 )
 from ._broadcast import (
     AsyncReceiver as AsyncReceiver,
@@ -37,3 +36,6 @@ from ._beg import (
 from ._taskc import (
     maybe_raise_from_masking_exc as maybe_raise_from_masking_exc,
 )
+from ._tn import (
+    maybe_open_nursery as maybe_open_nursery,
+)


@@ -22,7 +22,7 @@ https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html
 from __future__ import annotations
 from abc import abstractmethod
 from collections import deque
-from contextlib import asynccontextmanager
+from contextlib import asynccontextmanager as acm
 from functools import partial
 from operator import ne
 from typing import (
@@ -398,7 +398,7 @@ class BroadcastReceiver(ReceiveChannel):
         return await self._receive_from_underlying(key, state)

-    @asynccontextmanager
+    @acm
     async def subscribe(
         self,
         raise_on_lag: bool = True,


@@ -23,7 +23,6 @@ from contextlib import (
     asynccontextmanager as acm,
 )
 import inspect
-from types import ModuleType
 from typing import (
     Any,
     AsyncContextManager,
@@ -33,21 +32,18 @@ from typing import (
     Hashable,
     Sequence,
     TypeVar,
-    TYPE_CHECKING,
 )

 import trio
-from tractor._state import current_actor
+from tractor.runtime._state import current_actor
 from tractor.log import get_logger
+from ._tn import maybe_open_nursery
 # from ._beg import collapse_eg
 # from ._taskc import (
 #     maybe_raise_from_masking_exc,
 # )

-if TYPE_CHECKING:
-    from tractor import ActorNursery

 log = get_logger()
@@ -55,30 +51,6 @@ log = get_logger()

 T = TypeVar("T")

-@acm
-async def maybe_open_nursery(
-    nursery: trio.Nursery|ActorNursery|None = None,
-    shield: bool = False,
-    lib: ModuleType = trio,
-    **kwargs,  # proxy thru
-
-) -> AsyncGenerator[trio.Nursery, Any]:
-    '''
-    Create a new nursery if None provided.
-
-    Blocks on exit as expected if no input nursery is provided.
-
-    '''
-    if nursery is not None:
-        yield nursery
-    else:
-        async with lib.open_nursery(**kwargs) as nursery:
-            if lib == trio:
-                nursery.cancel_scope.shield = shield
-            yield nursery

 async def _enter_and_wait(
     mngr: AsyncContextManager[T],
     unwrapped: dict[int, T],


@@ -0,0 +1,341 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

'''
Erlang-style (ish) "one-cancels-one" nursery, what we just call
a "task manager".

'''
from __future__ import annotations
from contextlib import (
    asynccontextmanager as acm,
    # contextmanager as cm,
)
from functools import partial
from typing import (
    Generator,
    Any,
)

from outcome import (
    Outcome,
    acapture,
)
from msgspec import Struct
import trio
from trio import (
    TaskStatus,
    CancelScope,
    Nursery,
)
from trio.lowlevel import (
    Task,
)

from tractor.log import get_logger

log = get_logger(__name__)
class TaskOutcome(Struct):
    '''
    The outcome of a scheduled ``trio`` task which includes an interface
    for synchronizing to the completion of the task's runtime and access
    to the eventual boxed result/value or raised exception.

    '''
    lowlevel_task: Task
    _exited = trio.Event()  # as per `trio.Runner.task_exited()`
    _outcome: Outcome | None = None  # as per `outcome.Outcome`
    _result: Any | None = None  # the eventual maybe-returned-value

    @property
    def result(self) -> Any:
        '''
        Either Any or None depending on whether the Outcome has completed.

        '''
        if self._outcome is None:
            raise RuntimeError(
                f'Task {self.lowlevel_task.name} is not complete.\n'
                'First wait on `await TaskOutcome.wait_for_result()`!'
            )
        return self._result

    def _set_outcome(
        self,
        outcome: Outcome,
    ):
        '''
        Set the ``Outcome`` for this task.

        This method should only ever be called by the task's supervising
        nursery implementation.

        '''
        self._outcome = outcome
        self._result = outcome.unwrap()
        self._exited.set()

    async def wait_for_result(self) -> Any:
        '''
        Unwind the underlying task's ``Outcome`` by async waiting for
        the task to first complete and then unwrap its result-value.

        '''
        if self._exited.is_set():
            return self._result

        await self._exited.wait()

        out = self._outcome
        if out is None:
            raise ValueError(f'{out} is not an outcome!?')

        return self.result
class TaskManagerNursery(Struct):
    _tn: Nursery
    _scopes: dict[
        Task,
        tuple[CancelScope, Outcome]
    ] = {}
    task_manager: Generator[Any, Outcome, None] | None = None

    async def start_soon(
        self,
        async_fn,
        *args,

        name=None,
        task_manager: Generator[Any, Outcome, None] | None = None

    ) -> tuple[CancelScope, Task]:

        # NOTE: internals of a nursery don't let you know what
        # the most recently spawned task is by order.. so we'd
        # have to either change that or do set ops.
        # pre_start_tasks: set[Task] = n._children.copy()
        # new_tasks = n._children - pre_start_Tasks
        # assert len(new_tasks) == 1
        # task = new_tasks.pop()

        tn: Nursery = self._tn

        sm = self.task_manager
        # we do default behavior of a scope-per-nursery
        # if the user did not provide a task manager.
        if sm is None:
            return tn.start_soon(async_fn, *args, name=None)

        # new_task: Task|None = None
        to_return: tuple[Any] | None = None

        # NOTE: what do we enforce as a signature for the
        # `@task_scope_manager` here?
        mngr = sm(nursery=tn)

        async def _start_wrapped_in_scope(
            task_status: TaskStatus[
                tuple[CancelScope, Task]
            ] = trio.TASK_STATUS_IGNORED,

        ) -> None:

            # TODO: this was working before?! and, do we need something
            # like it to implement `.start()`?
            # nonlocal to_return

            # execute up to the first yield
            try:
                to_return: tuple[Any] = next(mngr)
            except StopIteration:
                raise RuntimeError("task manager didn't yield") from None

            # TODO: how do we support `.start()` style?
            # - relay through whatever the
            #   started task passes back via `.started()` ?
            #   seems like that won't work with also returning
            #   a "task handle"?
            # - we were previously binding-out this `to_return` to
            #   the parent's lexical scope, why isn't that working
            #   now?
            task_status.started(to_return)

            # invoke underlying func now that cs is entered.
            outcome = await acapture(async_fn, *args)

            # execute from the 1st yield to return and expect
            # generator-mngr `@task_scope_manager` thinger to
            # terminate!
            try:
                mngr.send(outcome)

                # I would presume it's better to have a handle to
                # the `Outcome` entirely? This method sends *into*
                # the mngr this `Outcome.value`; seems like kinda
                # weird semantics for our purposes?
                # outcome.send(mngr)

            except StopIteration:
                return
            else:
                raise RuntimeError(f"{mngr} didn't stop!")

        to_return = await tn.start(_start_wrapped_in_scope)
        assert to_return is not None

        # TODO: use the fancy type-check-time type signature stuff from
        # mypy i guess..to like, relay the type of whatever the
        # generator yielded through? betcha that'll be un-grokable XD
        return to_return
# TODO: define a decorator to runtime type check that this a generator
# with a single yield that also delivers a value (of some std type) from
# the yield expression?
# @trio.task_manager
def add_task_handle_and_crash_handling(
    nursery: Nursery,
    debug_mode: bool = False,
) -> Generator[
    Any,
    Outcome,
    None,
]:
    '''
    A customizable, user-defined "task scope manager".

    With this specially crafted single-yield generator function you can
    add more granular controls around every task spawned by `trio` B)

    '''
    # if you need it you can ask trio for the task obj
    task: Task = trio.lowlevel.current_task()
    log.info(f'Spawning task: {task.name}')

    # User defined "task handle" for more granular supervision
    # of each spawned task as needed for their particular usage.
    task_outcome = TaskOutcome(task)

    # NOTE: if wanted the user could wrap the output task handle however
    # they want!
    # class TaskHandle(Struct):
    #     task: Task
    #     cs: CancelScope
    #     outcome: TaskOutcome

    # this yields back when the task is terminated, cancelled or returns.
    try:
        with CancelScope() as cs:

            # the yielded value(s) here are what are returned to the
            # nursery's `.start_soon()` caller B)
            lowlevel_outcome: Outcome = yield (task_outcome, cs)
            task_outcome._set_outcome(lowlevel_outcome)

    # Adds "crash handling" from `pdbp` by entering
    # a REPL on std errors.
    except Exception as err:
        if debug_mode:
            log.exception(
                f'{task.name} crashed, entering debugger!'
            )
            import pdbp
            pdbp.xpm()

        raise err

    finally:
        log.info(
            f'Task exited\n'
            f')>\n'
            f' |_{task}\n'
            # ^^TODO? use sclang formatter?
            # -[ ] .devx.pformat.nest_from_op()` yo!
        )
@acm
async def open_taskman(
    task_manager: Generator[Any, Outcome, None] | None = None,
    **lowlevel_nursery_kwargs,
):
    async with trio.open_nursery(**lowlevel_nursery_kwargs) as nurse:
        yield TaskManagerNursery(
            nurse,
            task_manager=task_manager,
        )


async def sleep_then_return_val(val: str):
    await trio.sleep(0.2)
    return val


async def ensure_cancelled():
    try:
        await trio.sleep_forever()

    except trio.Cancelled:
        task = trio.lowlevel.current_task()
        log.cancel(f'heyyo ONLY {task.name} was cancelled as expected B)')
        assert 0

    except BaseException:
        raise RuntimeError("woa woa woa this ain't right!")


if __name__ == '__main__':
    from tractor.log import get_console_log
    get_console_log(level='info')

    async def main():
        async with open_taskman(
            task_manager=partial(
                add_task_handle_and_crash_handling,
                debug_mode=True,
            ),
        ) as tm:
            for _ in range(3):
                outcome, _ = await tm.start_soon(trio.sleep_forever)

            # extra task we want to engage in debugger post mortem.
            err_outcome, cs = await tm.start_soon(ensure_cancelled)

            val: str = 'yoyoyo'
            val_outcome, _ = await tm.start_soon(
                sleep_then_return_val,
                val,
            )
            res = await val_outcome.wait_for_result()
            assert res == val
            log.info(f'{res} -> GOT EXPECTED TASK VALUE')

            await trio.sleep(0.6)
            log.cancel(
                f'Cancelling and waiting on {err_outcome.lowlevel_task} '
                'to CRASH..'
            )
            cs.cancel()

    trio.run(main)

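The task-manager module above rests on one small protocol: a single-yield generator is primed with `next()` to run its setup and hand back a per-task handle, the task runs, and then the task's outcome is `.send()`-ed back in, after which the generator must finish. The following stdlib-only sketch of that drive cycle mirrors what `_start_wrapped_in_scope()` does around each spawned task; all names here are illustrative, not tractor's API:

```python
def logging_task_manager(name: str):
    # setup runs up to the single yield..
    print(f'spawning: {name}')
    try:
        # ..the yielded value is what the spawner hands back to its caller
        outcome = yield f'handle-for-{name}'
        print(f'{name} -> {outcome!r}')
    finally:
        # ..teardown runs after the outcome is sent back in
        print(f'{name} exited')


def drive(mngr_fn, name, run_task):
    mngr = mngr_fn(name)
    try:
        handle = next(mngr)  # run setup up to the single yield
    except StopIteration:
        raise RuntimeError("task manager didn't yield") from None

    outcome = run_task()  # stand-in for `await acapture(async_fn, *args)`

    try:
        mngr.send(outcome)  # resume from the yield and require termination
    except StopIteration:
        return handle

    raise RuntimeError(f'{mngr} yielded more than once!')


handle = drive(logging_task_manager, 'demo', lambda: 42)
assert handle == 'handle-for-demo'
```

The `try`/`except StopIteration` pair around `next()` and `.send()` enforces exactly one yield, the same invariant the nursery wrapper above raises `RuntimeError` for.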

@@ -0,0 +1,94 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

'''
`trio.Nursery` wrappers which we short-hand refer to as
`tn`: "task nursery".

(whereas we refer to `tractor.ActorNursery` as the short-hand `an`)

'''
from __future__ import annotations
from contextlib import (
    asynccontextmanager as acm,
)
from types import ModuleType
from typing import (
    Any,
    AsyncGenerator,
    TYPE_CHECKING,
)

import trio
from tractor.log import get_logger

# from ._beg import (
#     collapse_eg,
# )

if TYPE_CHECKING:
    from tractor import ActorNursery

log = get_logger(__name__)


# ??TODO? is this even a good idea??
# it's an extra LoC to stack `collapse_eg()` vs.
# a new/foreign/bad-std-named very thing wrapper..?
# -[ ] is there a better/simpler name?
# @acm
# async def open_loose_tn() -> trio.Nursery:
#     '''
#     Implements the equivalent of the old style loose eg raising
#     task-nursery from `trio<=0.25.0`,
#
#     .. code-block:: python
#
#         async with trio.open_nursery(
#             strict_exception_groups=False,
#         ) as tn:
#             ...
#
#     '''
#     async with (
#         collapse_eg(),
#         trio.open_nursery() as tn,
#     ):
#         yield tn


@acm
async def maybe_open_nursery(
    nursery: trio.Nursery|ActorNursery|None = None,
    shield: bool = False,
    lib: ModuleType = trio,
    loose: bool = False,

    **kwargs,  # proxy thru

) -> AsyncGenerator[trio.Nursery, Any]:
    '''
    Create a new nursery if None provided.

    Blocks on exit as expected if no input nursery is provided.

    '''
    if nursery is not None:
        yield nursery
    else:
        async with lib.open_nursery(**kwargs) as tn:
            tn.cancel_scope.shield = shield
            yield tn


@@ -273,11 +273,11 @@ wheels = [

 [[package]]
 name = "pygments"
-version = "2.19.2"
+version = "2.20.0"
 source = { registry = "https://pypi.org/simple" }
-sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/c3/b2/bc9c9196916376152d655522fdcebac55e66de6603a76a02bca1b6414f6c/pygments-2.20.0.tar.gz", hash = "sha256:6757cd03768053ff99f3039c1a36d6c0aa0b263438fcab17520b30a303a82b5f", size = 4955991, upload-time = "2026-03-29T13:29:33.898Z" }
 wheels = [
-    { url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
+    { url = "https://files.pythonhosted.org/packages/f4/7e/a72dd26f3b0f4f2bf1dd8923c85f7ceb43172af56d63c7383eb62b332364/pygments-2.20.0-py3-none-any.whl", hash = "sha256:81a9e26dd42fd28a23a2d169d86d7ac03b46e2f8b59ed4698fb4785f946d0176", size = 1231151, upload-time = "2026-03-29T13:29:30.038Z" },
 ]

 [[package]]