Compare commits

...

56 Commits

Author SHA1 Message Date
Tyler Goodlet 850726ba5f Bump "task-manager(-nursery)" naming, add logging
Namely just renaming any `trio.Nursery` instances to `tn`, the primary
`@acm`-API to `.trionics.open_taskman()` and change out all `print()`s
for logger instances with 'info' level enabled by the mod-script usage.
2026-02-19 19:21:43 -05:00
Tyler Goodlet 37a36ec0b4 Add a new `.trionics._tn` for "task nursery stuff"
Since I'd like to decouple the new "task-manager-nursery" lowlevel
primitives/abstractions from the higher-level
`TaskManagerNursery`-supporting API(s) and default per-task
supervision-strat and because `._mngr` is already purposed for
higher-level "on-top-of-nursery" patterns as it is.

Deats,
- move `maybe_open_nursery()` into the new mod.
- adjust the pkg-mod's import to the new sub-mod.
- also draft up this idea for an API which stacks `._beg.collapse_eg()`
  onto a nursery with the WIP name `open_loose_tn()` but more then
  likely i'll just discard this idea bc i think the explicit `@acm`
  stacking is more explicit/pythonic/up-front-grokable despite the extra
  LoC.
2026-02-19 19:21:43 -05:00
Tyler Goodlet da79f1fe5c Add `debug_mode: bool` control to task mngr
Allows dynamically importing `pdbp` when enabled and a way for
eventually linking with `tractor`'s own debug mode flag.
2026-02-19 19:21:43 -05:00
Tyler Goodlet 47e27e1c09 Go all in on "task manager" naming 2026-02-19 19:21:43 -05:00
Tyler Goodlet efbd5a0727 More refinements and proper typing
- drop unneeded (and commented) internal cs allocating bits.
- bypass all task manager stuff if no generator is provided by the
  caller; i.e. just call `.start_soon()` as normal.
- fix `Generator` typing.
- add some prints around task manager.
- wrap in `TaskOutcome.lowlevel_task: Task`.
2026-02-19 19:21:43 -05:00
Tyler Goodlet 3322cad8bf Ensure user-allocated cancel scope just works!
Turns out the nursery doesn't have to care about allocating a per task
`CancelScope` since the user can just do that in the
`@task_scope_manager` if desired B) So just mask all the nursery cs
allocating with the intention of removal.

Also add a test for per-task-cancellation by starting the crash task as
a `trio.sleep_forever()` but then cancel it via the user allocated cs
and ensure the crash propagates as expected 💥
2026-02-19 19:21:43 -05:00
Tyler Goodlet 64c1c2e532 Facepalm, don't pass in unecessary cancel scope 2026-02-19 19:21:43 -05:00
Tyler Goodlet 76c57420bb Do renaming, implement lowlevel `Outcome` sending
As was listed in the many todos, this changes the `.start_soon()` impl
to instead (manually) `.send()` into the user defined
`@task_scope_manager` an `Outcome` from the spawned task. In this case
the task manager wraps that in a user defined (and renamed)
`TaskOutcome` and delivers that + a containing `trio.CancelScope` to the
`.start_soon()` caller. Here the user defined `TaskOutcome` defines
a `.wait_for_result()` method that can be used to await the task's exit
and handle it's underlying returned value or raised error; the
implementation could be different and subject to the user's own whims.

Note that by default, if this was added to `trio`'s core, the
`@task_scope_manager` would simply be implemented as either a `None`
yielding single-yield-generator but more likely just entirely ignored
by the runtime (as in no manual task outcome collecting, generator
calling and sending is done at all) by default if the user does not provide
the `task_scope_manager` to the nursery at open time.
2026-02-19 19:21:43 -05:00
Tyler Goodlet 2b1776b1d7 Alias to `@acm` in broadcaster mod 2026-02-19 19:21:43 -05:00
Tyler Goodlet 6022e3dead Initial prototype for a one-cancels-one style supervisor, nursery thing.. 2026-02-19 19:21:43 -05:00
Tyler Goodlet b5e5d3a3ac Use shorthand nursery var-names per convention in codebase 2026-02-19 19:21:43 -05:00
Tyler Goodlet 98a4ede1cf Better separate service tasks vs. ctxs via methods
Namely splitting the handles for each in 2 separate tables and adding
a `.cancel_service_task()`.

Also,
- move `_open_and_supervise_service_ctx()` to mod level.
- rename `target` -> `ctx_fn` params througout.
- fill out method doc strings.
2026-02-19 19:21:43 -05:00
Tyler Goodlet ebbccf893a Mv over `ServiceMngr` from `piker` with mods
Namely distinguishing service "IPC contexts" (opened in a
subactor via a `Portal`) from just local `trio.Task`s started
and managed under the `.service_n` (more or less wrapping in the
interface of a "task-manager" style nursery - aka a one-cancels-one
supervision start).

API changes from original (`piker`) impl,
- mk `.start_service_task()` do ONLY that, start a task with a wrapping
  cancel-scope and completion event.
  |_ ideally this gets factored-out/re-implemented using the
    task-manager/OCO-style-nursery from GH #363.
- change what was the impl of `.start_service_task()` to `.start_service_ctx()`
  since it more explicitly defines the functionality of entering
  `Portal.open_context()` with a wrapping cs and completion event inside
  a bg task (which syncs the ctx's lifetime with termination of the
  remote actor runtime).
- factor out what was a `.start_service_ctx()` closure to a new
  `_open_and_supervise_service_ctx()` mod-func holding the meat of
  the supervision logic.

`ServiceMngr` API brief,
- use `open_service_mngr()` and `get_service_mngr()` to acquire the
  actor-global singleton.
- `ServiceMngr.start_service()` and `.cancel_service()` which allow for
  straight forward mgmt of "service subactor daemons".
2026-02-19 19:21:43 -05:00
Tyler Goodlet 19329b886d Initial idea-notes dump and @singleton factory idea from `trio`-gitter 2026-02-19 19:21:43 -05:00
Bd 70bb77280e
Merge pull request #411 from goodboy/tpt_tolerance
Tpt-tolerance: more lowlevel `trio` CRE/BRE -> `TransportClosed` translations
2026-02-19 16:40:17 -05:00
Gud Boi 916f88a070 Less newlines in `._rpc` log msg 2026-02-19 16:31:54 -05:00
Gud Boi 91f2f3ec10 Use test-harness `loglevel` in inter-peer suite 2026-02-19 16:29:20 -05:00
Tyler Goodlet 3e5124e184 Hide `._rpc._invoke()` frame, again.. 2026-02-19 16:28:22 -05:00
Gud Boi fa86269e30 Stuff from auto-review in https://github.com/goodboy/tractor/pull/412 .. 2026-02-19 16:20:21 -05:00
Gud Boi d0b92bbeba Clean up `._transport` error-case comment
Expand and clarify the comment for the default `case _`
block in the `.send()` error matcher, noting that we
console-error and raise-thru for unexpected disconnect
conditions.

(this patch was suggested by copilot in,
 https://github.com/goodboy/tractor/pull/411)

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 16:18:39 -05:00
Gud Boi 9470815f5a Fix `spawn` fixture cleanup + test assertions
Improve the `spawn` fixture teardown logic in
`tests/devx/conftest.py` fixing the while-else bug, and fix
`test_advanced_faults` genexp for `TransportClosed` exc type
checking.

Deats,
- replace broken `while-else` pattern with direct
  `if ptyproc.isalive()` check after the SIGINT loop.
- fix undefined `spawned` ref -> `ptyproc.isalive()` in
  while condition.
- improve walrus expr formatting in timeout check (multiline
  style).

Also fix `test_ipc_channel_break_during_stream()` assertion,
- wrap genexp in `all()` call so it actually checks all excs
  are `TransportClosed` instead of just creating an unused
  generator.

(this patch was suggested by copilot in,
 https://github.com/goodboy/tractor/pull/411)

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 16:14:11 -05:00
Gud Boi 592d918394 Tweak `test_inter_peer_cancellation` for races
Adjust `basic_echo_server()` default sequence len to avoid the race
where the 'tell_little_bro()` finished streaming **before** the
echo-server sub is cancelled by its peer subactor (which is the whole
thing we're testing!).

Deats,
- bump `rng_seed` default from 50 -> 100 to ensure peer
  cancel req arrives before echo dialog completes on fast hw.
- add `trio.sleep(0.001)` between send/receive in msg loop on the
  "client" streamer side to give cancel request transit more time to
  arrive.

Also,
- add more native `tractor`-type hints.
- reflow `basic_echo_server()` doc-string for 67 char limit
- add masked `pause()` call with comment about unreachable
  code path
- alphabetize imports: mv `current_actor` and `open_nursery`
  below typed imports

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 15:24:42 -05:00
Gud Boi 0cddc67bdb Add doc-strs to `get_root()` + `maybe_open_portal()`
Brief descriptions for both fns in `._discovery` clarifying
what each delivers and under what conditions.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 052fe2435f Improve `Channel` doc-strs + minor cleanups
Flesh out missing method doc-strings, improve log msg formatting and
assert -> `RuntimeError` for un-inited tpt layer.

Deats,
- add doc-string to `.send()` noting `TransportClosed` raise
  on comms failures.
- add doc-string to `.recv()`.
- expand `._aiter_msgs()` doc-string, line-len reflow.
- add doc-string to `.connected()`.
- convert `assert self._transport` -> `RuntimeError` raise
  in `._aiter_msgs()` for more explicit crashing.
- expand `_connect_chan()` doc-string, note it's lowlevel
  and suggest `.open_portal()` to user instead.
- factor out `src_exc_str` in `TransportClosed` log handler
  to avoid double-call
- use multiline style for `.connected()` return expr.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 28819bf5d3 Add `Actor.is_root()` convenience predicate meth 2026-02-19 13:55:02 -05:00
Gud Boi 07c2ba5c0d Drop `trio`-exc-catching if tpt-closed covers them
Remove the `trio.ClosedResourceError` and `trio.BrokenResourceError`
handling that should now be subsumed by `TransportClosed` re-raising out
of the `.ipc` stack.

Deats,
- drop CRE and BRE from `._streaming.MsgStream.aclose()/.send()` blocks.
- similarly rm from `._context.open_context_from_portal()`.
- also from `._portal.Portal.cancel_actor()` and drop the
  (now-completed-todo) comment about this exact thing.

Also add comment in `._rpc.try_ship_error_to_remote()` noting the
remaining `trio` catches there are bc the `.ipc` layers *should* be
wrapping them; thus `log.critical()` use is warranted.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 50f40f427b Include `TransportClosed` in tpt-layer err handling
Add `TransportClosed` to except clauses where `trio`'s own
resource-closed errors are already caught, ensuring our
higher-level tpt exc is also tolerated in those same spots.
Likely i will follow up with a removal of the `trio` variants since most
*should be* caught and re-raised as tpt-closed out of the `.ipc` stack
now?

Add `TransportClosed` to various handler blocks,
- `._streaming.MsgStream.aclose()/.send()` except blocks.
- the broken-channel except in `._context.open_context_from_portal()`.
- obvi import it where necessary in those ^ mods.

Adjust `test_advanced_faults` suite + exs-script to match,
- update `ipc_failure_during_stream.py` example to catch
  `TransportClosed` alongside `trio.ClosedResourceError`
  in both the break and send-check paths.
- shield the `trio.sleep(0.01)` after tpt close in example to avoid
  taskc-raise/masking on that checkpoint since we want to simulate
  waiting for a user to send a KBI.
- loosen `ExceptionGroup` assertion to `len(excs) <= 2` and ensure all
  excs are `TransportClosed`.
- improve multi-line formatting, minor style/formatting fixes in
  condition expressions.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi bf6de55865 Improve tpt-closed msg-fmt/content and CRE case matching
Refine tpt-error reporting to include closure attribution (`'locally'`
vs `'by peer'`), tighten match conditions and reduce needless newlines
in exc reprs.

Deats,
- factor out `trans_err_msg: str` and `by_whom: str` into a `dict`
  lookup before the `match:` block to pair specific err msgs to closure
  attribution strings.
- use `by_whom` directly as `CRE` case guard condition
  (truthy when msg matches known underlying CRE msg content).
- conveniently include `by_whom!r` in `TransportClosed` message.
- fix `'locally ?'` -> `'locally?'` in send-side `CRE`
  handler (drop errant space).
- add masked `maybe_pause_bp()` calls at both `CRE` sites (from when
  i was tracing a test harness issue where the UDS socket path wasn't
  being cleaned up on teardown).
- drop trailing `\n` from `body=` args to `TransportClosed`.
- reuse `trans_err_msg` for the `BRE`/broken-pipe guard.

Also adjust testing, namely `test_ctxep_pauses_n_maybe_ipc_breaks`'s
expected patts-set for new msg formats to be raised out of
`.ipc._transport`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 5ded99a886 Add a `._trace.maybe_pause_bp()` for tpt-broken cases
Internal helper which falls back to sync `pdb` when the
child actor can't reach root to acquire the TTY lock.

Useful when debugging tpt layer failures (intentional or
otherwise) where a sub-actor can no longer IPC-contact the
root to coordinate REPL access; root uses `.pause()` as
normal while non-root falls back to `mk_pdb().set_trace()`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 7145fa364f Add `SIGINT` cleanup to `spawn` fixture in `devx/conftest`
Convert `spawn` fixture to a generator and add post-test graceful
subproc cleanup via `SIGINT`/`SIGKILL` to avoid leaving stale `pexpect`
child procs around between test runs as well as any UDS-tpt socket files
under the system runtime-dir.

Deats,
- convert `return _spawn` -> `yield _spawn` to enable
  post-yield teardown logic.
- add a new `nonlocal spawned` ref so teardown logic can access the last
  spawned child from outside the delivered spawner fn-closure.
- add `SIGINT`-loop after yield with 5s timeout, then
  `SIGKILL` if proc still alive.
- add masked `breakpoint()` and TODO about UDS path cleanup

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi f8e25688c7 Unmask `ClosedResourceError` handling in `._transport`
Unmask the CRE case block for peer-closed socket errors which already
had a TODO about reproducing the condition. It appears this case can
happen during inter-actor comms teardowns in `piker`, but i haven't been
able to figure out exactly what reproduces it yet..

So activate the block again for that 'socket already closed'-msg case,
and add a TODO questioning how to reproduce it.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-12 00:51:50 -05:00
Tyler Goodlet c3f455a8ec Mask tpt-closed handling of `chan.send(return_msg)`
A partial revert of commit c05d08e426 since it seem we already
suppress tpt-closed errors lower down in `.ipc.Channel.send()`; given
that i'm pretty sure this new handler code should basically never run?

Left in a todo to remove the masked content once i'm done more
thoroughly testing under `piker`.
2026-02-12 00:51:50 -05:00
Tyler Goodlet f78e842fba More `TransportClosed`-handling around IPC-IO
For IPC-disconnects-during-teardown edge cases, augment some `._rpc`
machinery,
- in `._invoke()` around the `await chan.send(return_msg)` where we
  suppress if the underlying `Channel` already disconnected.
- add a disjoint handler in `_errors_relayed_via_ipc()` which just
  reports-n-reraises the exc (same as prior behaviour).
  * originally i thought it needed to be handled specially (to avoid
    being crash handled) but turns out that isn't necessary?
  * hence the also-added-bu-masked-out `debug_filter` / guard expression
    around the `await debug._maybe_enter_pm()` line.
- show the `._invoke()` frame for the moment.
2026-02-12 00:51:50 -05:00
Bd 3638b80c9d
Merge pull request #412 from goodboy/root_actor_raddrs_fix
Non-registrar, root actor `_root_addrs` runtime-vars fix
2026-02-12 00:49:40 -05:00
Gud Boi 2ed9e65530 Clear rtvs state on root shutdown..
Fixes the bug discovered in last test update, not sure how this wasn't
caught already XD
2026-02-11 22:17:26 -05:00
Gud Boi 6cab363c51 Catch-n-fail on stale `_root_addrs` state..
Turns out we aren't clearing the `._state._runtime_vars` entries in
between `open_root_actor` calls.. This test refinement catches that by
adding runtime-vars asserts on the expected root-addrs value; ensure
`_runtime_vars['_root_addrs'] ONLY match the values provided by the
test's CURRENT root actor.

This causes a failure when the (just added)
`test_non_registrar_spawns_child` is run as part of the module suite,
it's fine when run standalone.
2026-02-11 22:17:26 -05:00
Gud Boi 8aee24e83f Fix when root-actor addrs is set as rtvs
Move `_root_addrs` assignment to after `async_main()` unblocks (via
`.started()`) which now delivers the bind addrs , ensuring correct
`UnwrappedAddress` propagation into `._state._runtime_vars` for
non-registar root actors..

Previously for non-registrar root actors the `._state._runtime_vars`
entries were being set as `Address` values which ofc IPC serialize
incorrectly rn vs. the unwrapped versions, (well until we add a msgspec
for their structs anyway) and thus are passed in incorrect form to
children/subactors during spawning..

This fixes the issue by waiting for the `.ipc.*` stack to
bind-and-resolve any randomly allocated addrs (by the OS) until after
the initial `Actor` startup is complete.

Deats,
- primarily, mv `_root_addrs` assignment from before `root_tn.start()`
  to after, using started(-ed) `accept_addrs` now delivered from
  `._runtime.async_main()`..
- update `task_status` type hints to match.
- unpack and set the `(accept_addrs, reg_addrs)` tuple from
  `root_tn.start()` call into `._state._runtime_vars` entries.
- improve and embolden comments distinguishing registrar vs non-registrar
  init paths, ensure typing reflects wrapped vs. unwrapped addrs.

Also,
- add a masked `mk_pdb().set_trace()` for debugging `raddrs` values
  being "off".
- add TODO about using UDS on linux for root mailbox
- rename `trans_bind_addrs` -> `tpt_bind_addrs` for clarity.
- expand comment about random port allocation for
  non-registrar case

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 22:17:26 -05:00
Gud Boi cdcc1b42fc Add test for non-registrar root sub-spawning
Ensure non-registrar root actors can spawn children and that
those children receive correct parent contact info. This test
catches the bug reported in,

https://github.com/goodboy/tractor/issues/410

Add new `test_non_registrar_spawns_child()` which spawns a sub-actor
from a non-registrar root and verifies the child can manually connect
back to its parent using `get_root()` API, auditing
`._state._runtime_vars` addr propagation from rent to child.

Also,
- improve type hints throughout test suites
  (`subprocess.Popen`, `UnwrappedAddress`, `Aid` etc.)
- rename `n` -> `an` for actor nursery vars
- use multiline style for function signatures

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 22:17:26 -05:00
Bd 51ac0c623e
Merge pull request #402 from goodboy/log_sys_testing
Log sys testing, starting to get "serious" about it..
2026-02-11 22:13:17 -05:00
Gud Boi 3f0bde1bf8 Use bare `get_logger()` in `.to_asyncio` 2026-02-11 22:02:41 -05:00
Gud Boi fa1a15dce8 Cleaups per copilot PR review 2026-02-11 21:51:40 -05:00
Gud Boi 5850844297 Mk `test_implicit_mod_name_applied_for_child()` check init-mods
Test pkg-level init module and sub-pkg module logger naming
to better validate auto-naming logic.

Deats,
- create `pkg_init_mod` and write `mod_code` to it for
  testing pkg-level `__init__.py` logger instance creation.
  * assert `snakelib.__init__` logger name is `proj_name`.
- write `mod_code` to `subpkg/__init__.py`` as well and check the same.

Also,
- rename some vars,
  * `pkg_mod` -> `pkg_submod`,
  * `pkgmod` -> `subpkgmod`
- add `ModuleType` import for type hints
- improve comments explaining pkg init vs first-level
  sub-module naming expectations.
- drop trailing whitespace and unused TODO comment
- remove masked `breakpoint()` call

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 21:43:37 -05:00
Gud Boi ff02939213 Toss in some `colorlog` alts to try 2026-02-11 21:05:16 -05:00
Gud Boi d61e8caab2 Improve `test_log_sys` for new auto-naming logic
Add assertions and comments to better test the reworked
implicit module-name detection in `get_logger()`.

Deats,
- add `assert not tractor.current_actor()` check to verify
  no runtime is active during test.
- import `.log` submod directly for use.
- add masked `breakpoint()` for debugging mod loading.
- add comment about using `ranger` to inspect `testdir` layout
  of auto-generated py pkg + module-files.
- improve comments explaining pkg-root-log creation.
- add TODO for testing `get_logger()` call from pkg
  `__init__.py`
- add comment about first-pkg-level module naming.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 21:05:07 -05:00
Gud Boi 0b0c83e9da Drop `name=__name__` from all `get_logger()` calls
Use new implicit module-name detection throughout codebase to simplify
logger creation and leverage auto-naming from caller mod .

Main changes,
- drop `name=__name__` arg from all `get_logger()` calls
  (across 29 modules).
- update `get_console_log()` calls to include `name='tractor'` for
  enabling root logger in test harness and entry points; this ensures
  logic in `get_logger()` triggers so that **all** `tractor`-internal
  logging emits to console.
- add info log msg in test `conftest.py` showing test-harness
  log level

Also,
- fix `.actor.uid` ref to `.actor.aid.uid` in `._trace`.
- adjust a `._context` log msg formatting for clarity.
- add TODO comments in `._addr`, `._uds` for when we mv to
  using `multiaddr`.
- add todo for `RuntimeVars` type hint TODO in `.msg.types` (once we
  eventually get that all going obvi!)

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 21:04:49 -05:00
Gud Boi 5e7c0f264d Rework `.get_logger()`, better autonaming, deduping
Overhaul of the automatic-calling-module-name detection and sub-log
creation logic to avoid (at least warn) on duplication(s) and still
handle the common usage of a call with `name=__name__` from a mod's top
level scope.

Main `get_logger()` changes,
- refactor auto-naming logic for implicit `name=None` case such that we
  handle at least `tractor` internal "bare" calls from internal submods.
- factor out the `get_caller_mod()` closure (still inside
  `get_logger()`)for introspecting caller's module with configurable
  frame depth.
- use `.removeprefix()` instead of `.lstrip()` for stripping pkg-name
  from mod paths
- mv root-logger creation before sub-logger name processing
- improve duplicate detection for `pkg_name` in `name`
- add `_strict_debug=True`-only-emitted warnings for duplicate
  pkg/leaf-mod names.
- use `print()` fallback for warnings when no actor runtime is up at
  call time.

Surrounding tweaks,
- add `.level` property to `StackLevelAdapter` for getting
  current emit level as lowercase `str`.
- mv `_proj_name` def to just above `get_logger()`
- use `_curr_actor_no_exc` partial in `_conc_name_getters`
  to avoid runtime errors
- improve comments/doc-strings throughout
- keep some masked `breakpoint()` calls for future debugging

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 21:04:29 -05:00
Gud Boi edf1189fe0 Multi-line styling in `test.devx.conftest` 2026-02-11 21:04:22 -05:00
Tyler Goodlet de24bfe052 Mv `load_module_from_path()` to a new `._code_load` submod 2026-02-11 21:03:29 -05:00
Tyler Goodlet e235b96894 Use new `pkg_name` in log-sys test suites 2026-02-11 21:03:07 -05:00
Tyler Goodlet dea4b9fd93 Implicitly name sub-logs by caller's mod
That is when no `name` is passed to `get_logger()`, try to introspect
the caller's `module.__name__` and use it to infer/get the "namespace
path" to that module the same as if using `name=__name__` as in the most
common usage.

Further, change the `_root_name` to be `pkg_name: str`, a public and
more obvious param name, and deprecate the former. This obviously adds
the necessary impl to make the new
`test_sys_log::test_implicit_mod_name_applied_for_child` test pass.

Impl detalles for `get_logger()`,
- add `pkg_name` and deprecate `_root_name`, include failover logic
  and a warning.
- implement calling module introspection using
  `inspect.stack()/getmodule()` to get both the `.__name__` and
  `.__package__` info alongside adjusted logic to set the `name`
  when not provided but only when a new `mk_sublog: bool` is set.
- tweak the `name` processing for implicitly set case,
  - rename `sub_name` -> `pkg_path: str` which is the path
    to the calling module minus that module's name component.
  - only partition `name` if `pkg_name` is `in` it.
  - use the `_root_log` for `pkg_name` duplication warnings.

Other/related,
- add types to various public mod vars missing them.
- rename `.log.log` -> `.log._root_log`.
2026-02-11 21:03:07 -05:00
Tyler Goodlet 557e2cec6a Add an implicit-pkg-path-as-logger-name test
A bit of test driven dev to anticipate support  of `.log.get_logger()`
usage such that it can be called from arbitrary sub-modules, themselves
embedded in arbitrary sub-pkgs, of some project; the when not provided,
the `sub_name` passed to the `Logger.getChild(<sub_name>)` will be set
as the sub-pkg path "down to" the calling module.

IOW if you call something like,

`log = tractor.log.get_logger(pkg_name='mypylib')`

from some `submod.py` in a project-dir that looks like,

mypylib/
  mod.py
  subpkg/
    submod.py  <- calling module

the `log: StackLevelAdapter` child-`Logger` instance will have a
`.name: str = 'mypylib.subpkg'`, discluding the `submod` part since this
already rendered as the `{filename}` header in `log.LOG_FORMAT`.

Previously similar behaviour would be obtained by passing
`get_logger(name=__name__)` in the calling module and so much so it
motivated me to make this the default, presuming we can introspect for
the info.

Impl deats,
- duplicated a `load_module_from_path()` from `modden` to load the
  `testdir` rendered py project dir from its path.
 |_should prolly factor it down to this lib anyway bc we're going to
   need it for hot code reload? (well that and `watchfiles` Bp)
- in each of `mod.py` and `submod.py` render the `get_logger()` code
  sin `name`, expecting the (coming shortly) implicit introspection
  feat to do this.
- do `.name` and `.parent` checks against expected sub-logger values
  from `StackLevelAdapter.logger.getChildren()`.
2026-02-11 21:03:07 -05:00
Tyler Goodlet 0e3229f16d Start a logging-sys unit-test module
To start ensuring that when `name=__name__` is passed we try to
de-duplicate the `_root_name` and any `leaf_mod: str` since it's already
included in the headers as `{filename}`.

Deats,
- heavily document the de-duplication `str.partition()`s in
  `.log.get_logger()` and provide the end fix by changing the predicate,
  `if rname == 'tractor':` -> `if rname == _root_name`.
  * also toss in some warnings for when we still detect duplicates.
- add todo comments around logging "filters" (vs. our "adapter").
- create the new `test_log_sys.test_root_pkg_not_duplicated()` which
  runs green with the fixes from ^.
- add a ton of test-suite todos both for existing and anticipated
  logging sys feats in the new mod.
2026-02-11 21:03:07 -05:00
Bd 448d25aef4
Merge pull request #409 from goodboy/nixos_flake
Nixos flake, for the *too-hip-for-arch-ers*
2026-02-11 21:02:37 -05:00
Gud Boi 343c9e0034 Tweaks per the `copilot` PR review 2026-02-11 20:55:08 -05:00
Gud Boi 1dc27c5161 Add a dev-overlay nix flake
Based on the impure template from `pyproject.nix` and providing
a dev-shell for easy bypass-n-hack on nix(os) using `uv`.

Deats,
- include bash completion pkgs for devx/happiness.
- pull `ruff` from <nixpkgs> to avoid wheel (build) issues.
- pin to py313 `cpython` for now.
2026-01-23 16:27:19 -05:00
Gud Boi 14aefa4b11 Reorg dev deps into nested groups
Namely,
- `devx` for console debugging extras used in `tractor.devx`.
- `repl` for @goodboy's `xonsh` hackin utils.
- `testing` for harness stuffs.
- `lint` for whenever we start doing that; it requires special
  separation on nixos in order to pull `ruff` from pkgs.

Oh and bump the lock file.
2026-01-23 16:24:24 -05:00
51 changed files with 2333 additions and 264 deletions

View File

@ -17,6 +17,7 @@ from tractor import (
MsgStream, MsgStream,
_testing, _testing,
trionics, trionics,
TransportClosed,
) )
import trio import trio
import pytest import pytest
@ -208,12 +209,16 @@ async def main(
# TODO: is this needed or no? # TODO: is this needed or no?
raise raise
except trio.ClosedResourceError: except (
trio.ClosedResourceError,
TransportClosed,
) as _tpt_err:
# NOTE: don't send if we already broke the # NOTE: don't send if we already broke the
# connection to avoid raising a closed-error # connection to avoid raising a closed-error
# such that we drop through to the ctl-c # such that we drop through to the ctl-c
# mashing by user. # mashing by user.
await trio.sleep(0.01) with trio.CancelScope(shield=True):
await trio.sleep(0.01)
# timeout: int = 1 # timeout: int = 1
# with trio.move_on_after(timeout) as cs: # with trio.move_on_after(timeout) as cs:
@ -247,6 +252,7 @@ async def main(
await stream.send(i) await stream.send(i)
pytest.fail('stream not closed?') pytest.fail('stream not closed?')
except ( except (
TransportClosed,
trio.ClosedResourceError, trio.ClosedResourceError,
trio.EndOfChannel, trio.EndOfChannel,
) as send_err: ) as send_err:

27
flake.lock 100644
View File

@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1769018530,
"narHash": "sha256-MJ27Cy2NtBEV5tsK+YraYr2g851f3Fl1LpNHDzDX15c=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "88d3861acdd3d2f0e361767018218e51810df8a1",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

70
flake.nix 100644
View File

@ -0,0 +1,70 @@
# An "impure" template thx to `pyproject.nix`,
# https://pyproject-nix.github.io/pyproject.nix/templates.html#impure
# https://github.com/pyproject-nix/pyproject.nix/blob/master/templates/impure/flake.nix
{
description = "An impure overlay (w dev-shell) using `uv`";
inputs = {
nixpkgs.url = "github:nixos/nixpkgs/nixos-unstable";
};
outputs =
{ nixpkgs, ... }:
let
inherit (nixpkgs) lib;
forAllSystems = lib.genAttrs lib.systems.flakeExposed;
in
{
devShells = forAllSystems (
system:
let
pkgs = nixpkgs.legacyPackages.${system};
# XXX NOTE XXX, for now we overlay specific pkgs via
# a major-version-pinned-`cpython`
cpython = "python313";
venv_dir = "py313";
pypkgs = pkgs."${cpython}Packages";
in
{
default = pkgs.mkShell {
packages = [
# XXX, ensure sh completions activate!
pkgs.bashInteractive
pkgs.bash-completion
# XXX, on nix(os), use pkgs version to avoid
# build/sys-sh-integration issues
pkgs.ruff
pkgs.uv
pkgs.${cpython}# ?TODO^ how to set from `cpython` above?
];
shellHook = ''
# unmask to debug **this** dev-shell-hook
# set -e
# link-in c++ stdlib for various AOT-ext-pkgs (numpy, etc.)
LD_LIBRARY_PATH="${pkgs.stdenv.cc.cc.lib}/lib:$LD_LIBRARY_PATH"
export LD_LIBRARY_PATH
# RUNTIME-SETTINGS
# ------ uv ------
# - always use the ./py313/ venv-subdir
# - sync env with all extras
export UV_PROJECT_ENVIRONMENT=${venv_dir}
uv sync --dev --all-extras
# ------ TIPS ------
# NOTE, to launch the py-venv installed `xonsh` (like @goodboy)
# run the `nix develop` cmd with,
# >> nix develop -c uv run xonsh
'';
};
}
);
};
}

View File

@ -53,22 +53,33 @@ dependencies = [
[dependency-groups] [dependency-groups]
dev = [ dev = [
# test suite {include-group = 'devx'},
# TODO: maybe some of these layout choices? {include-group = 'testing'},
# https://docs.pytest.org/en/8.0.x/explanation/goodpractices.html#choosing-a-test-layout-import-rules {include-group = 'repl'},
"pytest>=8.3.5", ]
"pexpect>=4.9.0,<5", devx = [
# `tractor.devx` tooling # `tractor.devx` tooling
"greenback>=1.2.1,<2", "greenback>=1.2.1,<2",
"stackscope>=0.2.2,<0.3", "stackscope>=0.2.2,<0.3",
# ^ requires this? # ^ requires this?
"typing-extensions>=4.14.1", "typing-extensions>=4.14.1",
]
testing = [
# test suite
# TODO: maybe some of these layout choices?
# https://docs.pytest.org/en/8.0.x/explanation/goodpractices.html#choosing-a-test-layout-import-rules
"pytest>=8.3.5",
"pexpect>=4.9.0,<5",
]
repl = [
"pyperclip>=1.9.0", "pyperclip>=1.9.0",
"prompt-toolkit>=3.0.50", "prompt-toolkit>=3.0.50",
"xonsh>=0.19.2", "xonsh>=0.19.2",
"psutil>=7.0.0", "psutil>=7.0.0",
] ]
lint = [
"ruff>=0.9.6"
]
# TODO, add these with sane versions; were originally in # TODO, add these with sane versions; were originally in
# `requirements-docs.txt`.. # `requirements-docs.txt`..
# docs = [ # docs = [

View File

@ -65,7 +65,11 @@ def loglevel(request):
import tractor import tractor
orig = tractor.log._default_loglevel orig = tractor.log._default_loglevel
level = tractor.log._default_loglevel = request.config.option.loglevel level = tractor.log._default_loglevel = request.config.option.loglevel
tractor.log.get_console_log(level) log = tractor.log.get_console_log(
level=level,
name='tractor', # <- enable root logger
)
log.info(f'Test-harness logging level: {level}\n')
yield level yield level
tractor.log._default_loglevel = orig tractor.log._default_loglevel = orig

View File

@ -4,6 +4,7 @@
''' '''
from __future__ import annotations from __future__ import annotations
import time import time
import signal
from typing import ( from typing import (
Callable, Callable,
TYPE_CHECKING, TYPE_CHECKING,
@ -34,7 +35,10 @@ if TYPE_CHECKING:
# a fn that sub-instantiates a `pexpect.spawn()` # a fn that sub-instantiates a `pexpect.spawn()`
# and returns it. # and returns it.
type PexpectSpawner = Callable[[str], pty_spawn.spawn] type PexpectSpawner = Callable[
[str],
pty_spawn.spawn,
]
@pytest.fixture @pytest.fixture
@ -66,12 +70,15 @@ def spawn(
import os import os
os.environ['PYTHON_COLORS'] = '0' os.environ['PYTHON_COLORS'] = '0'
spawned: PexpectSpawner|None = None
def _spawn( def _spawn(
cmd: str, cmd: str,
**mkcmd_kwargs, **mkcmd_kwargs,
) -> pty_spawn.spawn: ) -> pty_spawn.spawn:
nonlocal spawned
unset_colors() unset_colors()
return testdir.spawn( spawned = testdir.spawn(
cmd=mk_cmd( cmd=mk_cmd(
cmd, cmd,
**mkcmd_kwargs, **mkcmd_kwargs,
@ -81,9 +88,35 @@ def spawn(
# ^TODO? get `pytest` core to expose underlying # ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff? # `pexpect.spawn()` stuff?
) )
return spawned
# such that test-dep can pass input script name. # such that test-dep can pass input script name.
return _spawn # the `PexpectSpawner`, type alias. yield _spawn # the `PexpectSpawner`, type alias.
if (
spawned
and
(ptyproc := spawned.ptyproc)
):
start: float = time.time()
timeout: float = 5
while (
ptyproc.isalive()
and
(
(_time_took := (time.time() - start))
<
timeout
)
):
ptyproc.kill(signal.SIGINT)
time.sleep(0.01)
if ptyproc.isalive():
ptyproc.kill(signal.SIGKILL)
# TODO? ensure we've cleaned up any UDS-paths?
# breakpoint()
@pytest.fixture( @pytest.fixture(
@ -109,7 +142,11 @@ def ctlc(
'https://github.com/goodboy/tractor/issues/320' 'https://github.com/goodboy/tractor/issues/320'
) )
if mark.name == 'ctlcs_bish': if (
mark.name == 'ctlcs_bish'
and
use_ctlc
):
pytest.skip( pytest.skip(
f'Test {node} prolly uses something from the stdlib (namely `asyncio`..)\n' f'Test {node} prolly uses something from the stdlib (namely `asyncio`..)\n'
f'The test and/or underlying example script can *sometimes* run fine ' f'The test and/or underlying example script can *sometimes* run fine '

View File

@ -1138,7 +1138,10 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
['peer IPC channel closed abruptly?', ['peer IPC channel closed abruptly?',
'another task closed this fd', 'another task closed this fd',
'Debug lock request was CANCELLED?', 'Debug lock request was CANCELLED?',
"TransportClosed: 'MsgpackUDSStream' was already closed locally ?",] "'MsgpackUDSStream' was already closed locally?",
"TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?",
# ?TODO^? match depending on `tpt_proto(s)`?
]
# XXX races on whether these show/hit? # XXX races on whether these show/hit?
# 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!', # 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!',

View File

@ -98,7 +98,8 @@ def test_ipc_channel_break_during_stream(
expect_final_exc = TransportClosed expect_final_exc = TransportClosed
mod: ModuleType = import_path( mod: ModuleType = import_path(
examples_dir() / 'advanced_faults' examples_dir()
/ 'advanced_faults'
/ 'ipc_failure_during_stream.py', / 'ipc_failure_during_stream.py',
root=examples_dir(), root=examples_dir(),
consider_namespace_packages=False, consider_namespace_packages=False,
@ -113,8 +114,9 @@ def test_ipc_channel_break_during_stream(
if ( if (
# only expect EoC if trans is broken on the child side, # only expect EoC if trans is broken on the child side,
ipc_break['break_child_ipc_after'] is not False ipc_break['break_child_ipc_after'] is not False
and
# AND we tell the child to call `MsgStream.aclose()`. # AND we tell the child to call `MsgStream.aclose()`.
and pre_aclose_msgstream pre_aclose_msgstream
): ):
# expect_final_exc = trio.EndOfChannel # expect_final_exc = trio.EndOfChannel
# ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this # ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
@ -160,7 +162,8 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_child_ipc_after'] is not False ipc_break['break_child_ipc_after'] is not False
and ( and (
ipc_break['break_parent_ipc_after'] ipc_break['break_parent_ipc_after']
> ipc_break['break_child_ipc_after'] >
ipc_break['break_child_ipc_after']
) )
): ):
if pre_aclose_msgstream: if pre_aclose_msgstream:
@ -248,8 +251,15 @@ def test_ipc_channel_break_during_stream(
# get raw instance from pytest wrapper # get raw instance from pytest wrapper
value = excinfo.value value = excinfo.value
if isinstance(value, ExceptionGroup): if isinstance(value, ExceptionGroup):
excs = value.exceptions excs: tuple[Exception] = value.exceptions
assert len(excs) == 1 assert (
len(excs) <= 2
and
all(
isinstance(exc, TransportClosed)
for exc in excs
)
)
final_exc = excs[0] final_exc = excs[0]
assert isinstance(final_exc, expect_final_exc) assert isinstance(final_exc, expect_final_exc)

View File

@ -11,12 +11,13 @@ import trio
import tractor import tractor
from tractor import ( # typing from tractor import ( # typing
Actor, Actor,
current_actor,
open_nursery,
Portal,
Context, Context,
ContextCancelled, ContextCancelled,
MsgStream,
Portal,
RemoteActorError, RemoteActorError,
current_actor,
open_nursery,
) )
from tractor._testing import ( from tractor._testing import (
# tractor_test, # tractor_test,
@ -796,8 +797,8 @@ async def basic_echo_server(
) -> None: ) -> None:
''' '''
Just the simplest `MsgStream` echo server which resays what Just the simplest `MsgStream` echo server which resays what you
you told it but with its uid in front ;) told it but with its uid in front ;)
''' '''
actor: Actor = tractor.current_actor() actor: Actor = tractor.current_actor()
@ -966,9 +967,14 @@ async def tell_little_bro(
caller: str = '', caller: str = '',
err_after: float|None = None, err_after: float|None = None,
rng_seed: int = 50, rng_seed: int = 100,
# NOTE, ensure ^ is large enough (on fast hw anyway)
# to ensure the peer cancel req arrives before the
# echoing dialog does itself Bp
): ):
# contact target actor, do a stream dialog. # contact target actor, do a stream dialog.
lb: Portal
echo_ipc: MsgStream
async with ( async with (
tractor.wait_for_actor( tractor.wait_for_actor(
name=actor_name name=actor_name
@ -983,7 +989,6 @@ async def tell_little_bro(
else None else None
), ),
) as (sub_ctx, first), ) as (sub_ctx, first),
sub_ctx.open_stream() as echo_ipc, sub_ctx.open_stream() as echo_ipc,
): ):
actor: Actor = current_actor() actor: Actor = current_actor()
@ -994,6 +999,7 @@ async def tell_little_bro(
i, i,
) )
await echo_ipc.send(msg) await echo_ipc.send(msg)
await trio.sleep(0.001)
resp = await echo_ipc.receive() resp = await echo_ipc.receive()
print( print(
f'{caller} => {actor_name}: {msg}\n' f'{caller} => {actor_name}: {msg}\n'
@ -1006,6 +1012,9 @@ async def tell_little_bro(
assert sub_uid != uid assert sub_uid != uid
assert _i == i assert _i == i
# XXX, usually should never get here!
# await tractor.pause()
@pytest.mark.parametrize( @pytest.mark.parametrize(
'raise_client_error', 'raise_client_error',
@ -1020,6 +1029,9 @@ def test_peer_spawns_and_cancels_service_subactor(
raise_client_error: str, raise_client_error: str,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
raise_sub_spawn_error_after: float|None, raise_sub_spawn_error_after: float|None,
loglevel: str,
# ^XXX, set to 'warning' to see masked-exc warnings
# that may transpire during actor-nursery teardown.
): ):
# NOTE: this tests for the modden `mod wks open piker` bug # NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx # discovered as part of implementing workspace ctx
@ -1049,6 +1061,7 @@ def test_peer_spawns_and_cancels_service_subactor(
# NOTE: to halt the peer tasks on ctxc, uncomment this. # NOTE: to halt the peer tasks on ctxc, uncomment this.
debug_mode=debug_mode, debug_mode=debug_mode,
registry_addrs=[reg_addr], registry_addrs=[reg_addr],
loglevel=loglevel,
) as an: ) as an:
server: Portal = await an.start_actor( server: Portal = await an.start_actor(
(server_name := 'spawn_server'), (server_name := 'spawn_server'),

View File

@ -0,0 +1,185 @@
'''
`tractor.log`-wrapping unit tests.
'''
from pathlib import Path
import shutil
from types import ModuleType
import pytest
import tractor
from tractor import (
_code_load,
log,
)
def test_root_pkg_not_duplicated_in_logger_name():
'''
When both `pkg_name` and `name` are passed and they have
a common `<root_name>.< >` prefix, ensure that it is not
duplicated in the child's `StackLevelAdapter.name: str`.
'''
project_name: str = 'pylib'
pkg_path: str = 'pylib.subpkg.mod'
assert not tractor.current_actor(
err_on_no_runtime=False,
)
proj_log = log.get_logger(
pkg_name=project_name,
mk_sublog=False,
)
sublog = log.get_logger(
pkg_name=project_name,
name=pkg_path,
)
assert proj_log is not sublog
assert sublog.name.count(proj_log.name) == 1
assert 'mod' not in sublog.name
def test_implicit_mod_name_applied_for_child(
testdir: pytest.Pytester,
loglevel: str,
):
'''
Verify that when `.log.get_logger(pkg_name='pylib')` is called
from a given sub-mod from within the `pylib` pkg-path, we
implicitly set the equiv of `name=__name__` from the caller's
module.
'''
# tractor.log.get_console_log(level=loglevel)
proj_name: str = 'snakelib'
mod_code: str = (
f'import tractor\n'
f'\n'
# if you need to trace `testdir` stuff @ import-time..
# f'breakpoint()\n'
f'log = tractor.log.get_logger(pkg_name="{proj_name}")\n'
)
# create a sub-module for each pkg layer
_lib = testdir.mkpydir(proj_name)
pkg: Path = Path(_lib)
pkg_init_mod: Path = pkg / "__init__.py"
pkg_init_mod.write_text(mod_code)
subpkg: Path = pkg / 'subpkg'
subpkg.mkdir()
subpkgmod: Path = subpkg / "__init__.py"
subpkgmod.touch()
subpkgmod.write_text(mod_code)
_submod: Path = testdir.makepyfile(
_mod=mod_code,
)
pkg_submod = pkg / 'mod.py'
pkg_subpkg_submod = subpkg / 'submod.py'
shutil.copyfile(
_submod,
pkg_submod,
)
shutil.copyfile(
_submod,
pkg_subpkg_submod,
)
testdir.chdir()
# NOTE, to introspect the py-file-module-layout use (in .xsh
# syntax): `ranger @str(testdir)`
# XXX NOTE, once the "top level" pkg mod has been
# imported, we can then use `import` syntax to
# import it's sub-pkgs and modules.
subpkgmod: ModuleType = _code_load.load_module_from_path(
Path(pkg / '__init__.py'),
module_name=proj_name,
)
pkg_root_log = log.get_logger(
pkg_name=proj_name,
mk_sublog=False,
)
# the top level pkg-mod, created just now,
# by above API call.
assert pkg_root_log.name == proj_name
assert not pkg_root_log.logger.getChildren()
#
# ^TODO! test this same output but created via a `get_logger()`
# call in the `snakelib.__init__py`!!
# NOTE, the pkg-level "init mod" should of course
# have the same name as the package ns-path.
import snakelib as init_mod
assert init_mod.log.name == proj_name
# NOTE, a first-pkg-level sub-module should only
# use the package-name since the leaf-node-module
# will be included in log headers by default.
from snakelib import mod
assert mod.log.name == proj_name
from snakelib import subpkg
assert (
subpkg.log.name
==
subpkg.__package__
==
f'{proj_name}.subpkg'
)
from snakelib.subpkg import submod
assert (
submod.log.name
==
submod.__package__
==
f'{proj_name}.subpkg'
)
sub_logs = pkg_root_log.logger.getChildren()
assert len(sub_logs) == 1 # only one nested sub-pkg module
assert submod.log.logger in sub_logs
# TODO, moar tests against existing feats:
# ------ - ------
# - [ ] color settings?
# - [ ] header contents like,
# - actor + thread + task names from various conc-primitives,
# - [ ] `StackLevelAdapter` extensions,
# - our custom levels/methods: `transport|runtime|cance|pdb|devx`
# - [ ] custom-headers support?
#
# TODO, test driven dev of new-ideas/long-wanted feats,
# ------ - ------
# - [ ] https://github.com/goodboy/tractor/issues/244
# - [ ] @catern mentioned using a sync / deterministic sys
# and in particular `svlogd`?
# |_ https://smarden.org/runit/svlogd.8
# - [ ] using adapter vs. filters?
# - https://stackoverflow.com/questions/60691759/add-information-to-every-log-message-in-python-logging/61830838#61830838
# - [ ] `.at_least_level()` optimization which short circuits wtv
# `logging` is doing behind the scenes when the level filters
# the emission..?
# - [ ] use of `.log.get_console_log()` in subactors and the
# subtleties of ensuring it actually emits from a subproc.
# - [ ] this idea of activating per-subsys emissions with some
# kind of `.name` filter passed to the runtime or maybe configured
# via the root `StackLevelAdapter`?
# - [ ] use of `logging.dict.dictConfig()` to simplify the impl
# of any of ^^ ??
# - https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig
# - https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema
# - https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig

View File

@ -1,8 +1,13 @@
""" """
Multiple python programs invoking the runtime. Multiple python programs invoking the runtime.
""" """
from __future__ import annotations
import platform import platform
import subprocess
import time import time
from typing import (
TYPE_CHECKING,
)
import pytest import pytest
import trio import trio
@ -10,14 +15,29 @@ import tractor
from tractor._testing import ( from tractor._testing import (
tractor_test, tractor_test,
) )
from tractor import (
current_actor,
_state,
Actor,
Context,
Portal,
)
from .conftest import ( from .conftest import (
sig_prog, sig_prog,
_INT_SIGNAL, _INT_SIGNAL,
_INT_RETURN_CODE, _INT_RETURN_CODE,
) )
if TYPE_CHECKING:
from tractor.msg import Aid
from tractor._addr import (
UnwrappedAddress,
)
def test_abort_on_sigint(daemon):
def test_abort_on_sigint(
daemon: subprocess.Popen,
):
assert daemon.returncode is None assert daemon.returncode is None
time.sleep(0.1) time.sleep(0.1)
sig_prog(daemon, _INT_SIGNAL) sig_prog(daemon, _INT_SIGNAL)
@ -30,8 +50,11 @@ def test_abort_on_sigint(daemon):
@tractor_test @tractor_test
async def test_cancel_remote_arbiter(daemon, reg_addr): async def test_cancel_remote_arbiter(
assert not tractor.current_actor().is_arbiter daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
):
assert not current_actor().is_arbiter
async with tractor.get_registry(reg_addr) as portal: async with tractor.get_registry(reg_addr) as portal:
await portal.cancel_actor() await portal.cancel_actor()
@ -45,24 +68,106 @@ async def test_cancel_remote_arbiter(daemon, reg_addr):
pass pass
def test_register_duplicate_name(daemon, reg_addr): def test_register_duplicate_name(
daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
):
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr], registry_addrs=[reg_addr],
) as n: ) as an:
assert not tractor.current_actor().is_arbiter assert not current_actor().is_arbiter
p1 = await n.start_actor('doggy') p1 = await an.start_actor('doggy')
p2 = await n.start_actor('doggy') p2 = await an.start_actor('doggy')
async with tractor.wait_for_actor('doggy') as portal: async with tractor.wait_for_actor('doggy') as portal:
assert portal.channel.uid in (p2.channel.uid, p1.channel.uid) assert portal.channel.uid in (p2.channel.uid, p1.channel.uid)
await n.cancel() await an.cancel()
# run it manually since we want to start **after** # XXX, run manually since we want to start this root **after**
# the other "daemon" program # the other "daemon" program with it's own root.
trio.run(main)
@tractor.context
async def get_root_portal(
ctx: Context,
):
'''
Connect back to the root actor manually (using `._discovery` API)
and ensure it's contact info is the same as our immediate parent.
'''
sub: Actor = current_actor()
rtvs: dict = _state._runtime_vars
raddrs: list[UnwrappedAddress] = rtvs['_root_addrs']
# await tractor.pause()
# XXX, in case the sub->root discovery breaks you might need
# this (i know i did Xp)!!
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
assert (
len(raddrs) == 1
and
list(sub._parent_chan.raddr.unwrap()) in raddrs
)
# connect back to our immediate parent which should also
# be the actor-tree's root.
from tractor._discovery import get_root
ptl: Portal
async with get_root() as ptl:
root_aid: Aid = ptl.chan.aid
parent_ptl: Portal = current_actor().get_parent()
assert (
root_aid.name == 'root'
and
parent_ptl.chan.aid == root_aid
)
await ctx.started()
def test_non_registrar_spawns_child(
daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
loglevel: str,
debug_mode: bool,
):
'''
Ensure a non-regristar (serving) root actor can spawn a sub and
that sub can connect back (manually) to it's rent that is the
root without issue.
More or less this audits the global contact info in
`._state._runtime_vars`.
'''
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
loglevel=loglevel,
debug_mode=debug_mode,
) as an:
actor: Actor = tractor.current_actor()
assert not actor.is_registrar
sub_ptl: Portal = await an.start_actor(
name='sub',
enable_modules=[__name__],
)
async with sub_ptl.open_context(
get_root_portal,
) as (ctx, _):
print('Waiting for `sub` to connect back to us..')
await an.cancel()
# XXX, run manually since we want to start this root **after**
# the other "daemon" program with it's own root.
trio.run(main) trio.run(main)

View File

@ -17,9 +17,8 @@ from tractor.log import (
get_console_log, get_console_log,
get_logger, get_logger,
) )
log = get_logger(__name__)
log = get_logger()
_resource: int = 0 _resource: int = 0

View File

@ -37,7 +37,7 @@ from .ipc._uds import UDSAddress
if TYPE_CHECKING: if TYPE_CHECKING:
from ._runtime import Actor from ._runtime import Actor
log = get_logger(__name__) log = get_logger()
# TODO, maybe breakout the netns key to a struct? # TODO, maybe breakout the netns key to a struct?
@ -259,6 +259,8 @@ def wrap_address(
case _: case _:
# import pdbp; pdbp.set_trace() # import pdbp; pdbp.set_trace()
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
raise TypeError( raise TypeError(
f'Can not wrap unwrapped-address ??\n' f'Can not wrap unwrapped-address ??\n'
f'type(addr): {type(addr)!r}\n' f'type(addr): {type(addr)!r}\n'

View File

@ -0,0 +1,48 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
(Hot) coad (re-)load utils for python.
'''
import importlib
from pathlib import Path
import sys
from types import ModuleType
# ?TODO, move this into internal libs?
# -[ ] we already use it in `modden.config._pymod` as well
def load_module_from_path(
path: Path,
module_name: str|None = None,
) -> ModuleType:
'''
Taken from SO,
https://stackoverflow.com/a/67208147
which is based on stdlib docs,
https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly
'''
module_name = module_name or path.stem
spec = importlib.util.spec_from_file_location(
module_name,
str(path),
)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
return module

View File

@ -70,6 +70,7 @@ from ._exceptions import (
MsgTypeError, MsgTypeError,
RemoteActorError, RemoteActorError,
StreamOverrun, StreamOverrun,
TransportClosed,
pack_from_raise, pack_from_raise,
unpack_error, unpack_error,
) )
@ -113,7 +114,7 @@ if TYPE_CHECKING:
CallerInfo, CallerInfo,
) )
log = get_logger(__name__) log = get_logger()
class Unresolved: class Unresolved:
@ -2391,16 +2392,18 @@ async def open_context_from_portal(
case trio.Cancelled(): case trio.Cancelled():
logmeth = log.cancel logmeth = log.cancel
cause: str = 'cancelled' cause: str = 'cancelled'
msg: str = (
f'ctx {ctx.side!r}-side {cause!r} with,\n'
f'{ctx.repr_outcome()!r}\n'
)
# XXX explicitly report on any non-graceful-taskc cases # XXX explicitly report on any non-graceful-taskc cases
case _: case _:
cause: str = 'errored' cause: str = 'errored'
logmeth = log.exception logmeth = log.exception
msg: str = f'ctx {ctx.side!r}-side {cause!r} with,\n'
logmeth( logmeth(msg)
f'ctx {ctx.side!r}-side {cause!r} with,\n'
f'{ctx.repr_outcome()!r}\n'
)
if debug_mode(): if debug_mode():
# async with debug.acquire_debug_lock(portal.actor.uid): # async with debug.acquire_debug_lock(portal.actor.uid):
@ -2426,10 +2429,7 @@ async def open_context_from_portal(
try: try:
# await pause(shield=True) # await pause(shield=True)
await ctx.cancel() await ctx.cancel()
except ( except TransportClosed:
trio.BrokenResourceError,
trio.ClosedResourceError,
):
log.warning( log.warning(
'IPC connection for context is broken?\n' 'IPC connection for context is broken?\n'
f'task: {ctx.cid}\n' f'task: {ctx.cid}\n'

View File

@ -53,7 +53,7 @@ if TYPE_CHECKING:
from ._runtime import Actor from ._runtime import Actor
log = get_logger(__name__) log = get_logger()
@acm @acm
@ -91,10 +91,13 @@ async def get_registry(
@acm @acm
async def get_root( async def get_root(**kwargs) -> AsyncGenerator[Portal, None]:
**kwargs, '''
) -> AsyncGenerator[Portal, None]: Deliver the current actor's "root process" actor (yes in actor
and proc tree terms) by delivering a `Portal` from the spawn-time
provided contact address.
'''
# TODO: rename mailbox to `_root_maddr` when we finally # TODO: rename mailbox to `_root_maddr` when we finally
# add and impl libp2p multi-addrs? # add and impl libp2p multi-addrs?
addr = _runtime_vars['_root_mailbox'] addr = _runtime_vars['_root_mailbox']
@ -193,6 +196,11 @@ async def maybe_open_portal(
addr: UnwrappedAddress, addr: UnwrappedAddress,
name: str, name: str,
): ):
'''
Open a `Portal` to the actor serving @ `addr` or `None` if no
peer can be contacted or found.
'''
async with query_actor( async with query_actor(
name=name, name=name,
regaddr=addr, regaddr=addr,

View File

@ -50,7 +50,7 @@ if TYPE_CHECKING:
from ._spawn import SpawnMethodKey from ._spawn import SpawnMethodKey
log = get_logger(__name__) log = get_logger()
def _mp_main( def _mp_main(
@ -72,11 +72,15 @@ def _mp_main(
spawn_ctx: mp.context.BaseContext = try_set_start_method(start_method) spawn_ctx: mp.context.BaseContext = try_set_start_method(start_method)
assert spawn_ctx assert spawn_ctx
# XXX, enable root log at level
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
f'Setting loglevel for {actor.uid} to {actor.loglevel}' f'Setting loglevel for {actor.uid} to {actor.loglevel!r}'
)
get_console_log(
level=actor.loglevel,
name='tractor',
) )
get_console_log(actor.loglevel)
# TODO: use scops headers like for `trio` below! # TODO: use scops headers like for `trio` below!
# (well after we libify it maybe..) # (well after we libify it maybe..)
@ -126,8 +130,12 @@ def _trio_main(
parent_addr=parent_addr parent_addr=parent_addr
) )
# XXX, enable root log at level
if actor.loglevel is not None: if actor.loglevel is not None:
get_console_log(actor.loglevel) get_console_log(
level=actor.loglevel,
name='tractor',
)
log.info( log.info(
f'Starting `trio` subactor from parent @ ' f'Starting `trio` subactor from parent @ '
f'{parent_addr}\n' f'{parent_addr}\n'

View File

@ -69,7 +69,7 @@ from ._streaming import (
if TYPE_CHECKING: if TYPE_CHECKING:
from ._runtime import Actor from ._runtime import Actor
log = get_logger(__name__) log = get_logger()
class Portal: class Portal:
@ -329,18 +329,7 @@ class Portal:
# if we get here some weird cancellation case happened # if we get here some weird cancellation case happened
return False return False
except ( except TransportClosed as tpt_err:
# XXX, should never really get raised unless we aren't
# wrapping them in the below type by mistake?
#
# Leaving the catch here for now until we're very sure
# all the cases (for various tpt protos) have indeed been
# re-wrapped ;p
trio.ClosedResourceError,
trio.BrokenResourceError,
TransportClosed,
) as tpt_err:
ipc_borked_report: str = ( ipc_borked_report: str = (
f'IPC for actor already closed/broken?\n\n' f'IPC for actor already closed/broken?\n\n'
f'\n' f'\n'

View File

@ -88,7 +88,8 @@ async def maybe_block_bp(
bp_blocked: bool bp_blocked: bool
if ( if (
debug_mode debug_mode
and maybe_enable_greenback and
maybe_enable_greenback
and ( and (
maybe_mod := await debug.maybe_init_greenback( maybe_mod := await debug.maybe_init_greenback(
raise_not_found=False, raise_not_found=False,
@ -289,10 +290,12 @@ async def open_root_actor(
for uw_addr in uw_reg_addrs for uw_addr in uw_reg_addrs
] ]
loglevel = ( loglevel: str = (
loglevel loglevel
or log._default_loglevel or
).upper() log._default_loglevel
)
loglevel: str = loglevel.upper()
if ( if (
debug_mode debug_mode
@ -323,7 +326,10 @@ async def open_root_actor(
) )
assert loglevel assert loglevel
_log = log.get_console_log(loglevel) _log = log.get_console_log(
level=loglevel,
name='tractor',
)
assert _log assert _log
# TODO: factor this into `.devx._stackscope`!! # TODO: factor this into `.devx._stackscope`!!
@ -380,10 +386,13 @@ async def open_root_actor(
addr, addr,
) )
trans_bind_addrs: list[UnwrappedAddress] = [] tpt_bind_addrs: list[
Address # `Address.get_random()` case
|UnwrappedAddress # registrar case `= uw_reg_addrs`
] = []
# Create a new local root-actor instance which IS NOT THE # ------ NON-REGISTRAR ------
# REGISTRAR # create a new root-actor instance.
if ponged_addrs: if ponged_addrs:
if ensure_registry: if ensure_registry:
raise RuntimeError( raise RuntimeError(
@ -410,12 +419,21 @@ async def open_root_actor(
# XXX INSTEAD, bind random addrs using the same tpt # XXX INSTEAD, bind random addrs using the same tpt
# proto. # proto.
for addr in ponged_addrs: for addr in ponged_addrs:
trans_bind_addrs.append( tpt_bind_addrs.append(
# XXX, these are `Address` NOT `UnwrappedAddress`.
#
# NOTE, in the case of posix/berkley socket
# protos we allocate port=0 such that the system
# allocates a random value at bind time; this
# happens in the `.ipc.*` stack's backend.
addr.get_random( addr.get_random(
bindspace=addr.bindspace, bindspace=addr.bindspace,
) )
) )
# ------ REGISTRAR ------
# create a new "registry providing" root-actor instance.
#
# Start this local actor as the "registrar", aka a regular # Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of # actor who manages the local registry of "mailboxes" of
# other process-tree-local sub-actors. # other process-tree-local sub-actors.
@ -424,7 +442,7 @@ async def open_root_actor(
# following init steps are taken: # following init steps are taken:
# - the tranport layer server is bound to each addr # - the tranport layer server is bound to each addr
# pair defined in provided registry_addrs, or the default. # pair defined in provided registry_addrs, or the default.
trans_bind_addrs = uw_reg_addrs tpt_bind_addrs = uw_reg_addrs
# - it is normally desirable for any registrar to stay up # - it is normally desirable for any registrar to stay up
# indefinitely until either all registered (child/sub) # indefinitely until either all registered (child/sub)
@ -444,20 +462,10 @@ async def open_root_actor(
enable_modules=enable_modules, enable_modules=enable_modules,
) )
# XXX, in case the root actor runtime was actually run from # XXX, in case the root actor runtime was actually run from
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt # `tractor.to_asyncio.run_as_asyncio_guest()` and NOT
# `.trio.run()`. # `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio'] actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# NOTE, only set the loopback addr for the
# process-tree-global "root" mailbox since all sub-actors
# should be able to speak to their root actor over that
# channel.
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
raddrs.extend(trans_bind_addrs)
# TODO, remove once we have also removed all usage;
# eventually all (root-)registry apis should expect > 1 addr.
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# Start up main task set via core actor-runtime nurseries. # Start up main task set via core actor-runtime nurseries.
try: try:
# assign process-local actor # assign process-local actor
@ -494,14 +502,39 @@ async def open_root_actor(
# "actor runtime" primitives are SC-compat and thus all # "actor runtime" primitives are SC-compat and thus all
# transitively spawned actors/processes must be as # transitively spawned actors/processes must be as
# well. # well.
await root_tn.start( accept_addrs: list[UnwrappedAddress]
reg_addrs: list[UnwrappedAddress]
(
accept_addrs,
reg_addrs,
) = await root_tn.start(
partial( partial(
_runtime.async_main, _runtime.async_main,
actor, actor,
accept_addrs=trans_bind_addrs, accept_addrs=tpt_bind_addrs,
parent_addr=None parent_addr=None
) )
) )
# NOTE, only set a local-host addr (i.e. like
# `lo`-loopback for TCP) for the process-tree-global
# "root"-process (its tree-wide "mailbox") since all
# sub-actors should be able to speak to their root
# actor over that channel.
#
# ?TODO, per-OS non-network-proto alt options?
# -[ ] on linux we should be able to always use UDS?
#
raddrs: list[UnwrappedAddress] = _state._runtime_vars['_root_addrs']
raddrs.extend(
accept_addrs,
)
# TODO, remove once we have also removed all usage;
# eventually all (root-)registry apis should expect > 1 addr.
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# if 'chart' in actor.aid.name:
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
try: try:
yield actor yield actor
except ( except (
@ -583,6 +616,13 @@ async def open_root_actor(
): ):
_state._runtime_vars['_debug_mode'] = False _state._runtime_vars['_debug_mode'] = False
# !XXX, clear ALL prior contact info state, this is MEGA
# important if you are opening the runtime multiple times
# from the same parent process (like in our test
# harness)!
_state._runtime_vars['_root_addrs'].clear()
_state._runtime_vars['_root_mailbox'] = None
_state._current_actor = None _state._current_actor = None
_state._last_actor_terminated = actor _state._last_actor_terminated = actor

View File

@ -284,6 +284,15 @@ async def _errors_relayed_via_ipc(
try: try:
yield # run RPC invoke body yield # run RPC invoke body
# NOTE, never REPL any pseudo-expected tpt-disconnect.
except TransportClosed as err:
rpc_err = err
log.warning(
f'Tpt disconnect during remote-exc relay due to,\n'
f'{err!r}\n'
)
raise err
# box and ship RPC errors for wire-transit via # box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel. # the task's requesting parent IPC-channel.
except ( except (
@ -327,10 +336,15 @@ async def _errors_relayed_via_ipc(
# recovery logic - the only case is some kind of # recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going # strange bug in our transport layer itself? Going
# to keep this open ended for now. # to keep this open ended for now.
log.debug(
'RPC task crashed, attempting to enter debugger\n' if _state.debug_mode():
f'|_{ctx}' log.exception(
) f'RPC task crashed!\n'
f'Attempting to enter debugger\n'
f'\n'
f'{ctx}'
)
entered_debug = await debug._maybe_enter_pm( entered_debug = await debug._maybe_enter_pm(
err, err,
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
@ -419,7 +433,7 @@ async def _errors_relayed_via_ipc(
# cancel scope will not have been inserted yet # cancel scope will not have been inserted yet
if is_rpc: if is_rpc:
log.warning( log.warning(
'RPC task likely errored or cancelled before start?\n' 'RPC task likely crashed or cancelled before start?\n'
f'|_{ctx._task}\n' f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n' f' >> {ctx.repr_rpc}\n'
) )
@ -862,9 +876,9 @@ async def _invoke(
) )
logmeth( logmeth(
f'{message}\n' f'{message}'
f'\n' f'\n'
f'{descr_str}\n' f'{descr_str}'
) )
@ -900,6 +914,11 @@ async def try_ship_error_to_remote(
# XXX NOTE XXX in SC terms this is one of the worst things # XXX NOTE XXX in SC terms this is one of the worst things
# that can happen and provides for a 2-general's dilemma.. # that can happen and provides for a 2-general's dilemma..
#
# FURHTER, we should never really have to handle these
# lowlevel excs from `trio` since the `Channel.send()` layers
# downward should be mostly wrapping such cases in a
# tpt-closed; the `.critical()` usage is warranted.
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,

View File

@ -147,6 +147,8 @@ def get_mod_nsps2fps(mod_ns_paths: list[str]) -> dict[str, str]:
return nsp2fp return nsp2fp
_bp = False
class Actor: class Actor:
''' '''
The fundamental "runtime" concurrency primitive. The fundamental "runtime" concurrency primitive.
@ -181,6 +183,14 @@ class Actor:
def is_registrar(self) -> bool: def is_registrar(self) -> bool:
return self.is_arbiter return self.is_arbiter
@property
def is_root(self) -> bool:
'''
This actor is the parent most in the tree?
'''
return _state.is_root_process()
msg_buffer_size: int = 2**6 msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()`, # nursery placeholders filled in by `async_main()`,
@ -272,7 +282,9 @@ class Actor:
stacklevel=2, stacklevel=2,
) )
registry_addrs: list[Address] = [wrap_address(arbiter_addr)] registry_addrs: list[Address] = [
wrap_address(arbiter_addr)
]
# marked by the process spawning backend at startup # marked by the process spawning backend at startup
# will be None for the parent most process started manually # will be None for the parent most process started manually
@ -959,6 +971,21 @@ class Actor:
rvs['_is_root'] = False # obvi XD rvs['_is_root'] = False # obvi XD
# TODO, remove! left in just while protoing init fix!
# global _bp
# if (
# 'chart' in self.aid.name
# and
# isinstance(
# rvs['_root_addrs'][0],
# dict,
# )
# and
# not _bp
# ):
# _bp = True
# breakpoint()
_state._runtime_vars.update(rvs) _state._runtime_vars.update(rvs)
# `SpawnSpec.reg_addrs` # `SpawnSpec.reg_addrs`
@ -1455,7 +1482,12 @@ async def async_main(
# be False when running as root actor and True when as # be False when running as root actor and True when as
# a subactor. # a subactor.
parent_addr: UnwrappedAddress|None = None, parent_addr: UnwrappedAddress|None = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[
tuple[
list[UnwrappedAddress], # accept_addrs
list[UnwrappedAddress], # reg_addrs
]
] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' '''
@ -1634,6 +1666,7 @@ async def async_main(
# if addresses point to the same actor.. # if addresses point to the same actor..
# So we need a way to detect that? maybe iterate # So we need a way to detect that? maybe iterate
# only on unique actor uids? # only on unique actor uids?
addr: UnwrappedAddress
for addr in actor.reg_addrs: for addr in actor.reg_addrs:
try: try:
waddr = wrap_address(addr) waddr = wrap_address(addr)
@ -1642,7 +1675,9 @@ async def async_main(
await debug.pause() await debug.pause()
# !TODO, get rid of the local-portal crap XD # !TODO, get rid of the local-portal crap XD
reg_portal: Portal
async with get_registry(addr) as reg_portal: async with get_registry(addr) as reg_portal:
accept_addr: UnwrappedAddress
for accept_addr in accept_addrs: for accept_addr in accept_addrs:
accept_addr = wrap_address(accept_addr) accept_addr = wrap_address(accept_addr)
@ -1658,8 +1693,12 @@ async def async_main(
is_registered: bool = True is_registered: bool = True
# init steps complete # init steps complete, deliver IPC-server and
task_status.started() # registrar addrs back to caller.
task_status.started((
accept_addrs,
actor.reg_addrs,
))
# Begin handling our new connection back to our # Begin handling our new connection back to our
# parent. This is done last since we don't want to # parent. This is done last since we don't want to

View File

@ -38,6 +38,7 @@ import trio
from ._exceptions import ( from ._exceptions import (
ContextCancelled, ContextCancelled,
RemoteActorError, RemoteActorError,
TransportClosed,
) )
from .log import get_logger from .log import get_logger
from .trionics import ( from .trionics import (
@ -59,7 +60,7 @@ if TYPE_CHECKING:
from .ipc import Channel from .ipc import Channel
log = get_logger(__name__) log = get_logger()
# TODO: the list # TODO: the list
@ -409,10 +410,8 @@ class MsgStream(trio.abc.Channel):
# it). # it).
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await self._ctx.send_stop() await self._ctx.send_stop()
except ( except (
trio.BrokenResourceError, TransportClosed,
trio.ClosedResourceError
) as re: ) as re:
# the underlying channel may already have been pulled # the underlying channel may already have been pulled
# in which case our stop message is meaningless since # in which case our stop message is meaningless since
@ -593,9 +592,8 @@ class MsgStream(trio.abc.Channel):
), ),
) )
except ( except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError, BrokenPipeError,
TransportClosed,
) as _trans_err: ) as _trans_err:
trans_err = _trans_err trans_err = _trans_err
if ( if (

View File

@ -62,7 +62,7 @@ if TYPE_CHECKING:
from .ipc import IPCServer from .ipc import IPCServer
log = get_logger(__name__) log = get_logger()
class ActorNursery: class ActorNursery:

View File

@ -49,7 +49,7 @@ from tractor.msg import (
import wrapt import wrapt
log = get_logger(__name__) log = get_logger()
# TODO: yeah, i don't love this and we should prolly just # TODO: yeah, i don't love this and we should prolly just
# write a decorator that actually keeps a stupid ref to the func # write a decorator that actually keeps a stupid ref to the func

View File

@ -51,7 +51,7 @@ from tractor import (
) )
from tractor.devx import debug from tractor.devx import debug
log = logmod.get_logger(__name__) log = logmod.get_logger()
if TYPE_CHECKING: if TYPE_CHECKING:

View File

@ -59,7 +59,7 @@ from ._sigint import (
_ctlc_ignore_header as _ctlc_ignore_header _ctlc_ignore_header as _ctlc_ignore_header
) )
log = get_logger(__name__) log = get_logger()
# ---------------- # ----------------
# XXX PKG TODO XXX # XXX PKG TODO XXX

View File

@ -84,7 +84,7 @@ _crash_msg: str = (
'Opening a pdb REPL in crashed actor' 'Opening a pdb REPL in crashed actor'
) )
log = get_logger(__package__) log = get_logger()
class BoxedMaybeException(Struct): class BoxedMaybeException(Struct):

View File

@ -47,7 +47,7 @@ if TYPE_CHECKING:
Actor, Actor,
) )
log = get_logger(__name__) log = get_logger()
_ctlc_ignore_header: str = ( _ctlc_ignore_header: str = (
'Ignoring SIGINT while debug REPL in use' 'Ignoring SIGINT while debug REPL in use'

View File

@ -58,7 +58,7 @@ from ._sigint import (
_ctlc_ignore_header as _ctlc_ignore_header _ctlc_ignore_header as _ctlc_ignore_header
) )
log = get_logger(__package__) log = get_logger()
async def maybe_wait_for_debugger( async def maybe_wait_for_debugger(

View File

@ -93,7 +93,7 @@ if TYPE_CHECKING:
# from ._post_mortem import BoxedMaybeException # from ._post_mortem import BoxedMaybeException
from ._repl import PdbREPL from ._repl import PdbREPL
log = get_logger(__package__) log = get_logger()
_pause_msg: str = 'Opening a pdb REPL in paused actor' _pause_msg: str = 'Opening a pdb REPL in paused actor'
_repl_fail_msg: str|None = ( _repl_fail_msg: str|None = (
@ -628,7 +628,7 @@ def _set_trace(
log.pdb( log.pdb(
f'{_pause_msg}\n' f'{_pause_msg}\n'
f'>(\n' f'>(\n'
f'|_{actor.uid}\n' f'|_{actor.aid.uid}\n'
f' |_{task}\n' # @ {actor.uid}\n' f' |_{task}\n' # @ {actor.uid}\n'
# f'|_{task}\n' # f'|_{task}\n'
# ^-TODO-^ more compact pformating? # ^-TODO-^ more compact pformating?
@ -1257,3 +1257,26 @@ async def breakpoint(
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
**kwargs, **kwargs,
) )
async def maybe_pause_bp():
'''
Internal (ONLY for now) `breakpoint()`-er fn which only tries to
use the multi-actor `.pause()` API when the current actor is the
root.
?! BUT WHY !?
-------
This is useful when debugging cases where the tpt layer breaks
(or is intentionally broken, say during resiliency testing) in
the case where a child can no longer contact the root process to
acquire the process-tree-singleton TTY lock.
'''
import tractor
actor = tractor.current_actor()
if actor.aid.name == 'root':
await tractor.pause(shield=True)
else:
tractor.devx.mk_pdb().set_trace()

View File

@ -81,7 +81,7 @@ if TYPE_CHECKING:
BoxedMaybeException, BoxedMaybeException,
) )
log = get_logger(__name__) log = get_logger()
class LockStatus( class LockStatus(

View File

@ -0,0 +1,26 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
High level design patterns, APIs and runtime extensions built on top
of the `tractor` runtime core.
'''
from ._service import (
open_service_mngr as open_service_mngr,
get_service_mngr as get_service_mngr,
ServiceMngr as ServiceMngr,
)

View File

@ -0,0 +1,592 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Daemon subactor as service(s) management and supervision primitives
and API.
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import (
Callable,
Any,
)
import tractor
import trio
from trio import TaskStatus
from tractor import (
log,
ActorNursery,
current_actor,
ContextCancelled,
Context,
Portal,
)
log = log.get_logger('tractor')
# TODO: implement a `@singleton` deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# -[ ] go through the options peeps on SO did?
# * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python
# * including @mikenerone's answer
# |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313
#
# -[ ] put it in `tractor.lowlevel._globals` ?
# * fits with our oustanding actor-local/global feat req?
# |_ https://github.com/goodboy/tractor/issues/55
# * how can it relate to the `Actor.lifetime_stack` that was
# silently patched in?
# |_ we could implicitly call both of these in the same
# spot in the runtime using the lifetime stack?
# - `open_singleton_cm().__exit__()`
# -`del_singleton()`
# |_ gives SC fixtue semantics to sync code oriented around
# sub-process lifetime?
# * what about with `trio.RunVar`?
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar
# - which we'll need for no-GIL cpython (right?) presuming
# multiple `trio.run()` calls in process?
#
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# a deletion API for explicit instance de-allocation?
# @open_service_mngr.deleter
# def del_service_mngr() -> None:
# mngr = open_service_mngr._singleton[0]
# open_service_mngr._singleton[0] = None
# del mngr
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# NOTE; since default values for keyword-args are effectively
# module-vars/globals as per the note from,
# https://docs.python.org/3/tutorial/controlflow.html#default-argument-values
#
# > "The default value is evaluated only once. This makes
# a difference when the default is a mutable object such as
# a list, dictionary, or instances of most classes"
#
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open an actor-global "service-manager" for supervising a tree
of subactors and/or actor-global tasks.
The delivered `ServiceMngr` is singleton instance for each
actor-process, that is, allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'an': an,
'tn': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.an = an
mngr.tn = tn
else:
assert (mngr.an and mngr.tn)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_ctxs:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
async def _open_and_supervise_service_ctx(
serman: ServiceMngr,
name: str,
ctx_fn: Callable, # TODO, type for `@tractor.context` requirement
portal: Portal,
allow_overruns: bool = False,
task_status: TaskStatus[
tuple[
trio.CancelScope,
Context,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
**ctx_kwargs,
) -> Any:
'''
Open a remote IPC-context defined by `ctx_fn` in the
(service) actor accessed via `portal` and supervise the
(local) parent task to termination at which point the remote
actor runtime is cancelled alongside it.
The main application is for allocating long-running
"sub-services" in a main daemon and explicitly controlling
their lifetimes from an actor-global singleton.
'''
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs:
try:
async with portal.open_context(
ctx_fn,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, started):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((
cs,
ctx,
complete,
started,
))
log.info(
f'`pikerd` service {name} started with value {started}'
)
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (
await portal.wait_for_result(),
ctx_res,
)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n'
f'canceller: {canceller}\n'
)
else:
raise
finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor() # terminate (remote) sub-actor
complete.set() # signal caller this task is done
serman.service_ctxs.pop(name) # remove mngr entry
# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the
# library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
@dataclass
class ServiceMngr:
'''
A multi-subactor-as-service manager.
Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
an: ActorNursery
tn: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
trio.Event,
]
] = field(default_factory=dict)
service_ctxs: dict[
str,
tuple[
trio.CancelScope,
Context,
Portal,
trio.Event,
]
] = field(default_factory=dict)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
# TODO, unify this interface with our `TaskManager` PR!
#
#
async def start_service_task(
self,
name: str,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
fn: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Any,
trio.Event,
]:
async def _task_manager_start(
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
]
] = trio.TASK_STATUS_IGNORED,
) -> Any:
task_cs = trio.CancelScope()
task_complete = trio.Event()
with task_cs as cs:
task_status.started((
cs,
task_complete,
))
try:
await fn()
except trio.Cancelled as taskc:
log.cancel(
f'Service task for `{name}` was cancelled!\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
)
raise taskc
finally:
task_complete.set()
(
cs,
complete,
) = await self.tn.start(_task_manager_start)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (
cs,
complete,
)
return (
cs,
complete,
)
async def cancel_service_task(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, complete = self.service_tasks[name]
cs.cancel()
await complete.wait()
# TODO, if we use the `TaskMngr` from #346
# we can also get the return value from the task!
if name in self.service_tasks:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service task {name!r} not terminated!?\n'
)
async def start_service_ctx(
self,
name: str,
portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
ctx_fn: Callable,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Context,
Any,
]:
'''
Start a remote IPC-context defined by `ctx_fn` in a background
task and immediately return supervision primitives to manage it:
- a `cs: CancelScope` for the newly allocated bg task
- the `ipc_ctx: Context` to manage the remotely scheduled
`trio.Task`.
- the `started: Any` value returned by the remote endpoint
task's `Context.started(<value>)` call.
The bg task supervises the ctx such that when it terminates the supporting
actor runtime is also cancelled, see `_open_and_supervise_service_ctx()`
for details.
'''
cs, ipc_ctx, complete, started = await self.tn.start(
functools.partial(
_open_and_supervise_service_ctx,
serman=self,
name=name,
ctx_fn=ctx_fn,
portal=portal,
**ctx_kwargs,
)
)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_ctxs[name] = (cs, ipc_ctx, portal, complete)
return (
cs,
ipc_ctx,
started,
)
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
# ^TODO, type for `@tractor.context` deco-ed funcs!
debug_mode: bool = False,
**start_actor_kwargs,
) -> Context:
'''
Start new subactor and schedule a supervising "service task"
in it which explicitly defines the sub's lifetime.
"Service daemon subactors" are cancelled (and thus
terminated) using the paired `.cancel_service()`.
Effectively this API can be used to manage "service daemons"
spawned under a single parent actor with supervision
semantics equivalent to a one-cancels-one style actor-nursery
or "(subactor) task manager" where each subprocess's (and
thus its embedded actor runtime) lifetime is synced to that
of the remotely spawned task defined by `ctx_ep`.
The funcionality can be likened to a "daemonized" version of
`.hilevel.worker.run_in_actor()` but with supervision
controls offered by `tractor.Context` where the main/root
remotely scheduled `trio.Task` invoking `ctx_ep` determines
the underlying subactor's lifetime.
'''
entry: tuple|None = self.service_ctxs.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_ctxs:
portal: Portal = await self.an.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**start_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(
cs,
sub_ctx,
started,
) = await self.start_service_ctx(
name=daemon_name,
portal=portal,
ctx_fn=ctx_ep,
**ctx_kwargs,
)
return sub_ctx
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, sub_ctx, portal, complete = self.service_ctxs[name]
# cs.cancel()
await sub_ctx.cancel()
await complete.wait()
if name in self.service_ctxs:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service actor for {name} not terminated and/or unknown?'
)
# assert name not in self.service_ctxs, \
# f'Serice task for {name} not terminated?'

View File

@ -60,7 +60,7 @@ if TYPE_CHECKING:
from ._transport import MsgTransport from ._transport import MsgTransport
log = get_logger(__name__) log = get_logger()
_is_windows = platform.system() == 'Windows' _is_windows = platform.system() == 'Windows'
@ -307,7 +307,12 @@ class Channel:
) -> None: ) -> None:
''' '''
Send a coded msg-blob over the transport. Send a coded msg-blob over the underlying IPC transport.
This fn raises `TransportClosed` on comms failures and is
normally handled by higher level runtime machinery for the
expected-graceful cases, normally ephemercal
(re/dis)connects.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
@ -334,9 +339,10 @@ class Channel:
except KeyError: except KeyError:
raise err raise err
case TransportClosed(): case TransportClosed():
src_exc_str: str = err.repr_src_exc()
log.transport( log.transport(
f'Transport stream closed due to\n' f'Transport stream closed due to,\n'
f'{err.repr_src_exc()}\n' f'{src_exc_str}'
) )
case _: case _:
@ -345,6 +351,11 @@ class Channel:
raise raise
async def recv(self) -> Any: async def recv(self) -> Any:
'''
Receive the latest (queued) msg-blob from the underlying IPC
transport.
'''
assert self._transport assert self._transport
return await self._transport.recv() return await self._transport.recv()
@ -418,16 +429,18 @@ class Channel:
self self
) -> AsyncGenerator[Any, None]: ) -> AsyncGenerator[Any, None]:
''' '''
Yield `MsgType` IPC msgs decoded and deliverd from Yield `MsgType` IPC msgs decoded and deliverd from an
an underlying `MsgTransport` protocol. underlying `MsgTransport` protocol.
This is a streaming routine alo implemented as an async-gen This is a streaming routine alo implemented as an
func (same a `MsgTransport._iter_pkts()`) gets allocated by async-generator func (same a `MsgTransport._iter_pkts()`)
a `.__call__()` inside `.__init__()` where it is assigned to gets allocated by a `.__call__()` inside `.__init__()` where
the `._aiter_msgs` attr. it is assigned to the `._aiter_msgs` attr.
''' '''
assert self._transport if not self._transport:
raise RuntimeError('No IPC transport initialized!?')
while True: while True:
try: try:
async for msg in self._transport: async for msg in self._transport:
@ -462,7 +475,15 @@ class Channel:
# continue # continue
def connected(self) -> bool: def connected(self) -> bool:
return self._transport.connected() if self._transport else False '''
Predicate whether underlying IPC tpt is connected.
'''
return (
self._transport.connected()
if self._transport
else False
)
async def _do_handshake( async def _do_handshake(
self, self,
@ -493,8 +514,11 @@ async def _connect_chan(
addr: UnwrappedAddress addr: UnwrappedAddress
) -> typing.AsyncGenerator[Channel, None]: ) -> typing.AsyncGenerator[Channel, None]:
''' '''
Create and connect a channel with disconnect on context manager Create and connect a `Channel` to the provided `addr`, disconnect
teardown. it on cm exit.
NOTE, this is a lowlevel, normally internal-only iface. You
should likely use `.open_portal()` instead.
''' '''
chan = await Channel.from_addr(addr) chan = await Channel.from_addr(addr)

View File

@ -72,7 +72,7 @@ if TYPE_CHECKING:
from .._supervise import ActorNursery from .._supervise import ActorNursery
log = log.get_logger(__name__) log = log.get_logger()
async def maybe_wait_on_canced_subs( async def maybe_wait_on_canced_subs(

View File

@ -59,7 +59,7 @@ except ImportError:
pass pass
log = get_logger(__name__) log = get_logger()
SharedMemory = disable_mantracker() SharedMemory = disable_mantracker()

View File

@ -41,7 +41,7 @@ from tractor.ipc._transport import (
) )
log = get_logger(__name__) log = get_logger()
class TCPAddress( class TCPAddress(

View File

@ -56,7 +56,7 @@ from tractor.msg import (
if TYPE_CHECKING: if TYPE_CHECKING:
from tractor._addr import Address from tractor._addr import Address
log = get_logger(__name__) log = get_logger()
# (codec, transport) # (codec, transport)
@ -154,7 +154,6 @@ class MsgTransport(Protocol):
# ... # ...
class MsgpackTransport(MsgTransport): class MsgpackTransport(MsgTransport):
# TODO: better naming for this? # TODO: better naming for this?
@ -278,14 +277,18 @@ class MsgpackTransport(MsgTransport):
except trio.ClosedResourceError as cre: except trio.ClosedResourceError as cre:
closure_err = cre closure_err = cre
# await tractor.devx._trace.maybe_pause_bp()
raise TransportClosed( raise TransportClosed(
message=( message=(
f'{tpt_name} was already closed locally ?\n' f'{tpt_name} was already closed locally?'
), ),
src_exc=closure_err, src_exc=closure_err,
loglevel='error', loglevel='error',
raise_on_report=( raise_on_report=(
'another task closed this fd' in closure_err.args 'another task closed this fd'
in
closure_err.args
), ),
) from closure_err ) from closure_err
@ -435,6 +438,11 @@ class MsgpackTransport(MsgTransport):
trans_err = _re trans_err = _re
tpt_name: str = f'{type(self).__name__!r}' tpt_name: str = f'{type(self).__name__!r}'
trans_err_msg: str = trans_err.args[0]
by_whom: str = {
'another task closed this fd': 'locally',
'this socket was already closed': 'by peer',
}.get(trans_err_msg)
match trans_err: match trans_err:
# XXX, specifc to UDS transport and its, # XXX, specifc to UDS transport and its,
@ -446,38 +454,42 @@ class MsgpackTransport(MsgTransport):
case trio.BrokenResourceError() if ( case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe' '[Errno 32] Broken pipe'
in in
trans_err.args[0] trans_err_msg
): ):
tpt_closed = TransportClosed.from_src_exc( tpt_closed = TransportClosed.from_src_exc(
message=( message=(
f'{tpt_name} already closed by peer\n' f'{tpt_name} already closed by peer\n'
), ),
body=f'{self}\n', body=f'{self}',
src_exc=trans_err, src_exc=trans_err,
raise_on_report=True, raise_on_report=True,
loglevel='transport', loglevel='transport',
) )
raise tpt_closed from trans_err raise tpt_closed from trans_err
# case trio.ClosedResourceError() if ( # ??TODO??, what case in piker does this and HOW
# 'this socket was already closed' # CAN WE RE-PRODUCE IT?!?!?
# in case trio.ClosedResourceError() if (
# trans_err.args[0] by_whom
# ): ):
# tpt_closed = TransportClosed.from_src_exc( tpt_closed = TransportClosed.from_src_exc(
# message=( message=(
# f'{tpt_name} already closed by peer\n' f'{tpt_name} was already closed {by_whom!r}?\n'
# ), ),
# body=f'{self}\n', body=f'{self}',
# src_exc=trans_err, src_exc=trans_err,
# raise_on_report=True, raise_on_report=True,
# loglevel='transport', loglevel='transport',
# ) )
# raise tpt_closed from trans_err
# unless the disconnect condition falls under "a # await tractor.devx._trace.maybe_pause_bp()
# normal operation breakage" we usualy console warn raise tpt_closed from trans_err
# about it.
# XXX, unless the disconnect condition falls
# under "a normal/expected operating breakage"
# (per the `trans_err_msg` guards in the cases
# above) we usualy console-error about it and
# raise-thru. about it.
case _: case _:
log.exception( log.exception(
f'{tpt_name} layer failed pre-send ??\n' f'{tpt_name} layer failed pre-send ??\n'

View File

@ -63,7 +63,7 @@ if TYPE_CHECKING:
from ._runtime import Actor from ._runtime import Actor
log = get_logger(__name__) log = get_logger()
def unwrap_sockpath( def unwrap_sockpath(
@ -166,6 +166,10 @@ class UDSAddress(
) )
if actor: if actor:
sockname: str = '::'.join(actor.uid) + f'@{pid}' sockname: str = '::'.join(actor.uid) + f'@{pid}'
# ?^TODO, for `multiaddr`'s parser we can't use the `::`
# above^, SO maybe a `.` or something else here?
# sockname: str = '.'.join(actor.uid) + f'@{pid}'
# -[ ] CURRENTLY using `.` BREAKS TEST SUITE tho..
else: else:
prefix: str = '<unknown-actor>' prefix: str = '<unknown-actor>'
if is_root_process(): if is_root_process():

View File

@ -14,11 +14,23 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" '''
Log like a forester! An enhanced logging subsys.
""" An extended logging layer using (for now) the stdlib's `logging`
+ `colorlog` which embeds concurrency-primitive/runtime info into
records (headers) to help you better grok your distributed systems
built on `tractor`.
'''
from collections.abc import Mapping from collections.abc import Mapping
from functools import partial
from inspect import (
FrameInfo,
getmodule,
stack,
)
import sys import sys
import logging import logging
from logging import ( from logging import (
@ -26,20 +38,24 @@ from logging import (
Logger, Logger,
StreamHandler, StreamHandler,
) )
import colorlog # type: ignore from types import ModuleType
import warnings
import colorlog # type: ignore
# ?TODO, some other (modern) alt libs?
# import coloredlogs
# import colored_traceback.auto # ?TODO, need better config?
import trio import trio
from ._state import current_actor from ._state import current_actor
_proj_name: str = 'tractor'
_default_loglevel: str = 'ERROR' _default_loglevel: str = 'ERROR'
# Super sexy formatting thanks to ``colorlog``. # Super sexy formatting thanks to ``colorlog``.
# (NOTE: we use the '{' format style) # (NOTE: we use the '{' format style)
# Here, `thin_white` is just the layperson's gray. # Here, `thin_white` is just the layperson's gray.
LOG_FORMAT = ( LOG_FORMAT: str = (
# "{bold_white}{log_color}{asctime}{reset}" # "{bold_white}{log_color}{asctime}{reset}"
"{log_color}{asctime}{reset}" "{log_color}{asctime}{reset}"
" {bold_white}{thin_white}({reset}" " {bold_white}{thin_white}({reset}"
@ -51,7 +67,7 @@ LOG_FORMAT = (
" {reset}{bold_white}{thin_white}{message}" " {reset}{bold_white}{thin_white}{message}"
) )
DATE_FORMAT = '%b %d %H:%M:%S' DATE_FORMAT: str = '%b %d %H:%M:%S'
# FYI, ERROR is 40 # FYI, ERROR is 40
# TODO: use a `bidict` to avoid the :155 check? # TODO: use a `bidict` to avoid the :155 check?
@ -75,7 +91,10 @@ STD_PALETTE = {
'TRANSPORT': 'cyan', 'TRANSPORT': 'cyan',
} }
BOLD_PALETTE = { BOLD_PALETTE: dict[
str,
dict[int, str],
] = {
'bold': { 'bold': {
level: f"bold_{color}" for level, color in STD_PALETTE.items()} level: f"bold_{color}" for level, color in STD_PALETTE.items()}
} }
@ -97,9 +116,26 @@ def at_least_level(
return False return False
# TODO: this isn't showing the correct '{filename}' # TODO, compare with using a "filter" instead?
# as it did before.. # - https://stackoverflow.com/questions/60691759/add-information-to-every-log-message-in-python-logging/61830838#61830838
# |_corresponding dict-config,
# https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig/7507842#7507842
# - [ ] what's the benefit/tradeoffs?
#
class StackLevelAdapter(LoggerAdapter): class StackLevelAdapter(LoggerAdapter):
'''
A (software) stack oriented logger "adapter".
'''
@property
def level(self) -> str:
'''
The currently set `str` emit level (in lowercase).
'''
return logging.getLevelName(
self.getEffectiveLevel()
).lower()
def at_least_level( def at_least_level(
self, self,
@ -248,9 +284,14 @@ def pformat_task_uid(
return f'{task.name}[{tid_part}]' return f'{task.name}[{tid_part}]'
_curr_actor_no_exc = partial(
current_actor,
err_on_no_runtime=False,
)
_conc_name_getters = { _conc_name_getters = {
'task': pformat_task_uid, 'task': pformat_task_uid,
'actor': lambda: current_actor(), 'actor': lambda: _curr_actor_no_exc(),
'actor_name': lambda: current_actor().name, 'actor_name': lambda: current_actor().name,
'actor_uid': lambda: current_actor().uid[1][:6], 'actor_uid': lambda: current_actor().uid[1][:6],
} }
@ -282,9 +323,16 @@ class ActorContextInfo(Mapping):
return f'no {key} context' return f'no {key} context'
_proj_name: str = 'tractor'
def get_logger( def get_logger(
name: str|None = None, name: str|None = None,
_root_name: str = _proj_name, # ^NOTE, setting `name=_proj_name=='tractor'` enables the "root
# logger" for `tractor` itself.
pkg_name: str = _proj_name,
# XXX, deprecated, use ^
_root_name: str|None = None,
logger: Logger|None = None, logger: Logger|None = None,
@ -293,49 +341,287 @@ def get_logger(
# |_https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig # |_https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig
# |_https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema # |_https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema
subsys_spec: str|None = None, subsys_spec: str|None = None,
mk_sublog: bool = True,
_strict_debug: bool = False,
) -> StackLevelAdapter: ) -> StackLevelAdapter:
''' '''
Return the `tractor`-library root logger or a sub-logger for Return the `tractor`-library root logger or a sub-logger for
`name` if provided. `name` if provided.
''' When `name` is left null we try to auto-detect the caller's
log: Logger `mod.__name__` and use that as a the sub-logger key.
log = rlog = logger or logging.getLogger(_root_name) This allows for example creating a module level instance like,
.. code:: python
log = tractor.log.get_logger(_root_name='mylib')
and by default all console record headers will show the caller's
(of any `log.<level>()`-method) correct sub-pkg's
+ py-module-file.
'''
if _root_name:
msg: str = (
'The `_root_name: str` param of `get_logger()` is now deprecated.\n'
'Use the new `pkg_name: str` instead, it is the same usage.\n'
)
warnings.warn(
msg,
DeprecationWarning,
stacklevel=2,
)
pkg_name: str = _root_name
def get_caller_mod(
frames_up:int = 2
):
'''
Attempt to get the module which called `tractor.get_logger()`.
'''
callstack: list[FrameInfo] = stack()
caller_fi: FrameInfo = callstack[frames_up]
caller_mod: ModuleType = getmodule(caller_fi.frame)
return caller_mod
# --- Auto--naming-CASE ---
# -------------------------
# Implicitly introspect the caller's module-name whenever `name`
# if left as the null default.
#
# When the `pkg_name` is `in` in the `mod.__name__` we presume
# this instance can be created as a sub-`StackLevelAdapter` and
# that the intention is to get free module-path tracing and
# filtering (well once we implement that) oriented around the
# py-module code hierarchy of the consuming project.
#
if (
mk_sublog
and
name is None
and
pkg_name
):
if (caller_mod := get_caller_mod()):
# ?XXX how is this `caller_mod.__name__` defined?
# => well by how the mod is imported.. XD
# |_https://stackoverflow.com/a/15883682
#
# if pkg_name in caller_mod.__package__:
# from tractor.devx.debug import mk_pdb
# mk_pdb().set_trace()
mod_ns_path: str = caller_mod.__name__
mod_pkg_ns_path: str = caller_mod.__package__
if (
mod_pkg_ns_path in mod_ns_path
or
pkg_name in mod_ns_path
):
# proper_mod_name = mod_ns_path.lstrip(
proper_mod_name = mod_pkg_ns_path.removeprefix(
f'{pkg_name}.'
)
name = proper_mod_name
elif (
pkg_name
# and
# pkg_name in mod_ns_path
):
name = mod_ns_path
if _strict_debug:
msg: str = (
f'@ {get_caller_mod()}\n'
f'Generating sub-logger name,\n'
f'{pkg_name}.{name}\n'
)
if _curr_actor_no_exc():
_root_log.debug(msg)
elif pkg_name != _proj_name:
print(
f'=> tractor.log.get_logger():\n'
f'{msg}\n'
)
# build a root logger instance
log: Logger
rlog = log = (
logger
or
logging.getLogger(pkg_name)
)
# XXX, lowlevel debuggin..
# if pkg_name != _proj_name:
# from tractor.devx.debug import mk_pdb
# mk_pdb().set_trace()
# NOTE: for handling for modules that use the unecessary,
# `get_logger(__name__)`
#
# we make the following stylistic choice:
# - always avoid duplicate project-package token
# in msg output: i.e. tractor.tractor.ipc._chan.py in header
# looks ridiculous XD
# - never show the leaf module name in the {name} part
# since in python the {filename} is always this same
# module-file.
if ( if (
name name
and and
name != _proj_name # ?TODO? more correct?
# _proj_name not in name
name != pkg_name
): ):
# ex. modden.runtime.progman
# -> rname='modden', _, pkg_path='runtime.progman'
if (
pkg_name
and
pkg_name in name
):
proper_name: str = name.removeprefix(
f'{pkg_name}.'
)
msg: str = (
f'@ {get_caller_mod()}\n'
f'Duplicate pkg-name in sub-logger `name`-key?\n'
f'pkg_name = {pkg_name!r}\n'
f'name = {name!r}\n'
f'\n'
f'=> You should change your input params to,\n'
f'get_logger(\n'
f' pkg_name={pkg_name!r}\n'
f' name={proper_name!r}\n'
f')'
)
# assert _duplicate == rname
if _curr_actor_no_exc():
_root_log.warning(msg)
else:
print(
f'=> tractor.log.get_logger() ERROR:\n'
f'{msg}\n'
)
# NOTE: for handling for modules that use `get_logger(__name__)` name = proper_name
# we make the following stylistic choice:
# - always avoid duplicate project-package token
# in msg output: i.e. tractor.tractor.ipc._chan.py in header
# looks ridiculous XD
# - never show the leaf module name in the {name} part
# since in python the {filename} is always this same
# module-file.
sub_name: None|str = None rname: str = pkg_name
rname, _, sub_name = name.partition('.') pkg_path: str = name
pkgpath, _, modfilename = sub_name.rpartition('.')
# NOTE: for tractor itself never include the last level
# module key in the name such that something like: eg.
# 'tractor.trionics._broadcast` only includes the first
# 2 tokens in the (coloured) name part.
if rname == 'tractor':
sub_name = pkgpath
if _root_name in sub_name: # (
duplicate, _, sub_name = sub_name.partition('.') # rname,
# _,
# pkg_path,
# ) = name.partition('.')
if not sub_name: # For ex. 'modden.runtime.progman'
# -> pkgpath='runtime', _, leaf_mod='progman'
(
subpkg_path,
_,
leaf_mod,
) = pkg_path.rpartition('.')
# NOTE: special usage for passing `name=__name__`,
#
# - remove duplication of any root-pkg-name in the
# (sub/child-)logger name; i.e. never include the
# `pkg_name` *twice* in the top-most-pkg-name/level
#
# -> this happens normally since it is added to `.getChild()`
# and as the name of its root-logger.
#
# => So for ex. (module key in the name) something like
# `name='tractor.trionics._broadcast` is passed,
# only includes the first 2 sub-pkg name-tokens in the
# child-logger's name; the colored "pkg-namespace" header
# will then correctly show the same value as `name`.
if (
# XXX, TRY to remove duplication cases
# which get warn-logged on below!
(
# when, subpkg_path == pkg_path
subpkg_path
and
rname == pkg_name
)
# ) or (
# # when, pkg_path == leaf_mod
# pkg_path
# and
# leaf_mod == pkg_path
# )
):
pkg_path = subpkg_path
# XXX, do some double-checks for duplication of,
# - root-pkg-name, already in root logger
# - leaf-module-name already in `{filename}` header-field
if (
_strict_debug
and
pkg_name
and
pkg_name in pkg_path
):
_duplicate, _, pkg_path = pkg_path.partition('.')
if _duplicate:
msg: str = (
f'@ {get_caller_mod()}\n'
f'Duplicate pkg-name in sub-logger key?\n'
f'pkg_name = {pkg_name!r}\n'
f'pkg_path = {pkg_path!r}\n'
)
# assert _duplicate == rname
if _curr_actor_no_exc():
_root_log.warning(msg)
else:
print(
f'=> tractor.log.get_logger() ERROR:\n'
f'{msg}\n'
)
# XXX, should never get here?
breakpoint()
if (
_strict_debug
and
leaf_mod
and
leaf_mod in pkg_path
):
msg: str = (
f'@ {get_caller_mod()}\n'
f'Duplicate leaf-module-name in sub-logger key?\n'
f'leaf_mod = {leaf_mod!r}\n'
f'pkg_path = {pkg_path!r}\n'
)
if _curr_actor_no_exc():
_root_log.warning(msg)
else:
print(
f'=> tractor.log.get_logger() ERROR:\n'
f'{msg}\n'
)
# mk/get underlying (sub-)`Logger`
if (
not pkg_path
and
leaf_mod == pkg_name
):
# breakpoint()
log = rlog log = rlog
else:
log = rlog.getChild(sub_name) elif mk_sublog:
# breakpoint()
log = rlog.getChild(pkg_path)
log.level = rlog.level log.level = rlog.level
@ -350,8 +636,13 @@ def get_logger(
for name, val in CUSTOM_LEVELS.items(): for name, val in CUSTOM_LEVELS.items():
logging.addLevelName(val, name) logging.addLevelName(val, name)
# ensure customs levels exist as methods # ensure our custom adapter levels exist as methods
assert getattr(logger, name.lower()), f'Logger does not define {name}' assert getattr(
logger,
name.lower()
), (
f'Logger does not define {name}'
)
return logger return logger
@ -425,4 +716,4 @@ def get_loglevel() -> str:
# global module logger for tractor itself # global module logger for tractor itself
log: StackLevelAdapter = get_logger('tractor') _root_log: StackLevelAdapter = get_logger('tractor')

View File

@ -68,7 +68,7 @@ from tractor.log import get_logger
if TYPE_CHECKING: if TYPE_CHECKING:
from tractor._context import Context from tractor._context import Context
log = get_logger(__name__) log = get_logger()
# TODO: unify with `MsgCodec` by making `._dec` part this? # TODO: unify with `MsgCodec` by making `._dec` part this?

View File

@ -77,7 +77,7 @@ if TYPE_CHECKING:
from tractor._streaming import MsgStream from tractor._streaming import MsgStream
log = get_logger(__name__) log = get_logger()
_def_any_pldec: MsgDec[Any] = mk_dec(spec=Any) _def_any_pldec: MsgDec[Any] = mk_dec(spec=Any)

View File

@ -51,7 +51,7 @@ from tractor.log import get_logger
# from tractor._addr import UnwrappedAddress # from tractor._addr import UnwrappedAddress
log = get_logger('tractor.msgspec') log = get_logger()
# type variable for the boxed payload field `.pld` # type variable for the boxed payload field `.pld`
PayloadT = TypeVar('PayloadT') PayloadT = TypeVar('PayloadT')
@ -202,7 +202,10 @@ class SpawnSpec(
# TODO: similar to the `Start` kwargs spec needed below, we need # TODO: similar to the `Start` kwargs spec needed below, we need
# a hard `Struct` def for all of these fields! # a hard `Struct` def for all of these fields!
_parent_main_data: dict _parent_main_data: dict
_runtime_vars: dict[str, Any] _runtime_vars: (
dict[str, Any]
#|RuntimeVars # !TODO
)
# ^NOTE see `._state._runtime_vars: dict` # ^NOTE see `._state._runtime_vars: dict`
# module import capability # module import capability

View File

@ -71,7 +71,7 @@ from outcome import (
Outcome, Outcome,
) )
log: StackLevelAdapter = get_logger(__name__) log: StackLevelAdapter = get_logger()
__all__ = [ __all__ = [

View File

@ -21,7 +21,6 @@ Sugary patterns for trio + tractor designs.
from ._mngrs import ( from ._mngrs import (
gather_contexts as gather_contexts, gather_contexts as gather_contexts,
maybe_open_context as maybe_open_context, maybe_open_context as maybe_open_context,
maybe_open_nursery as maybe_open_nursery,
) )
from ._broadcast import ( from ._broadcast import (
AsyncReceiver as AsyncReceiver, AsyncReceiver as AsyncReceiver,
@ -37,3 +36,6 @@ from ._beg import (
from ._taskc import ( from ._taskc import (
maybe_raise_from_masking_exc as maybe_raise_from_masking_exc, maybe_raise_from_masking_exc as maybe_raise_from_masking_exc,
) )
from ._tn import (
maybe_open_nursery as maybe_open_nursery,
)

View File

@ -22,7 +22,7 @@ https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html
from __future__ import annotations from __future__ import annotations
from abc import abstractmethod from abc import abstractmethod
from collections import deque from collections import deque
from contextlib import asynccontextmanager from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
from operator import ne from operator import ne
from typing import ( from typing import (
@ -42,7 +42,7 @@ from trio.lowlevel import current_task
from msgspec import Struct from msgspec import Struct
from tractor.log import get_logger from tractor.log import get_logger
log = get_logger(__name__) log = get_logger()
# TODO: use new type-vars syntax from 3.12 # TODO: use new type-vars syntax from 3.12
# https://realpython.com/python312-new-features/#dedicated-type-variable-syntax # https://realpython.com/python312-new-features/#dedicated-type-variable-syntax
@ -398,7 +398,7 @@ class BroadcastReceiver(ReceiveChannel):
return await self._receive_from_underlying(key, state) return await self._receive_from_underlying(key, state)
@asynccontextmanager @acm
async def subscribe( async def subscribe(
self, self,
raise_on_lag: bool = True, raise_on_lag: bool = True,

View File

@ -23,7 +23,6 @@ from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
import inspect import inspect
from types import ModuleType
from typing import ( from typing import (
Any, Any,
AsyncContextManager, AsyncContextManager,
@ -33,52 +32,25 @@ from typing import (
Hashable, Hashable,
Sequence, Sequence,
TypeVar, TypeVar,
TYPE_CHECKING,
) )
import trio import trio
from tractor._state import current_actor from tractor._state import current_actor
from tractor.log import get_logger from tractor.log import get_logger
from ._tn import maybe_open_nursery
# from ._beg import collapse_eg # from ._beg import collapse_eg
# from ._taskc import ( # from ._taskc import (
# maybe_raise_from_masking_exc, # maybe_raise_from_masking_exc,
# ) # )
if TYPE_CHECKING:
from tractor import ActorNursery
log = get_logger()
log = get_logger(__name__)
# A regular invariant generic type # A regular invariant generic type
T = TypeVar("T") T = TypeVar("T")
@acm
async def maybe_open_nursery(
nursery: trio.Nursery|ActorNursery|None = None,
shield: bool = False,
lib: ModuleType = trio,
**kwargs, # proxy thru
) -> AsyncGenerator[trio.Nursery, Any]:
'''
Create a new nursery if None provided.
Blocks on exit as expected if no input nursery is provided.
'''
if nursery is not None:
yield nursery
else:
async with lib.open_nursery(**kwargs) as nursery:
if lib == trio:
nursery.cancel_scope.shield = shield
yield nursery
async def _enter_and_wait( async def _enter_and_wait(
mngr: AsyncContextManager[T], mngr: AsyncContextManager[T],
unwrapped: dict[int, T], unwrapped: dict[int, T],

View File

@ -0,0 +1,341 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Erlang-style (ish) "one-cancels-one" nursery, what we just call
a "task manager".
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from functools import partial
from typing import (
Generator,
Any,
)
from outcome import (
Outcome,
acapture,
)
from msgspec import Struct
import trio
from trio import (
TaskStatus,
CancelScope,
Nursery,
)
from trio.lowlevel import (
Task,
)
from tractor.log import get_logger
log = get_logger(__name__)
class TaskOutcome(Struct):
'''
The outcome of a scheduled ``trio`` task which includes an interface
for synchronizing to the completion of the task's runtime and access
to the eventual boxed result/value or raised exception.
'''
lowlevel_task: Task
_exited = trio.Event() # as per `trio.Runner.task_exited()`
_outcome: Outcome | None = None # as per `outcome.Outcome`
_result: Any | None = None # the eventual maybe-returned-value
@property
def result(self) -> Any:
'''
Either Any or None depending on whether the Outcome has compeleted.
'''
if self._outcome is None:
raise RuntimeError(
f'Task {self.lowlevel_task.name} is not complete.\n'
'First wait on `await TaskOutcome.wait_for_result()`!'
)
return self._result
def _set_outcome(
self,
outcome: Outcome,
):
'''
Set the ``Outcome`` for this task.
This method should only ever be called by the task's supervising
nursery implemenation.
'''
self._outcome = outcome
self._result = outcome.unwrap()
self._exited.set()
async def wait_for_result(self) -> Any:
'''
Unwind the underlying task's ``Outcome`` by async waiting for
the task to first complete and then unwrap it's result-value.
'''
if self._exited.is_set():
return self._result
await self._exited.wait()
out = self._outcome
if out is None:
raise ValueError(f'{out} is not an outcome!?')
return self.result
class TaskManagerNursery(Struct):
_tn: Nursery
_scopes: dict[
Task,
tuple[CancelScope, Outcome]
] = {}
task_manager: Generator[Any, Outcome, None] | None = None
async def start_soon(
self,
async_fn,
*args,
name=None,
task_manager: Generator[Any, Outcome, None] | None = None
) -> tuple[CancelScope, Task]:
# NOTE: internals of a nursery don't let you know what
# the most recently spawned task is by order.. so we'd
# have to either change that or do set ops.
# pre_start_tasks: set[Task] = n._children.copy()
# new_tasks = n._children - pre_start_Tasks
# assert len(new_tasks) == 1
# task = new_tasks.pop()
tn: Nursery = self._tn
sm = self.task_manager
# we do default behavior of a scope-per-nursery
# if the user did not provide a task manager.
if sm is None:
return tn.start_soon(async_fn, *args, name=None)
# new_task: Task|None = None
to_return: tuple[Any] | None = None
# NOTE: what do we enforce as a signature for the
# `@task_scope_manager` here?
mngr = sm(nursery=tn)
async def _start_wrapped_in_scope(
task_status: TaskStatus[
tuple[CancelScope, Task]
] = trio.TASK_STATUS_IGNORED,
) -> None:
# TODO: this was working before?! and, do we need something
# like it to implement `.start()`?
# nonlocal to_return
# execute up to the first yield
try:
to_return: tuple[Any] = next(mngr)
except StopIteration:
raise RuntimeError("task manager didn't yield") from None
# TODO: how do we support `.start()` style?
# - relay through whatever the
# started task passes back via `.started()` ?
# seems like that won't work with also returning
# a "task handle"?
# - we were previously binding-out this `to_return` to
# the parent's lexical scope, why isn't that working
# now?
task_status.started(to_return)
# invoke underlying func now that cs is entered.
outcome = await acapture(async_fn, *args)
# execute from the 1st yield to return and expect
# generator-mngr `@task_scope_manager` thinger to
# terminate!
try:
mngr.send(outcome)
# I would presume it's better to have a handle to
# the `Outcome` entirely? This method sends *into*
# the mngr this `Outcome.value`; seems like kinda
# weird semantics for our purposes?
# outcome.send(mngr)
except StopIteration:
return
else:
raise RuntimeError(f"{mngr} didn't stop!")
to_return = await tn.start(_start_wrapped_in_scope)
assert to_return is not None
# TODO: use the fancy type-check-time type signature stuff from
# mypy i guess..to like, relay the type of whatever the
# generator yielded through? betcha that'll be un-grokable XD
return to_return
# TODO: define a decorator to runtime type check that this a generator
# with a single yield that also delivers a value (of some std type) from
# the yield expression?
# @trio.task_manager
def add_task_handle_and_crash_handling(
nursery: Nursery,
debug_mode: bool = False,
) -> Generator[
Any,
Outcome,
None,
]:
'''
A customizable, user defined "task scope manager".
With this specially crafted single-yield generator function you can
add more granular controls around every task spawned by `trio` B)
'''
# if you need it you can ask trio for the task obj
task: Task = trio.lowlevel.current_task()
log.info(f'Spawning task: {task.name}')
# User defined "task handle" for more granular supervision
# of each spawned task as needed for their particular usage.
task_outcome = TaskOutcome(task)
# NOTE: if wanted the user could wrap the output task handle however
# they want!
# class TaskHandle(Struct):
# task: Task
# cs: CancelScope
# outcome: TaskOutcome
# this yields back when the task is terminated, cancelled or returns.
try:
with CancelScope() as cs:
# the yielded value(s) here are what are returned to the
# nursery's `.start_soon()` caller B)
lowlevel_outcome: Outcome = yield (task_outcome, cs)
task_outcome._set_outcome(lowlevel_outcome)
# Adds "crash handling" from `pdbp` by entering
# a REPL on std errors.
except Exception as err:
if debug_mode:
log.exception(
f'{task.name} crashed, entering debugger!'
)
import pdbp
pdbp.xpm()
raise err
finally:
log.info(
f'Task exitted\n'
f')>\n'
f' |_{task}\n'
# ^^TODO? use sclang formatter?
# -[ ] .devx.pformat.nest_from_op()` yo!
)
@acm
async def open_taskman(
task_manager: Generator[Any, Outcome, None] | None = None,
**lowlevel_nursery_kwargs,
):
async with trio.open_nursery(**lowlevel_nursery_kwargs) as nurse:
yield TaskManagerNursery(
nurse,
task_manager=task_manager,
)
async def sleep_then_return_val(val: str):
await trio.sleep(0.2)
return val
async def ensure_cancelled():
try:
await trio.sleep_forever()
except trio.Cancelled:
task = trio.lowlevel.current_task()
log.cancel(f'heyyo ONLY {task.name} was cancelled as expected B)')
assert 0
except BaseException:
raise RuntimeError("woa woa woa this ain't right!")
if __name__ == '__main__':
from tractor.log import get_console_log
get_console_log(level='info')
async def main():
async with open_taskman(
task_manager=partial(
add_task_handle_and_crash_handling,
debug_mode=True,
),
) as tm:
for _ in range(3):
outcome, _ = await tm.start_soon(trio.sleep_forever)
# extra task we want to engage in debugger post mortem.
err_outcome, cs = await tm.start_soon(ensure_cancelled)
val: str = 'yoyoyo'
val_outcome, _ = await tm.start_soon(
sleep_then_return_val,
val,
)
res = await val_outcome.wait_for_result()
assert res == val
log.info(f'{res} -> GOT EXPECTED TASK VALUE')
await trio.sleep(0.6)
log.cancel(
f'Cancelling and waiting on {err_outcome.lowlevel_task} '
'to CRASH..'
)
cs.cancel()
trio.run(main)

View File

@ -34,7 +34,7 @@ from typing import (
import trio import trio
from tractor.log import get_logger from tractor.log import get_logger
log = get_logger(__name__) log = get_logger()
if TYPE_CHECKING: if TYPE_CHECKING:
@ -246,23 +246,12 @@ async def maybe_raise_from_masking_exc(
type(exc_match) # masked type type(exc_match) # masked type
) )
# Add to masked `exc_ctx`
if do_warn: if do_warn:
exc_ctx.add_note(note) exc_ctx.add_note(note)
if ( # don't unmask already known "special" cases..
do_warn
and
type(exc_match) in always_warn_on
):
log.warning(note)
if (
do_warn
and
raise_unmasked
):
if len(masked) < 2: if len(masked) < 2:
# don't unmask already known "special" cases..
if ( if (
_mask_cases _mask_cases
and and
@ -283,11 +272,26 @@ async def maybe_raise_from_masking_exc(
) )
raise exc_match raise exc_match
raise exc_ctx from exc_match # ^?TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
# else:
# await pause(shield=True)
if type(exc_match) in always_warn_on:
import traceback
trace: list[str] = traceback.format_exception(
type(exc_ctx),
exc_ctx,
exc_ctx.__traceback__
)
tb_str: str = ''.join(trace)
log.warning(tb_str)
# XXX, for debug
# from tractor import pause
# await pause(shield=True)
if raise_unmasked:
raise exc_ctx from exc_match
# ??TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
# else:
# await pause(shield=True)
else: else:
raise raise

View File

@ -0,0 +1,94 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
`trio.Nursery` wrappers which we short-hand refer to as
`tn`: "task nursery".
(whereas we refer to `tractor.ActorNursery` as the short-hand `an`)
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from types import ModuleType
from typing import (
Any,
AsyncGenerator,
TYPE_CHECKING,
)
import trio
from tractor.log import get_logger
# from ._beg import (
# collapse_eg,
# )
if TYPE_CHECKING:
from tractor import ActorNursery
log = get_logger(__name__)
# ??TODO? is this even a good idea??
# it's an extra LoC to stack `collapse_eg()` vs.
# a new/foreign/bad-std-named very thing wrapper..?
# -[ ] is there a better/simpler name?
# @acm
# async def open_loose_tn() -> trio.Nursery:
# '''
# Implements the equivalent of the old style loose eg raising
# task-nursery from `trio<=0.25.0` ,
# .. code-block:: python
# async with trio.open_nursery(
# strict_exception_groups=False,
# ) as tn:
# ...
# '''
# async with (
# collapse_eg(),
# trio.open_nursery() as tn,
# ):
# yield tn
@acm
async def maybe_open_nursery(
nursery: trio.Nursery|ActorNursery|None = None,
shield: bool = False,
lib: ModuleType = trio,
loose: bool = False,
**kwargs, # proxy thru
) -> AsyncGenerator[trio.Nursery, Any]:
'''
Create a new nursery if None provided.
Blocks on exit as expected if no input nursery is provided.
'''
if nursery is not None:
yield nursery
else:
async with lib.open_nursery(**kwargs) as tn:
tn.cancel_scope.shield = shield
yield tn

62
uv.lock
View File

@ -1,5 +1,5 @@
version = 1 version = 1
revision = 2 revision = 3
requires-python = ">=3.11" requires-python = ">=3.11"
[[package]] [[package]]
@ -329,6 +329,32 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/30/3d/64ad57c803f1fa1e963a7946b6e0fea4a70df53c1a7fed304586539c2bac/pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820", size = 343634, upload-time = "2025-03-02T12:54:52.069Z" }, { url = "https://files.pythonhosted.org/packages/30/3d/64ad57c803f1fa1e963a7946b6e0fea4a70df53c1a7fed304586539c2bac/pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820", size = 343634, upload-time = "2025-03-02T12:54:52.069Z" },
] ]
[[package]]
name = "ruff"
version = "0.14.14"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/2e/06/f71e3a86b2df0dfa2d2f72195941cd09b44f87711cb7fa5193732cb9a5fc/ruff-0.14.14.tar.gz", hash = "sha256:2d0f819c9a90205f3a867dbbd0be083bee9912e170fd7d9704cc8ae45824896b", size = 4515732, upload-time = "2026-01-22T22:30:17.527Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d2/89/20a12e97bc6b9f9f68343952da08a8099c57237aef953a56b82711d55edd/ruff-0.14.14-py3-none-linux_armv6l.whl", hash = "sha256:7cfe36b56e8489dee8fbc777c61959f60ec0f1f11817e8f2415f429552846aed", size = 10467650, upload-time = "2026-01-22T22:30:08.578Z" },
{ url = "https://files.pythonhosted.org/packages/a3/b1/c5de3fd2d5a831fcae21beda5e3589c0ba67eec8202e992388e4b17a6040/ruff-0.14.14-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:6006a0082336e7920b9573ef8a7f52eec837add1265cc74e04ea8a4368cd704c", size = 10883245, upload-time = "2026-01-22T22:30:04.155Z" },
{ url = "https://files.pythonhosted.org/packages/b8/7c/3c1db59a10e7490f8f6f8559d1db8636cbb13dccebf18686f4e3c9d7c772/ruff-0.14.14-py3-none-macosx_11_0_arm64.whl", hash = "sha256:026c1d25996818f0bf498636686199d9bd0d9d6341c9c2c3b62e2a0198b758de", size = 10231273, upload-time = "2026-01-22T22:30:34.642Z" },
{ url = "https://files.pythonhosted.org/packages/a1/6e/5e0e0d9674be0f8581d1f5e0f0a04761203affce3232c1a1189d0e3b4dad/ruff-0.14.14-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f666445819d31210b71e0a6d1c01e24447a20b85458eea25a25fe8142210ae0e", size = 10585753, upload-time = "2026-01-22T22:30:31.781Z" },
{ url = "https://files.pythonhosted.org/packages/23/09/754ab09f46ff1884d422dc26d59ba18b4e5d355be147721bb2518aa2a014/ruff-0.14.14-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:3c0f18b922c6d2ff9a5e6c3ee16259adc513ca775bcf82c67ebab7cbd9da5bc8", size = 10286052, upload-time = "2026-01-22T22:30:24.827Z" },
{ url = "https://files.pythonhosted.org/packages/c8/cc/e71f88dd2a12afb5f50733851729d6b571a7c3a35bfdb16c3035132675a0/ruff-0.14.14-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1629e67489c2dea43e8658c3dba659edbfd87361624b4040d1df04c9740ae906", size = 11043637, upload-time = "2026-01-22T22:30:13.239Z" },
{ url = "https://files.pythonhosted.org/packages/67/b2/397245026352494497dac935d7f00f1468c03a23a0c5db6ad8fc49ca3fb2/ruff-0.14.14-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:27493a2131ea0f899057d49d303e4292b2cae2bb57253c1ed1f256fbcd1da480", size = 12194761, upload-time = "2026-01-22T22:30:22.542Z" },
{ url = "https://files.pythonhosted.org/packages/5b/06/06ef271459f778323112c51b7587ce85230785cd64e91772034ddb88f200/ruff-0.14.14-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:01ff589aab3f5b539e35db38425da31a57521efd1e4ad1ae08fc34dbe30bd7df", size = 12005701, upload-time = "2026-01-22T22:30:20.499Z" },
{ url = "https://files.pythonhosted.org/packages/41/d6/99364514541cf811ccc5ac44362f88df66373e9fec1b9d1c4cc830593fe7/ruff-0.14.14-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:1cc12d74eef0f29f51775f5b755913eb523546b88e2d733e1d701fe65144e89b", size = 11282455, upload-time = "2026-01-22T22:29:59.679Z" },
{ url = "https://files.pythonhosted.org/packages/ca/71/37daa46f89475f8582b7762ecd2722492df26421714a33e72ccc9a84d7a5/ruff-0.14.14-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bb8481604b7a9e75eff53772496201690ce2687067e038b3cc31aaf16aa0b974", size = 11215882, upload-time = "2026-01-22T22:29:57.032Z" },
{ url = "https://files.pythonhosted.org/packages/2c/10/a31f86169ec91c0705e618443ee74ede0bdd94da0a57b28e72db68b2dbac/ruff-0.14.14-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:14649acb1cf7b5d2d283ebd2f58d56b75836ed8c6f329664fa91cdea19e76e66", size = 11180549, upload-time = "2026-01-22T22:30:27.175Z" },
{ url = "https://files.pythonhosted.org/packages/fd/1e/c723f20536b5163adf79bdd10c5f093414293cdf567eed9bdb7b83940f3f/ruff-0.14.14-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:e8058d2145566510790eab4e2fad186002e288dec5e0d343a92fe7b0bc1b3e13", size = 10543416, upload-time = "2026-01-22T22:30:01.964Z" },
{ url = "https://files.pythonhosted.org/packages/3e/34/8a84cea7e42c2d94ba5bde1d7a4fae164d6318f13f933d92da6d7c2041ff/ruff-0.14.14-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:e651e977a79e4c758eb807f0481d673a67ffe53cfa92209781dfa3a996cf8412", size = 10285491, upload-time = "2026-01-22T22:30:29.51Z" },
{ url = "https://files.pythonhosted.org/packages/55/ef/b7c5ea0be82518906c978e365e56a77f8de7678c8bb6651ccfbdc178c29f/ruff-0.14.14-py3-none-musllinux_1_2_i686.whl", hash = "sha256:cc8b22da8d9d6fdd844a68ae937e2a0adf9b16514e9a97cc60355e2d4b219fc3", size = 10733525, upload-time = "2026-01-22T22:30:06.499Z" },
{ url = "https://files.pythonhosted.org/packages/6a/5b/aaf1dfbcc53a2811f6cc0a1759de24e4b03e02ba8762daabd9b6bd8c59e3/ruff-0.14.14-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:16bc890fb4cc9781bb05beb5ab4cd51be9e7cb376bf1dd3580512b24eb3fda2b", size = 11315626, upload-time = "2026-01-22T22:30:36.848Z" },
{ url = "https://files.pythonhosted.org/packages/2c/aa/9f89c719c467dfaf8ad799b9bae0df494513fb21d31a6059cb5870e57e74/ruff-0.14.14-py3-none-win32.whl", hash = "sha256:b530c191970b143375b6a68e6f743800b2b786bbcf03a7965b06c4bf04568167", size = 10502442, upload-time = "2026-01-22T22:30:38.93Z" },
{ url = "https://files.pythonhosted.org/packages/87/44/90fa543014c45560cae1fffc63ea059fb3575ee6e1cb654562197e5d16fb/ruff-0.14.14-py3-none-win_amd64.whl", hash = "sha256:3dde1435e6b6fe5b66506c1dff67a421d0b7f6488d466f651c07f4cab3bf20fd", size = 11630486, upload-time = "2026-01-22T22:30:10.852Z" },
{ url = "https://files.pythonhosted.org/packages/9e/6a/40fee331a52339926a92e17ae748827270b288a35ef4a15c9c8f2ec54715/ruff-0.14.14-py3-none-win_arm64.whl", hash = "sha256:56e6981a98b13a32236a72a8da421d7839221fa308b223b9283312312e5ac76c", size = 10920448, upload-time = "2026-01-22T22:30:15.417Z" },
]
[[package]] [[package]]
name = "sniffio" name = "sniffio"
version = "1.3.1" version = "1.3.1"
@ -395,6 +421,24 @@ dev = [
{ name = "typing-extensions" }, { name = "typing-extensions" },
{ name = "xonsh" }, { name = "xonsh" },
] ]
devx = [
{ name = "greenback" },
{ name = "stackscope" },
{ name = "typing-extensions" },
]
lint = [
{ name = "ruff" },
]
repl = [
{ name = "prompt-toolkit" },
{ name = "psutil" },
{ name = "pyperclip" },
{ name = "xonsh" },
]
testing = [
{ name = "pexpect" },
{ name = "pytest" },
]
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
@ -420,6 +464,22 @@ dev = [
{ name = "typing-extensions", specifier = ">=4.14.1" }, { name = "typing-extensions", specifier = ">=4.14.1" },
{ name = "xonsh", specifier = ">=0.19.2" }, { name = "xonsh", specifier = ">=0.19.2" },
] ]
devx = [
{ name = "greenback", specifier = ">=1.2.1,<2" },
{ name = "stackscope", specifier = ">=0.2.2,<0.3" },
{ name = "typing-extensions", specifier = ">=4.14.1" },
]
lint = [{ name = "ruff", specifier = ">=0.9.6" }]
repl = [
{ name = "prompt-toolkit", specifier = ">=3.0.50" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pyperclip", specifier = ">=1.9.0" },
{ name = "xonsh", specifier = ">=0.19.2" },
]
testing = [
{ name = "pexpect", specifier = ">=4.9.0,<5" },
{ name = "pytest", specifier = ">=8.3.5" },
]
[[package]] [[package]]
name = "tricycle" name = "tricycle"