Compare commits

..

58 Commits

Author SHA1 Message Date
Tyler Goodlet da9bc1237d Change one infected-aio test to use `chan` in fn sig 2025-07-29 14:47:24 -04:00
Tyler Goodlet ab11ee4fbe Support `chan.started_nowait()` in `.open_channel_from()` target
That is, the `target` can declare a `chan: LinkedTaskChannel` instead of
`to_trio`/`from_aio`.

To support it,
- change `.started()` -> the more appropriate `.started_nowait()` which
  can be called sync from the aio child task.
- adjust the `provide_channels` assert to accept either fn sig
  declaration (for now).

Still needs test(s) obvi..
2025-07-29 14:42:15 -04:00
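For reference, a minimal sketch of the new-style `target` fn-sig (names per the msg above; the infected-`asyncio` actor scaffolding around the real usage is elided/assumed):

    import tractor
    from tractor.to_asyncio import LinkedTaskChannel

    async def aio_child(chan: LinkedTaskChannel):
        # runs as the child `asyncio.Task`; `.started_nowait()` is
        # sync-callable so no `await` is needed for the first value.
        chan.started_nowait('first-value')

    async def trio_parent():
        async with tractor.to_asyncio.open_channel_from(
            aio_child,  # new sig: one `chan` instead of `to_trio`/`from_aio`
        ) as (first, chan):
            assert first == 'first-value'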
Tyler Goodlet 466dce8aed Relay `asyncio` errors via EoC and raise from rent
Makes the newly added `test_aio_side_raises_before_started` test pass by
ensuring errors raised by any `.to_asyncio.open_channel_from()` spawned
child-`asyncio.Task` are relayed via any caught `trio.EndOfChannel` by
checking for a new `LinkedTaskChannel._closed_by_aio_task: bool`.

Impl deats,
- obvi add `LinkedTaskChannel._closed_by_aio_task: bool = False`
- in `translate_aio_errors()` always check for the new flag on EOC
  conditions and in such cases set `chan._trio_to_raise = aio_err` such
  that the `trio`-parent-task always raises the child's exception
  directly, OW keep original EoC passthrough in place.
- include *very* detailed per-case comments around the extended handler.
- adjust re-raising logic with a new `raise_from` where we only give the
  `aio_err` priority if it's not already set as the `trio_to_raise`.

Also,
- hide the `_run_asyncio_task()` frame by def.
2025-07-29 14:30:42 -04:00
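Condensed, the new EoC branch presumably amounts to something like (a sketch only; `._aio_err` as the stashed child-task error is an assumption):

    def maybe_relay_aio_err(chan) -> None:
        # called when a `trio.EndOfChannel` is caught: if the aio
        # child task closed the channel after erroring, arrange for
        # the `trio`-parent to raise that error instead of the EoC.
        if (
            chan._closed_by_aio_task
            and (aio_err := getattr(chan, '_aio_err', None))
        ):
            chan._trio_to_raise = aio_err
        # OW the original EoC passthrough stays in place.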
Tyler Goodlet 808dd9d73c Add "raises-pre-started" `open_channel_from()` test
Verifying that if any exc is raised pre `chan.send_nowait()` (our
currently shite version of a `chan.started()`) then that exc is indeed
raised through on the `trio`-parent task side. This case was reproduced
from a `piker.brokers.ib` issue with a similar embedded
`.trionics.maybe_open_context()` call.

Deats,
- call the suite `test_aio_side_raises_before_started`.
- mk the `@context` simply `maybe_open_context(acm_func=open_channel_from)`
  with a `target=raise_before_started` which,
- simply sleeps then immediately raises an RTE.
- expect the RTE from the aio-child-side to propagate all the way up to
  the root-actor's task right up through the `trio.run()`.
2025-07-29 13:58:48 -04:00
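A rough sketch of the suite's shape (the real suite runs this under an infected-`asyncio` actor via the embedded `maybe_open_context()` call, elided here):

    import asyncio
    import pytest
    import trio
    import tractor

    async def raise_before_started(chan):
        await asyncio.sleep(0.1)
        raise RuntimeError('raised before any `.started_nowait()`!')

    def test_aio_side_raises_before_started():
        async def main():
            async with tractor.to_asyncio.open_channel_from(
                raise_before_started,
            ) as (first, chan):
                pass

        # the aio-child's RTE should propagate to the trio-parent
        # task and out through `trio.run()`.
        with pytest.raises(RuntimeError):
            trio.run(main)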
Tyler Goodlet aef306465d Add `never_warn_on: dict` support to unmasker
Such that key->value pairs can be defined which should *never* be
unmasked, where,
- the keys are exc-types which might be masked, and
- the values are exc-types which masked the equivalent key.

For example, the default includes:
- KBI->taskc: a kbi should never be unmasked from its masking
  `trio.Cancelled`.

For the impl, a new `do_warn: bool` in the fn-body determines the
primary guard for whether a warning or re-raising is necessary.
2025-07-28 12:57:48 -04:00
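I.e. a table of the shape (exact kwarg plumbing assumed):

    import trio

    # masked-exc-type -> masking-exc-type pairs which should
    # *never* be unmasked (nor warned about); the KBI->taskc
    # default from the msg above:
    never_warn_on: dict = {
        KeyboardInterrupt: trio.Cancelled,
    }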
Tyler Goodlet 7459a4127c Accept `tn` to `gather_contexts()/maybe_open_context()`
Such that the caller can be responsible for their own (nursery) scoping
as needed and, for the latter fn's case, with
a `trio.Nursery.CancelStatus.encloses()` check to ensure the `tn` is
a valid parent-ish.

Some deats,
- in `gather_contexts()`, mv the `try/finally` outside the nursery block
  to ensure we always do the `parent_exit`.
- for `maybe_open_context()` we do a naive task-tree hierarchy audit to
  ensure the provided scope is not *too* child-ish (with what APIs `trio`
  gives us, see above), OW go with the old approach of using the actor's
  private service nursery.
  Also,
  * better report `trio.Cancelled` around the cache-miss `yield`
    cases and ensure we **never** unmask triggering key-errors.
  * report on any stale-state with the mutex in the `finally` block.
2025-07-27 13:55:11 -04:00
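Usage-wise, something like the following should now be possible (a sketch; the `tn` kwarg name is from the msg, exact sigs are assumed):

    from contextlib import asynccontextmanager as acm
    import trio
    import tractor

    @acm
    async def worker(i: int):
        yield i

    async def main():
        async with (
            trio.open_nursery() as tn,
            tractor.trionics.gather_contexts(
                mngrs=[worker(i) for i in range(3)],
                tn=tn,  # caller-owned (nursery) scoping
            ) as results,
        ):
            # the acm-yielded values, in input order
            print(results)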
Tyler Goodlet fc77e6eca5 Suppress beg tbs from `collapse_eg()`
It was originally this way; I forgot to flip it back when discarding the
`except*` handler impl..

Specially handle the `exc.__cause__` case where we raise from any
detected underlying cause and OW `from None` to suppress the eg's tb.
2025-07-25 20:05:51 -04:00
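I.e. a re-raise policy of roughly:

    def reraise_collapsed(beg: BaseExceptionGroup) -> None:
        # sketch: `beg` was already collapsed to a single sub-exc
        exc = beg.exceptions[0]
        if (cause := exc.__cause__) is not None:
            # keep the underlying cause chain visible
            raise exc from cause
        # OW suppress the (noisy) eg tb entirely
        raise exc from None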
Tyler Goodlet 26526b86c3 Facepalm, actually use `.log.cancel()`-level to report parent-side taskc.. 2025-07-25 19:03:21 -04:00
Tyler Goodlet d079675dd4 UDS: implicitly create `Address.bindspace: Path`
Since it's merely a local-file-sys subdirectory there should be no
reason file creation would conflict with other bind spaces.

Also add 2 test suites to match,
- `tests/ipc/test_each_tpt::test_uds_bindspace_created_implicitly` to
  verify the dir creation when DNE.
- `..test_uds_double_listen_raises_connerr` to ensure a double bind
  raises a `ConnectionError` from the src `OSError`.
2025-07-25 13:32:23 -04:00
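The implicit creation presumably boils down to (a sketch, assuming `Address.bindspace` is a plain local-fs `Path`):

    from pathlib import Path

    def ensure_bindspace(bs_dir: Path) -> Path:
        # create the subdir when it DNE; it's merely a local
        # filesystem dir so creation can't conflict with other
        # bind spaces.
        bs_dir.mkdir(parents=True, exist_ok=True)
        return bs_dir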
Tyler Goodlet c2acc4f55c Rm `assert` from `Channel.from_addr()`; for UDS we re-create the addr to extract the peer PID 2025-07-25 11:27:30 -04:00
Tyler Goodlet 326b258fd5 Drop `tn` input from `maybe_raise_from_masking_exc()`
Including all caller usage throughout. Moving to a non-`except*` impl
means it's never needed as a signal from the caller - we can just catch
the beg outright (like we should have always been doing)..
2025-07-25 11:16:02 -04:00
Tyler Goodlet 4f4c7e6b67 Adjust test suites to new `maybe_raise_from_masking_exc()` changes 2025-07-25 11:02:22 -04:00
Tyler Goodlet c05d08e426 Pass `tuple` from `._invoke()` unmasker usage
Since `maybe_raise_from_masking_exc()` now requires the general (tuple)
case, explicitly pass `unmask_from=(Cancelled,)` (yes, I know it's the
current default).

Also add some extra `TransportClosed`-handling for some
IPC-disconnects-during-teardown edge cases,
- in `._invoke()` around the `await chan.send(return_msg)` where we
  suppress if the underlying chan already disconnected.
- add a disjoint handler in `_errors_relayed_via_ipc()` which just
  reports the exc but raises it through (as prior).
  * I originally thought it needed to be handled specially (to avoid
    being crash handled) but turns out that isn't necessary?
  * Hence the masked-out `debug_filter` / guard expression around the
    `await debug._maybe_enter_pm()` line.
2025-07-25 10:52:06 -04:00
Tyler Goodlet 02062c5dc0 Drop `except*` usage from `._taskc` unmasker
That is from `maybe_raise_from_masking_exc()` thus minimizing us to
a single `except BaseException` block with logic branching for the beg
vs. `unmask_from` exc cases.

Also,
- raise val-err when `unmask_from` is not a `tuple`.
- tweak the exc-note warning format.
- drop all pausing from dev work.
2025-07-25 10:25:33 -04:00
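The new input guard is presumably of the shape:

    def check_unmask_from(unmask_from) -> None:
        # sketch of the described val-err on non-`tuple` input
        if not isinstance(unmask_from, tuple):
            raise ValueError(
                '`unmask_from` must be a tuple of exc-types, '
                f'not {unmask_from!r}'
            )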
Tyler Goodlet 72c4a9d20b Rework `collapse_eg()` to NOT use `except*`..
Since it turns out the semantics are basically inverse of normal
`except` (particularly for re-raising) which is hard to get right, and
bc it's a lot easier to just delegate to what `trio` already has behind
the `strict_exception_groups=False` setting, Bp

I added a rant here which will likely get removed shortly, but I think
going forward recommending against use of `except*` is prudent for
anything low level enough in the runtime (like trying to filter begs).

Dirty deats,
- copy `trio._core._run.collapse_exception_group()` to here with only
  a slight mod to remove the notes check and tb concatting for the
  collapse case.
- rename `maybe_collapse_eg()` -> `get_collapsed_eg()` and delegate it
  directly to the former `trio` fn; return `None` when it returns the
  same beg without collapse.
- simplify our own `collapse_eg()` to either raise the collapsed `exc`
  or original `beg`.
2025-07-25 10:23:19 -04:00
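So the renamed helper presumably reads like (a sketch; the collapser import is the `trio` fn named above, shown at its copied-from location):

    from trio._core._run import collapse_exception_group

    def get_collapsed_eg(
        beg: BaseExceptionGroup,
    ) -> BaseException|None:
        # delegate to the (copied) `trio` collapser; `None` means
        # no collapse occurred, i.e. the same beg came back.
        exc = collapse_exception_group(beg)
        if exc is beg:
            return None
        return exc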
Tyler Goodlet ccc3b1fce1 `ipc._uds`: assign `.l/raddr` in `.connect_to()`
Using `.get_stream_addrs()` such that we always (*can*) assign the peer
end's PID in the `._raddr`.

Also factor common `ConnectionError` re-raising into
a `_reraise_as_connerr()`-@cm.
2025-07-24 23:16:30 -04:00
Tyler Goodlet 11c4e65757 Add `.trionics.maybe_open_context()` locking test
Call it `test_lock_not_corrupted_on_fast_cancel()` and include
a detailed doc string to explain. Implemented it "cleverly" by having
the target `@acm` cancel its parent nursery after a peer, cache-hitting
task, is already waiting on the task mutex release.
2025-07-20 15:01:18 -04:00
Tyler Goodlet 33ac3ca99f Always `finally` invoke cache-miss `lock.release()`s
Since the `await service_n.start()` on key-err can be cancel-masked
(checkpoint interrupted before `_Cache.run_ctx` completes), we need to
always `lock.release()` in a `finally` to avoid lock-owner-state corruption and/or
inf-hangs in peer cache-hitting tasks.

Deats,
- add a `try/except/finally` around the key-err triggered cache-miss
  `service_n.start(_Cache.run_ctx, ..)` call, reporting on any taskc
  and always `finally` unlocking.
- fill out some log msg content and use `.debug()` level.
2025-07-20 14:57:26 -04:00
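Shape-wise, the guard amounts to (a condensed sketch; the lock is assumed already acquired by this task):

    import trio

    async def start_ctx_once(
        service_n: trio.Nursery,
        run_ctx,  # the `_Cache.run_ctx`-style startable target
        lock: trio.Lock,
    ) -> None:
        # cache-miss path: the `.start()` checkpoint can be
        # cancel-masked so the release MUST be in a `finally`.
        try:
            await service_n.start(run_ctx)
        except trio.Cancelled:
            print('cache-miss start was cancelled!')  # report taskc
            raise
        finally:
            # always unlock to avoid owner-state corruption and/or
            # inf-hangs in peer cache-hitting tasks.
            lock.release()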
Tyler Goodlet 9ada628a57 Rename all lingering ctx-side bits
As before but more thoroughly in comments and var names, finally changing
all,
- caller -> parent
- callee -> child
2025-07-18 20:07:37 -04:00
Tyler Goodlet d2c3e32bf1 Well then, I guess it just needed, a checkpoint XD
Here I was thinking the bcaster (usage) maybe required a rework but,
NOPE it's just bc a checkpoint was needed in the parent task owning the
`tn` which spawns `get_sub_and_pull()` tasks to ensure the bg allocated
`an`/portal is eventually cancel-called..

Ah well, at least i started a patch for `MsgStream.subscribe()` to make
it multicast revertible.. XD

Anyway, I tossed in some checks & notes related to all that unnecessary
effort since I do think i'll move forward implementing it:
- for the `cache_hit` case always verify that the `bcast` clone is
  unregistered from the common state subs after
  `.subscribe().__aexit__()`.
- do a light check that the implicit `MsgStream._broadcaster` is always
  the only bcrx instance left-leaked into that state.. that is until
  i get the proper de-allocation/reversion from multicast -> unicast
  working.
- put in mega detailed note about the required parent-task checkpoint.
2025-07-18 00:36:52 -04:00
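The checkpoint bit reduces to a standard `trio` gotcha: `.start_soon()` itself never yields, so spawned children only run once the parent task hits a checkpoint. A trivialized sketch:

    import trio

    async def child():
        print('child ran!')

    async def parent():
        async with trio.open_nursery() as tn:
            tn.start_soon(child)
            # without a checkpoint in THIS (parent) task the child
            # isn't scheduled until nursery exit:
            await trio.sleep(0)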
Tyler Goodlet 51944a0b99 TOSQASH 285ebba: woops still use `bcrx._state` for now.. 2025-07-18 00:36:52 -04:00
Tyler Goodlet 024e8015da Switch nursery to `CancelScope`-status properties
Been meaning to do this forever and a recent test hang finally drove me
to it Bp

Like it sounds, adopt the "cancel-status" properties on `ActorNursery`
already in use on our `Context` and derived from `trio.CancelScope`:

- add new private `._cancel_called` (set in the head of `.cancel()`)
  & `._cancelled_caught` (set in the tail) instance vars with matching
  read-only `@properties`.

- drop the instance-var and instead delegate a `.cancelled: bool`
  property to `._cancel_called` and add a usage deprecation warning
  (since removing it breaks a buncha tests).
2025-07-18 00:36:52 -04:00
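Property-shape sketch (mirroring the `trio.CancelScope` iface described above):

    import warnings

    class ActorNursery:
        _cancel_called: bool = False
        _cancelled_caught: bool = False

        @property
        def cancel_called(self) -> bool:
            # set in the head of `.cancel()`
            return self._cancel_called

        @property
        def cancelled_caught(self) -> bool:
            # set in the tail of `.cancel()`
            return self._cancelled_caught

        @property
        def cancelled(self) -> bool:
            # deprecated alias kept for the buncha tests..
            warnings.warn(
                '`.cancelled` is deprecated, use `.cancel_called`!',
                DeprecationWarning,
                stacklevel=2,
            )
            return self._cancel_called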
Tyler Goodlet aaed3a4a37 Add `Channel.closed/.cancel_called`
I.e. the public properties for the private instance var equivs; improves
expected introspection usage.
2025-07-18 00:36:52 -04:00
Tyler Goodlet edffd5e367 Set `Channel._cancel_called` via `chan` var
In `Portal.cancel_actor()` that is, at the least to make it easier to
ref search from an editor Bp
2025-07-18 00:36:52 -04:00
Tyler Goodlet 4ca81e39e6 Never shield-wait `ipc_server.wait_for_no_more_peers()`
As mentioned in the prior testing commit, it can cause the worst kind of
hangs, the SIGINT-ignoring kind.. Pretty sure there was never any reason
outside some esoteric multi-actor debugging case, and pretty sure that
was already solved?
2025-07-18 00:36:52 -04:00
Tyler Goodlet dd7aca539f Tool-up `test_resource_cache.test_open_local_sub_to_stream`
Since I recently discovered a very subtle race-case that can sometimes
cause the suite to hang, seemingly due to the `an: ActorNursery`
allocated *behind* the `.trionics.maybe_open_context()` usage; this can
result in never cancelling the 'streamer' subactor despite the `main()`
timeout-guard?

This led me to dig in and find that the underlying issue was 2-fold,

- our `BroadcastReceiver` termination-mgmt semantics in
  `MsgStream.subscribe()` can result in the first subscribing task to
  always keep the `MsgStream._broadcaster` instance allocated; it's
  never `.aclose()`ed, which makes it tough to determine (and thus
  trace) when all subscriber-tasks are actually complete and
  exited-from-`.subscribe()`..

- i was shield waiting `.ipc._server.Server.wait_for_no_more_peers()` in
  `._runtime.async_main()`'s shutdown sequence which would then compound
  the issue resulting in a SIGINT-shielded hang.. the worst kind XD

Actual changes here are just styling, printing, and some mucking with
passing the `an`-ref up to the parent task in the root-actor where i was
doing a conditional `ActorNursery.cancel()` to mk sure that was actually
the problem. Presuming this is fixed, the `.pause()` I left unmasked
should never hit.
2025-07-18 00:36:52 -04:00
Tyler Goodlet 735dc9056a Go multi-line-style tuples in `maybe_enter_context()`
Allows for an inline comment of the first "cache hit" bool element.
2025-07-18 00:36:52 -04:00
Tyler Goodlet e949839edf More prep-to-reduce the `Actor` method-iface
- drop the (never/un)used `.get_chans()`.
- add #TODO for factoring many methods into a new `.rpc`-subsys/pkg
  primitive, like an `RPCMngr/Server` type eventually.
- add todo to maybe mv `.get_parent()` elsewhere?
- move masked `._hard_mofo_kill()` to bottom.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 6194ac891c Add `.ipc._shm` todo-idea for `@actor_fixture` API 2025-07-18 00:36:29 -04:00
Tyler Goodlet 6554e324f2 Update buncha log msg fmting in `.msg._ops`
Mostly just multi-line code styling again: always putting standalone
`'f\n'` on separate LOC so it reads like it renders to console. Oh, and
a level drop to `.runtime()` for rx-msg reports.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 076caeb596 Couple more `._root` logging tweaks.. 2025-07-18 00:36:29 -04:00
Tyler Goodlet faa678e209 Update buncha log msg fmting in `._spawn`
Again using `Channel.aid.reprol()`, `.devx.pformat.nest_from_op()` and
 converting to multi-line code style for str-report-contents. Tweak
 some imports to sub-mod level as well.
2025-07-18 00:36:29 -04:00
Tyler Goodlet c5d68f6b58 Update buncha log msg fmting in `._portal`
Namely to use `Channel.aid.reprol()` and converting to our newer style
multi-line code style for str-reports.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 506aefb917 Use `._supervise._shutdown_msg` in tooling test 2025-07-18 00:36:29 -04:00
Tyler Goodlet 7436d52f37 Use `nest_from_op()`/`pretty_struct` in `._rpc`
Again for nicer console logging. Also fix a double `req_chan` arg bug
when passed to `_invoke` in the `self.cancel()` rt-ep; don't update the
`kwargs: dict`, just merge in the `req_chan` input at call time.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 80b074e3e7 Use `nest_from_op()` in actor-nursery shutdown
Including a new one-line `_shutdown_msg: str` which we mod-var-set for
testing usage and some denoising at `.info()` level. Adjust `Actor()`
instantiating input to the new `.registry_addrs` wrapped addrs property.
2025-07-18 00:36:29 -04:00
Tyler Goodlet e97efb7099 Use `Address` where possible in (root) actor boot
Namely inside various bootup-sequences in `._root` and `._runtime`
particularly in the root actor to support both better tpt-address
denoting in our logging and as part of clarifying logic around setting
the root's registry addresses which is soon to be much better factored
out of the core and into an explicit subsystem + API.

Some `_root.open_root_actor()` deats,
- set `registry_addrs` to a new `uw_reg_addrs` (uw: unwrapped) to be
  more explicit about wrapped addr types throughout.
- instead ensure `registry_addrs` are the wrapped types and pass down
  into the root `Actor` singleton-instance.
- factor the root-actor check + rt-vars update (updating the `'_root_addrs'`)
  out of `._runtime.async_main()` into this fn.
- as previously, set `trans_bind_addrs = uw_reg_addrs` in unwrapped form since it will
  be passed down both through rt-vars as `'_root_addrs'` and to
  `._runtime.async_main()` as `accept_addrs` (which is then passed to the
  IPC server).
- adjust/simplify much logging.
- shield the `await actor.cancel(None)  # self cancel` to avoid any
  finally-footguns.
- as mentioned convert the

For `_runtime.async_main()` tweaks,
- expect `registry_addrs: list[Address]|None = None` with appropriate
  unwrapping prior to setting both `.reg_addrs` and the equiv rt-var.
- add a new `.registry_addrs` prop for the wrapped form.
- convert a final loose-eg for the `service_nursery` to use
  `collapse_eg()`.
- simplify teardown report logging.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 81b11fd665 Add #TODO for `._context` to use `.msg.Aid` 2025-07-18 00:36:29 -04:00
Tyler Goodlet aa2b1fbf8b Add todo for py3.13+ `.shared_memory`'s new `track=False` support.. finally they added it XD 2025-07-18 00:36:29 -04:00
Tyler Goodlet 82c12253e5 Even more `.ipc.*` repr refinements
Mostly adjusting indentation, noise level, and clarity via `.pformat()`
tweaks and more general use of `.devx.pformat.nest_from_op()`.

Specific impl deats,
- use `pformat.ppfmt()`/`nest_from_op()` more seriously throughout
  `._server`.
- add a `._server.Endpoint.pformat()`.
- add `._server.Server.len_peers()` and `.repr_state()`.
- polish `Server.pformat()`.
- drop some redundant `log.runtime()`s from `._serve_ipc_eps()` instead
  leaving-them-only/putting-them in the caller pub meth.
- `._tcp.start_listener()` log the bound addr, not the input (which may
  be the 0-port).
2025-07-18 00:36:29 -04:00
Tyler Goodlet 7f451409ec More `.ipc.Channel`-repr related tweaks
- only generate a repr in `.from_addr()` when log level is >= 'runtime'.
 |_ add a todo about supporting this optimization more generally on our
   adapter.
- fix `Channel.pformat()` to show unknown peer field line fmt correctly.
- add a `Channel.maddr: str` which just delegates directly to the
  `._transport` like other pass-thru property fields.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 9be6f6d3e9 Mk `Aid` hashable, use pretty-`.__repr__()`
Hash on the `.uuid: str` and delegate verbatim to
`msg.pretty_struct.Struct`'s equiv method.
2025-07-18 00:36:29 -04:00
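I.e. roughly (field set beyond `.uuid` assumed):

    from tractor.msg import pretty_struct

    class Aid(pretty_struct.Struct):
        name: str
        uuid: str

        def __hash__(self) -> int:
            # hash solely on the (unique) `.uuid`
            return hash(self.uuid)

        def __repr__(self) -> str:
            # delegate verbatim to the pretty-struct equiv
            return pretty_struct.Struct.__repr__(self)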
Tyler Goodlet 9d2c7ae3cf .log: expose `at_least_level()` as `StackLevelAdapter` meth 2025-07-18 00:36:29 -04:00
Tyler Goodlet a81a1be40c Drop `actor_info: str` from `._entry` logs 2025-07-18 00:36:29 -04:00
Tyler Goodlet c85575e6ce Try `nest_from_op()` in some `._rpc` spots
To start trying out,
- using in the `Start`-msg handler-block to repr the msg coming
  *from* a `repr(Channel)` using the `<=)` sclang op.
- for a completed RPC task in `_invoke_non_context()`.
- for the msg loop task's termination report.
2025-07-18 00:36:29 -04:00
Tyler Goodlet aa98cbd848 Hide more `Channel._transport` privates for repr
Such as the `MsgTransport.stream` and `.drain` attrs since they're
rarely that important at the chan level. Also start adopting
a `.<attr>=` style for actual attrs of the type versus a `<name>: `
style for meta-field info lines.
2025-07-18 00:36:29 -04:00
Tyler Goodlet a890e9aa83 Refine `Actor` status iface, use `Aid` throughout
To simplify `.pformat()` output when the new `privates: bool` is unset
(the default) this adds new public attrs to wrap an actor's
cancellation status as well as provide a `.repr_state: str` (similar to
our equiv on `Context`). Rework `.pformat()` to render a much simplified
repr using all these new refinements.

Further, port the `.cancel()` method to use `.msg.types.Aid` for all
internal `requesting_uid` refs (now renamed with `_aid`) and in all
called downstream methods.

New cancel-state iface deats,
- rename `._cancel_called_by_remote` -> `._cancel_called_by` and expect
  it to be set as an `Aid`.
- add `.cancel_complete: bool` which flags whether `.cancel()` ran to
  completion.
- add `.cancel_called: bool` which just wraps `._cancel_called` (and
  which likely will just be dropped since we already have
  `._cancel_called_by`).
- add `.cancel_caller: Aid|None` which wraps `._cancel_called_by`.

In terms of using `Aid` in cancel methods,
- rename vars with `_aid` suffix in `.cancel()` (and wherever else).
- change `.cancel_rpc_tasks()` input param to `req_aid: msgtypes.Aid`.
- do the same for `._cancel_task()` and (for now until we adjust its
  internals as well) use the `Aid.uid` remap property when assigning
  `Context._canceller`.
- adjust all log msg refs to match obvi.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 1592f7e6be Add flag to toggle private vars in `Channel.pformat()`
Call it `privates: bool` and only show certain internal instance vars
when set in the `repr()` output.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 1c9293e69d Extend `.msg.types.Aid` method interface
Providing the legacy `.uid -> tuple` style id (since still used for the
`Actor._contexts` table) and a `repr-one-line` method `.reprol() -> str`
for rendering a compact unique actor ID summary (useful in
logging/.pformat()s at the least).
2025-07-18 00:36:29 -04:00
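Very roughly (the `reprol()` output format here is a guess):

    from dataclasses import dataclass

    @dataclass
    class Aid:
        name: str
        uuid: str

        @property
        def uid(self) -> tuple[str, str]:
            # legacy `(name, uuid)` style id, still used to key
            # the `Actor._contexts` table
            return (
                self.name,
                self.uuid,
            )

        def reprol(self) -> str:
            # "repr-one-line": compact unique actor-ID summary
            return f'{self.name}@{self.uuid[:8]}'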
Tyler Goodlet ec13c1b31d Enforce named-args only to `.open_nursery()` 2025-07-18 00:36:29 -04:00
Tyler Goodlet 7ce366097d Facepalm, fix `raise from` in `collapse_eg()`
I dunno what exactly I was thinking but we definitely don't want to
**ever** raise from the original exc-group, instead always raise from
any original `.__cause__` to be consistent with the embedded src-error's
context.

Also, adjust `maybe_collapse_eg()` to return `False` in the non-single
`.exceptions` case; again, I don't know what I was trying to do, but this
simplifies caller logic and the prior return-semantic had no real
value..

This fixes some final usage in the runtime (namely top level nursery
usage in `._root`/`._runtime`) which was previously causing test suite
failures prior to this fix.
2025-07-18 00:36:03 -04:00
Tyler Goodlet 6cedda008a Just import `._runtime` ns in `._root`; be a bit more explicit 2025-07-18 00:36:02 -04:00
Tyler Goodlet 207175d78e Use collapse in `._root.open_root_actor()` too
Seems to add one more cancellation suite failure as well as now cause
the discovery test to error instead of fail?
2025-07-18 00:36:02 -04:00
Tyler Goodlet 57b5e51099 Use collapser around root tn in `.async_main()`
Seems to cause the following test suites to fail however..

- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_clustering.py::test_empty_mngrs_input_raises'

Also tweak some ctxc request logging content.
2025-07-18 00:36:02 -04:00
Tyler Goodlet b72c8dce9b Drop msging-err patt from `subactor_breakpoint` ex
Since the `bdb` module was added to the namespace lookup set in
`._exceptions.get_err_type()` we can now relay a RAE-boxed
`bdb.BdbQuit`.
2025-07-18 00:36:02 -04:00
Tyler Goodlet bfa4d71009 Switch to strict-eg nurseries almost everywhere
That is just throughout the core library, not the tests yet. Again, we
simply change over to using our (nearly equivalent?)
`.trionics.collapse_eg()` in place of the already deprecated
`strict_exception_groups=False` flag in the following internals,
- the conc-fan-out tn use in `._discovery.find_actor()`.
- `._portal.open_portal()`'s internal tn used to spawn a bg rpc-msg-loop
  task.
- the daemon and "run-in-actor" layered tn pair allocated in
  `._supervise._open_and_supervise_one_cancels_all_nursery()`.

The remaining loose-eg usage in `._root` and `._runtime` seems to be
necessary to keep the test suite green?? For the moment these are left
out.
2025-07-18 00:36:02 -04:00
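The mechanical swap in each spot (visible throughout the diffs below):

    import trio
    import tractor

    async def main():
        # before (deprecated):
        #
        # async with trio.open_nursery(
        #     strict_exception_groups=False,
        # ) as tn:

        # after:
        async with (
            tractor.trionics.collapse_eg(),
            trio.open_nursery() as tn,
        ):
            tn.start_soon(trio.sleep, 0)

    trio.run(main)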
Tyler Goodlet 434e22680e Use collapser in rent side of `Context` 2025-07-18 00:36:02 -04:00
Tyler Goodlet 636c19866c Add some tooling params to `collapse_eg()` 2025-07-18 00:35:45 -04:00
32 changed files with 373 additions and 716 deletions

View File

@@ -16,7 +16,6 @@ from tractor import (
ContextCancelled,
MsgStream,
_testing,
trionics,
)
import trio
import pytest
@@ -63,8 +62,9 @@ async def recv_and_spawn_net_killers(
await ctx.started()
async with (
ctx.open_stream() as stream,
trionics.collapse_eg(),
trio.open_nursery() as tn,
trio.open_nursery(
strict_exception_groups=False,
) as tn,
):
async for i in stream:
print(f'child echoing {i}')

View File

@@ -1,35 +0,0 @@
import trio
import tractor
async def main():
async with tractor.open_root_actor(
debug_mode=True,
loglevel='cancel',
) as _root:
# manually trigger self-cancellation and wait
# for it to fully trigger.
_root.cancel_soon()
await _root._cancel_complete.wait()
print('root cancelled')
# now ensure we can still use the REPL
try:
await tractor.pause()
except trio.Cancelled as _taskc:
assert (root_cs := _root._root_tn.cancel_scope).cancel_called
# NOTE^^ above logic but inside `open_root_actor()` and
# passed to the `shield=` expression is effectively what
# we're testing here!
await tractor.pause(shield=root_cs.cancel_called)
# XXX, if shield logic *is wrong* inside `open_root_actor()`'s
# crash-handler block this should never be interacted,
# instead `trio.Cancelled` would be bubbled up: the original
# BUG.
assert 0
if __name__ == '__main__':
trio.run(main)

View File

@@ -23,8 +23,9 @@ async def main():
modules=[__name__]
) as portal_map,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
trio.open_nursery(
strict_exception_groups=False,
) as tn,
):
for (name, portal) in portal_map.items():

View File

@@ -1,13 +1,13 @@
"""
That "native" debug mode better work!
All these tests can be understood (somewhat) by running the
equivalent `examples/debugging/` scripts manually.
All these tests can be understood (somewhat) by running the equivalent
`examples/debugging/` scripts manually.
TODO:
- none of these tests have been run successfully on windows yet but
there's been manual testing that verified it works.
- wonder if any of it'll work on OS X?
- none of these tests have been run successfully on windows yet but
there's been manual testing that verified it works.
- wonder if any of it'll work on OS X?
"""
from __future__ import annotations
@@ -925,7 +925,6 @@ def test_post_mortem_api(
"<Task 'name_error'",
"NameError",
"('child'",
'getattr(doggypants)', # exc-LoC
]
)
if ctlc:
@@ -942,8 +941,8 @@ def test_post_mortem_api(
"<Task '__main__.main'",
"('root'",
"NameError",
"tractor.post_mortem()",
"src_uid=('child'",
"tractor.post_mortem()", # in `main()`-LoC
]
)
if ctlc:
@@ -961,10 +960,6 @@ def test_post_mortem_api(
"('root'",
"NameError",
"src_uid=('child'",
# raising line in `main()` but from crash-handling
# in `tractor.open_nursery()`.
'async with p.open_context(name_error) as (ctx, first):',
]
)
if ctlc:
@@ -1156,54 +1151,6 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
)
def test_crash_handling_within_cancelled_root_actor(
spawn: PexpectSpawner,
):
'''
Ensure that when only a root-actor is started via `open_root_actor()`
we can crash-handle in debug-mode despite self-cancellation.
More-or-less ensures we conditionally shield the pause in
`._root.open_root_actor()`'s `await debug._maybe_enter_pm()`
call.
'''
child = spawn('root_self_cancelled_w_error')
child.expect(PROMPT)
assert_before(
child,
[
"Actor.cancel_soon()` was called!",
"root cancelled",
_pause_msg,
"('root'", # actor name
]
)
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
[
_crash_msg,
"('root'", # actor name
"AssertionError",
"assert 0",
]
)
child.sendline('c')
child.expect(EOF)
assert_before(
child,
[
"AssertionError",
"assert 0",
]
)
# TODO: better error for "non-ideal" usage from the root actor.
# -[ ] if called from an async scope emit a message that suggests
# using `await tractor.pause()` instead since it's less overhead

View File

@@ -18,9 +18,8 @@ from tractor import (
@pytest.fixture
def bindspace_dir_str() -> str:
rt_dir: Path = tractor._state.get_rt_dir()
bs_dir: Path = rt_dir / 'doggy'
bs_dir_str: str = str(bs_dir)
bs_dir_str: str = '/run/user/1000/doggy'
bs_dir = Path(bs_dir_str)
assert not bs_dir.is_dir()
yield bs_dir_str

View File

@@ -313,8 +313,9 @@ async def inf_streamer(
# `trio.EndOfChannel` doesn't propagate directly to the above
# .open_stream() parent, resulting in it also raising instead
# of gracefully absorbing as normal.. so how to handle?
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
trio.open_nursery(
strict_exception_groups=False,
) as tn,
):
async def close_stream_on_sentinel():
async for msg in stream:

View File

@@ -236,10 +236,7 @@ async def stream_forever():
async def test_cancel_infinite_streamer(start_method):
# stream for at most 1 seconds
with (
trio.fail_after(4),
trio.move_on_after(1) as cancel_scope
):
with trio.move_on_after(1) as cancel_scope:
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'donny',
@@ -287,32 +284,20 @@ async def test_cancel_infinite_streamer(start_method):
],
)
@tractor_test
async def test_some_cancels_all(
num_actors_and_errs: tuple,
start_method: str,
loglevel: str,
):
'''
Verify a subset of failed subactors causes all others in
async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
"""Verify a subset of failed subactors causes all others in
the nursery to be cancelled just like the strategy in trio.
This is the first and only supervisory strategy at the moment.
'''
(
num_actors,
first_err,
err_type,
ria_func,
da_func,
) = num_actors_and_errs
"""
num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
try:
async with tractor.open_nursery() as an:
async with tractor.open_nursery() as n:
# spawn the same number of deamon actors which should be cancelled
dactor_portals = []
for i in range(num_actors):
dactor_portals.append(await an.start_actor(
dactor_portals.append(await n.start_actor(
f'deamon_{i}',
enable_modules=[__name__],
))
@@ -322,7 +307,7 @@ async def test_some_cancels_all(
for i in range(num_actors):
# start actor(s) that will fail immediately
riactor_portals.append(
await an.run_in_actor(
await n.run_in_actor(
func,
name=f'actor_{i}',
**kwargs
@@ -352,8 +337,7 @@ async def test_some_cancels_all(
# should error here with a ``RemoteActorError`` or ``MultiError``
except first_err as _err:
err = _err
except first_err as err:
if isinstance(err, BaseExceptionGroup):
assert len(err.exceptions) == num_actors
for exc in err.exceptions:
@@ -364,8 +348,8 @@ async def test_some_cancels_all(
elif isinstance(err, tractor.RemoteActorError):
assert err.boxed_type == err_type
assert an.cancelled is True
assert not an._children
assert n.cancelled is True
assert not n._children
else:
pytest.fail("Should have gotten a remote assertion error?")
@@ -535,15 +519,10 @@ def test_cancel_via_SIGINT_other_task(
async def main():
# should never timeout since SIGINT should cancel the current program
with trio.fail_after(timeout):
async with (
# XXX ?TODO? why no work!?
# tractor.trionics.collapse_eg(),
trio.open_nursery(
strict_exception_groups=False,
) as tn,
):
await tn.start(spawn_and_sleep_forever)
async with trio.open_nursery(
strict_exception_groups=False,
) as n:
await n.start(spawn_and_sleep_forever)
if 'mp' in spawn_backend:
time.sleep(0.1)
os.kill(pid, signal.SIGINT)
@@ -554,123 +533,38 @@ def test_cancel_via_SIGINT_other_task(
async def spin_for(period=3):
"Sync sleep."
print(f'sync sleeping in sub-sub for {period}\n')
time.sleep(period)
async def spawn_sub_with_sync_blocking_task():
async with tractor.open_nursery() as an:
print('starting sync blocking subactor..\n')
await an.run_in_actor(
async def spawn():
async with tractor.open_nursery() as tn:
await tn.run_in_actor(
spin_for,
name='sleeper',
)
print('exiting first subactor layer..\n')
@pytest.mark.parametrize(
'man_cancel_outer',
[
False, # passes if delay != 2
# always causes an unexpected eg-w-embedded-assert-err?
pytest.param(True,
marks=pytest.mark.xfail(
reason=(
'always causes an unexpected eg-w-embedded-assert-err?'
)
),
),
],
)
@no_windows
def test_cancel_while_childs_child_in_sync_sleep(
loglevel: str,
start_method: str,
spawn_backend: str,
debug_mode: bool,
reg_addr: tuple,
man_cancel_outer: bool,
loglevel,
start_method,
spawn_backend,
):
'''
Verify that a child cancelled while executing sync code is torn
"""Verify that a child cancelled while executing sync code is torn
down even when that cancellation is triggered by the parent
2 nurseries "up".
Though the grandchild should stay blocking its actor runtime, its
parent should issue a "zombie reaper" to hard kill it after
sufficient timeout.
'''
"""
if start_method == 'forkserver':
pytest.skip("Forksever sux hard at resuming from sync sleep...")
async def main():
#
# XXX BIG TODO NOTE XXX
#
# it seems there's a strange race that can happen
# where the fail-after will trigger the outer scope
# .cancel() which then causes the inner scope to raise,
#
# BaseExceptionGroup('Exceptions from Trio nursery', [
# BaseExceptionGroup('Exceptions from Trio nursery',
# [
# Cancelled(),
# Cancelled(),
# ]
# ),
# AssertionError('assert 0')
# ])
#
# WHY THIS DOESN'T MAKE SENSE:
# ---------------------------
# - it should raise too-slow-error when too slow..
# * verified that using simple-cs and manually cancelling
# you get same outcome -> indicates that the fail-after
# can have its TooSlowError overridden!
# |_ to check this it's easy, simply decrease the timeout
# as per the var below.
#
# - when using the manual simple-cs the outcome is different
# DESPITE the `assert 0` which means regardless of the
# inner scope effectively failing in the same way, the
# bubbling up **is NOT the same**.
#
# delays trigger diff outcomes..
# ---------------------------
# as seen by uncommenting various lines below there is from
# my POV an unexpected outcome due to the delay=2 case.
#
# delay = 1 # no AssertionError in eg, TooSlowError raised.
# delay = 2 # is AssertionError in eg AND no TooSlowError !?
delay = 4 # is AssertionError in eg AND no _cs cancellation.
with trio.fail_after(delay) as _cs:
# with trio.CancelScope() as cs:
# ^XXX^ can be used instead to see same outcome.
async with (
# tractor.trionics.collapse_eg(), # doesn't help
tractor.open_nursery(
hide_tb=False,
debug_mode=debug_mode,
registry_addrs=[reg_addr],
) as an,
):
await an.run_in_actor(
spawn_sub_with_sync_blocking_task,
name='sync_blocking_sub',
with trio.fail_after(2):
async with tractor.open_nursery() as tn:
await tn.run_in_actor(
spawn,
name='spawn',
)
await trio.sleep(1)
if man_cancel_outer:
print('Cancelling manually in root')
_cs.cancel()
# trigger exc-srced taskc down
# the actor tree.
print('RAISING IN ROOT')
assert 0
with pytest.raises(AssertionError):

View File

@@ -117,10 +117,9 @@ async def open_actor_local_nursery(
ctx: tractor.Context,
):
global _nursery
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
async with trio.open_nursery(
strict_exception_groups=False,
) as tn:
_nursery = tn
await ctx.started()
await trio.sleep(10)

View File

@@ -13,24 +13,26 @@ MESSAGE = 'tractoring at full speed'
def test_empty_mngrs_input_raises() -> None:
async def main():
with trio.fail_after(3):
with trio.fail_after(1):
async with (
open_actor_cluster(
modules=[__name__],
# NOTE: ensure we can passthrough runtime opts
loglevel='cancel',
debug_mode=False,
loglevel='info',
# debug_mode=True,
) as portals,
gather_contexts(mngrs=()),
gather_contexts(
# NOTE: it's the use of inline-generator syntax
# here that causes the empty input.
mngrs=(
p.open_context(worker) for p in portals.values()
),
),
):
# should fail before this?
assert portals
# test should fail if we mk it here!
assert 0, 'Should have raised val-err !?'
assert 0
with pytest.raises(ValueError):
trio.run(main)

View File

@@ -11,7 +11,6 @@ import psutil
import pytest
import subprocess
import tractor
from tractor.trionics import collapse_eg
from tractor._testing import tractor_test
import trio
@@ -194,10 +193,10 @@ async def spawn_and_check_registry(
try:
async with tractor.open_nursery() as an:
async with (
collapse_eg(),
trio.open_nursery() as trion,
):
async with trio.open_nursery(
strict_exception_groups=False,
) as trion:
portals = {}
for i in range(3):
name = f'a{i}'
@@ -339,12 +338,11 @@ async def close_chans_before_nursery(
async with portal2.open_stream_from(
stream_forever
) as agen2:
async with (
collapse_eg(),
trio.open_nursery() as tn,
):
tn.start_soon(streamer, agen1)
tn.start_soon(cancel, use_signal, .5)
async with trio.open_nursery(
strict_exception_groups=False,
) as n:
n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5)
try:
await streamer(agen2)
finally:

View File

@@ -234,8 +234,10 @@ async def trio_ctx(
with trio.fail_after(1 + delay):
try:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
trio.open_nursery(
# TODO, for new `trio` / py3.13
# strict_exception_groups=False,
) as tn,
tractor.to_asyncio.open_channel_from(
sleep_and_err,
) as (first, chan),

View File

@@ -235,16 +235,10 @@ async def cancel_after(wait, reg_addr):
@pytest.fixture(scope='module')
def time_quad_ex(
reg_addr: tuple,
ci_env: bool,
spawn_backend: str,
):
def time_quad_ex(reg_addr, ci_env, spawn_backend):
if spawn_backend == 'mp':
'''
no idea but the mp *nix runs are flaking out here often...
'''
"""no idea but the mp *nix runs are flaking out here often...
"""
pytest.skip("Test is too flaky on mp in CI")
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
@@ -255,24 +249,12 @@ def time_quad_ex(
return results, diff
def test_a_quadruple_example(
time_quad_ex: tuple,
ci_env: bool,
spawn_backend: str,
):
'''
This also serves as a kind of "we'd like to be this fast test".
def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
"""This also serves as a kind of "we'd like to be this fast test"."""
'''
results, diff = time_quad_ex
assert results
this_fast = (
6 if platform.system() in (
'Windows',
'Darwin',
)
else 3
)
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
assert diff < this_fast

View File

@@ -147,7 +147,8 @@ def test_trio_prestarted_task_bubbles(
await trio.sleep_forever()
async def _trio_main():
with trio.fail_after(2 if not debug_mode else 999):
# with trio.fail_after(2):
with trio.fail_after(999):
first: str
chan: to_asyncio.LinkedTaskChannel
aio_ev = asyncio.Event()
@@ -216,25 +217,32 @@ def test_trio_prestarted_task_bubbles(
):
aio_ev.set()
with pytest.raises(
expected_exception=ExceptionGroup,
) as excinfo:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
eg = excinfo.value
rte_eg, rest_eg = eg.split(RuntimeError)
# ensure the trio-task's error bubbled despite the aio-side
# having (maybe) errored first.
if aio_err_trigger in (
'after_trio_task_starts',
'after_start_point',
):
patt: str = 'trio-side'
expect_exc = TypeError
assert len(errs := rest_eg.exceptions) == 1
typerr = errs[0]
assert (
type(typerr) is TypeError
and
'trio-side' in typerr.args
)
# when aio errors BEFORE (last) trio task is scheduled, we should
# never see anything but the aio-side.
else:
patt: str = 'asyncio-side'
expect_exc = RuntimeError
with pytest.raises(expect_exc) as excinfo:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
caught_exc = excinfo.value
assert patt in caught_exc.args
assert len(rtes := rte_eg.exceptions) == 1
assert 'asyncio-side' in rtes[0].args[0]

View File

@@ -8,7 +8,6 @@ from contextlib import (
)
import pytest
from tractor.trionics import collapse_eg
import trio
from trio import TaskStatus
@@ -65,8 +64,9 @@ def test_stashed_child_nursery(use_start_soon):
async def main():
async with (
collapse_eg(),
trio.open_nursery() as pn,
trio.open_nursery(
strict_exception_groups=False,
) as pn,
):
cn = await pn.start(mk_child_nursery)
assert cn
@@ -117,11 +117,9 @@ def test_acm_embedded_nursery_propagates_enter_err(
async with (
trio.open_nursery() as tn,
tractor.trionics.maybe_raise_from_masking_exc(
tn=tn,
unmask_from=(
trio.Cancelled
if unmask_from_canc
else None
(trio.Cancelled,) if unmask_from_canc
else ()
),
)
):
@@ -136,8 +134,7 @@ def test_acm_embedded_nursery_propagates_enter_err(
with tractor.devx.maybe_open_crash_handler(
pdb=debug_mode,
) as bxerr:
if bxerr:
assert not bxerr.value
assert not bxerr.value
async with (
wraps_tn_that_always_cancels() as tn,
@@ -145,11 +142,12 @@ def test_acm_embedded_nursery_propagates_enter_err(
assert not tn.cancel_scope.cancel_called
assert 0
assert (
(err := bxerr.value)
and
type(err) is AssertionError
)
if debug_mode:
assert (
(err := bxerr.value)
and
type(err) is AssertionError
)
with pytest.raises(ExceptionGroup) as excinfo:
trio.run(_main)
@@ -197,8 +195,10 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
async with (
# XXX should ensure ONLY the KBI
# is relayed upward
collapse_eg(),
trio.open_nursery(), # as tn,
trionics.collapse_eg(),
trio.open_nursery(
# strict_exception_groups=False,
), # as tn,
trionics.gather_contexts([
open_memchan(),

View File

@@ -55,17 +55,10 @@ async def open_actor_cluster(
raise ValueError(
'Number of names is {len(names)} but count it {count}')
async with (
# tractor.trionics.collapse_eg(),
tractor.open_nursery(
**runtime_kwargs,
) as an
):
async with (
# tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
tractor.trionics.maybe_raise_from_masking_exc()
):
async with tractor.open_nursery(
**runtime_kwargs,
) as an:
async with trio.open_nursery() as n:
uid = tractor.current_actor().uid
async def _start(name: str) -> None:
@@ -76,8 +69,9 @@ async def open_actor_cluster(
)
for name in names:
tn.start_soon(_start, name)
n.start_soon(_start, name)
assert len(portals) == count
yield portals
await an.cancel(hard_kill=hard_kill)

View File

@@ -1011,6 +1011,7 @@ class Context:
else:
log.cancel(
f'Timed out on cancel request of remote task?\n'
f'\n'
f'{reminfo}'
)
@@ -1491,12 +1492,6 @@ class Context:
):
status = 'peer-cancelled'
case (
Unresolved,
trio.Cancelled(), # any error-type
) if self.canceller:
status = 'actor-cancelled'
# (remote) error condition
case (
Unresolved,
@@ -2278,7 +2273,7 @@ async def open_context_from_portal(
logmeth = log.exception
logmeth(
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n'
)
if debug_mode():

View File

@@ -481,11 +481,10 @@ async def open_root_actor(
collapse_eg(),
trio.open_nursery() as root_tn,
# ?TODO? finally-footgun below?
# XXX, finally-footgun below?
# -> see note on why shielding.
# maybe_raise_from_masking_exc(),
):
actor._root_tn = root_tn
# `_runtime.async_main()` creates an internal nursery
# and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called
@@ -524,11 +523,6 @@ async def open_root_actor(
err,
api_frame=inspect.currentframe(),
debug_filter=debug_filter,
# XXX NOTE, required to debug root-actor
# crashes under cancellation conditions; so
# most of them!
shield=root_tn.cancel_scope.cancel_called,
)
if (
@@ -568,7 +562,6 @@ async def open_root_actor(
f'{op_nested_actor_repr}'
)
# XXX, THIS IS A *finally-footgun*!
# (also mentioned in with-block above)
# -> though already shields iternally it can
# taskc here and mask underlying errors raised in
# the try-block above?

View File

@@ -284,6 +284,10 @@ async def _errors_relayed_via_ipc(
try:
yield # run RPC invoke body
except TransportClosed:
log.exception('Tpt disconnect during remote-exc relay?')
raise
# box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel.
except (
@@ -319,6 +323,9 @@ async def _errors_relayed_via_ipc(
and debug_kbis
)
)
# TODO? better then `debug_filter` below?
# and
# not isinstance(err, TransportClosed)
):
# XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default?
@@ -327,13 +334,25 @@ async def _errors_relayed_via_ipc(
# recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going
# to keep this open ended for now.
log.debug(
'RPC task crashed, attempting to enter debugger\n'
f'|_{ctx}'
)
if _state.debug_mode():
log.exception(
f'RPC task crashed!\n'
f'Attempting to enter debugger\n'
f'\n'
f'{ctx}'
)
entered_debug = await debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
# don't REPL any pseudo-expected tpt-disconnect
# debug_filter=lambda exc: (
# type (exc) not in {
# TransportClosed,
# }
# ),
)
if not entered_debug:
# if we prolly should have entered the REPL but
@@ -384,7 +403,7 @@ async def _errors_relayed_via_ipc(
# RPC task bookkeeping.
# since RPC tasks are scheduled inside a flat
# `Actor._service_tn`, we add "handles" to each such that
# `Actor._service_n`, we add "handles" to each such that
# they can be individually cancelled.
finally:
@@ -450,7 +469,7 @@ async def _invoke(
kwargs: dict[str, Any],
is_rpc: bool = True,
hide_tb: bool = True,
hide_tb: bool = False,
return_msg_type: Return|CancelAck = Return,
task_status: TaskStatus[
@@ -462,7 +481,7 @@ async def _invoke(
connected IPC channel.
This is the core "RPC" `trio.Task` scheduling machinery used to start every
remotely invoked function, normally in `Actor._service_tn: Nursery`.
remotely invoked function, normally in `Actor._service_n: Nursery`.
'''
__tracebackhide__: bool = hide_tb
@@ -642,7 +661,7 @@ async def _invoke(
tn: Nursery
rpc_ctx_cs: CancelScope
async with (
collapse_eg(hide_tb=False),
collapse_eg(),
trio.open_nursery() as tn,
msgops.maybe_limit_plds(
ctx=ctx,
@@ -654,8 +673,7 @@ async def _invoke(
# scope ensures unasking of the `await coro` below
# *should* never be interfered with!!
maybe_raise_from_masking_exc(
tn=tn,
unmask_from=Cancelled,
unmask_from=(Cancelled,),
) as _mbme, # maybe boxed masked exc
):
ctx._scope_nursery = tn
@@ -675,7 +693,20 @@ async def _invoke(
f'\n'
f'{pretty_struct.pformat(return_msg)}\n'
)
await chan.send(return_msg)
try:
await chan.send(return_msg)
except TransportClosed:
log.exception(
f"Failed send final result to 'parent'-side of IPC-ctx!\n"
f'\n'
f'{chan}\n'
f'Channel already disconnected ??\n'
f'\n'
f'{pretty_struct.pformat(return_msg)}'
)
# ?TODO? will this ever be true though?
if chan.connected():
raise
# NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of,
@@ -823,44 +854,24 @@ async def _invoke(
f'after having {ctx.repr_state!r}\n'
)
if merr:
logmeth: Callable = log.error
if (
# ctxc: by `Context.cancel()`
isinstance(merr, ContextCancelled)
if isinstance(merr, ContextCancelled):
logmeth: Callable = log.runtime
# out-of-layer cancellation, one of:
# - actorc: by `Portal.cancel_actor()`
# - OSc: by SIGINT or `Process.signal()`
or (
isinstance(merr, trio.Cancelled)
and
ctx.canceller
)
):
logmeth: Callable = log.cancel
descr_str += (
f' with {merr!r}\n'
)
elif (
not isinstance(merr, RemoteActorError)
):
tb_str: str = ''.join(
traceback.format_exception(merr)
)
if not isinstance(merr, RemoteActorError):
tb_str: str = ''.join(traceback.format_exception(merr))
descr_str += (
f'\n{merr!r}\n' # needed?
f'{tb_str}\n'
f'\n'
f'scope_error:\n'
f'{scope_err!r}\n'
)
else:
descr_str += (
f'{merr!r}\n'
)
descr_str += f'\n{merr!r}\n'
else:
descr_str += (
f'\n'
f'with final result {ctx.outcome!r}\n'
)
descr_str += f'\nwith final result {ctx.outcome!r}\n'
logmeth(
f'{message}\n'
@@ -936,7 +947,7 @@ async def process_messages(
Receive (multiplexed) per-`Channel` RPC requests as msgs from
remote processes; schedule target async funcs as local
`trio.Task`s inside the `Actor._service_tn: Nursery`.
`trio.Task`s inside the `Actor._service_n: Nursery`.
Depending on msg type, non-`cmd` (task spawning/starting)
request payloads (eg. `started`, `yield`, `return`, `error`)
@@ -961,7 +972,7 @@ async def process_messages(
'''
actor: Actor = _state.current_actor()
assert actor._service_tn # runtime state sanity
assert actor._service_n # runtime state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we
# should use it?
@@ -1172,7 +1183,7 @@ async def process_messages(
start_status += '->( scheduling new task..\n'
log.runtime(start_status)
try:
ctx: Context = await actor._service_tn.start(
ctx: Context = await actor._service_n.start(
partial(
_invoke,
actor,
@@ -1312,7 +1323,7 @@ async def process_messages(
) as err:
if nursery_cancelled_before_task:
sn: Nursery = actor._service_tn
sn: Nursery = actor._service_n
assert sn and sn.cancel_scope.cancel_called # sanity
log.cancel(
f'Service nursery cancelled before it handled {funcname}'

View File

@@ -35,15 +35,6 @@ for running all lower level spawning, supervision and msging layers:
SC-transitive RPC via scheduling of `trio` tasks.
- registration of newly spawned actors with the discovery sys.
Glossary:
--------
- tn: a `trio.Nursery` or "task nursery".
- an: an `ActorNursery` or "actor nursery".
- root: top/parent-most scope/task/process/actor (or other runtime
primitive) in a hierarchical tree.
- parent-ish: "higher-up" in the runtime-primitive hierarchy.
- child-ish: "lower-down" in the runtime-primitive hierarchy.
'''
from __future__ import annotations
from contextlib import (
@@ -85,7 +76,6 @@ from tractor.msg import (
)
from .trionics import (
collapse_eg,
maybe_open_nursery,
)
from .ipc import (
Channel,
@@ -183,11 +173,10 @@ class Actor:
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()`,
# - after fork for subactors.
# - during boot for the root actor.
_root_tn: Nursery|None = None
_service_tn: Nursery|None = None
# nursery placeholders filled in by `async_main()` after fork
_root_n: Nursery|None = None
_service_n: Nursery|None = None
_ipc_server: _server.IPCServer|None = None
@property
@@ -1021,48 +1010,12 @@ class Actor:
the RPC service nursery.
'''
actor_repr: str = _pformat.nest_from_op(
input_op='>c(',
text=self.pformat(),
nest_indent=1,
)
log.cancel(
'Actor.cancel_soon()` was called!\n'
f'>> scheduling `Actor.cancel()`\n'
f'{actor_repr}'
)
assert self._service_tn
self._service_tn.start_soon(
assert self._service_n
self._service_n.start_soon(
self.cancel,
None, # self cancel all rpc tasks
)
# schedule a "canceller task" in the `._root_tn` once the
# `._service_tn` is fully shutdown; task waits for child-ish
# scopes to fully exit then finally cancels its parent,
# root-most, scope.
async def cancel_root_tn_after_services():
log.runtime(
'Waiting on service-tn to cancel..\n'
f'c>)\n'
f'|_{self._service_tn.cancel_scope!r}\n'
)
await self._cancel_complete.wait()
log.cancel(
f'`._service_tn` cancelled\n'
f'>c)\n'
f'|_{self._service_tn.cancel_scope!r}\n'
f'\n'
f'>> cancelling `._root_tn`\n'
f'c>(\n'
f' |_{self._root_tn.cancel_scope!r}\n'
)
self._root_tn.cancel_scope.cancel()
self._root_tn.start_soon(
cancel_root_tn_after_services
)
@property
def cancel_complete(self) -> bool:
return self._cancel_complete.is_set()
@@ -1167,8 +1120,8 @@ class Actor:
await ipc_server.wait_for_shutdown()
# cancel all rpc tasks permanently
if self._service_tn:
self._service_tn.cancel_scope.cancel()
if self._service_n:
self._service_n.cancel_scope.cancel()
log_meth(msg)
self._cancel_complete.set()
@@ -1305,7 +1258,7 @@ class Actor:
'''
Cancel all ongoing RPC tasks owned/spawned for a given
`parent_chan: Channel` or simply all tasks (inside
`._service_tn`) when `parent_chan=None`.
`._service_n`) when `parent_chan=None`.
'''
tasks: dict = self._rpc_tasks
@@ -1517,55 +1470,46 @@ async def async_main(
accept_addrs.append(addr.unwrap())
assert accept_addrs
ya_root_tn: bool = bool(actor._root_tn)
ya_service_tn: bool = bool(actor._service_tn)
# NOTE, a top-most "root" nursery in each actor-process
# enables a lifetime priority for the IPC-channel connection
# with a sub-actor's immediate parent. I.e. this connection
# is kept alive as a resilient service connection until all
# other machinery has exited, cancellation of all
# embedded/child scopes have completed. This helps ensure
# a deterministic (and thus "graceful")
# first-class-supervision style teardown where a parent actor
# (vs. say peers) is always the last to be contacted before
# disconnect.
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in
# a deterministic way.
root_tn: trio.Nursery
async with (
collapse_eg(),
maybe_open_nursery(
nursery=actor._root_tn,
) as root_tn,
trio.open_nursery() as root_tn,
):
if ya_root_tn:
assert root_tn is actor._root_tn
else:
actor._root_tn = root_tn
actor._root_n = root_tn
assert actor._root_n
ipc_server: _server.IPCServer
async with (
collapse_eg(),
maybe_open_nursery(
nursery=actor._service_tn,
) as service_tn,
trio.open_nursery() as service_nursery,
_server.open_ipc_server(
parent_tn=service_tn, # ?TODO, why can't this be the root-tn
stream_handler_tn=service_tn,
parent_tn=service_nursery,
stream_handler_tn=service_nursery,
) as ipc_server,
# ) as actor._ipc_server,
# ^TODO? prettier?
):
if ya_service_tn:
assert service_tn is actor._service_tn
else:
# This nursery is used to handle all inbound
# connections to us such that if the TCP server
# is killed, connections can continue to process
# in the background until this nursery is cancelled.
actor._service_tn = service_tn
# set after allocate
# This nursery is used to handle all inbound
# connections to us such that if the TCP server
# is killed, connections can continue to process
# in the background until this nursery is cancelled.
actor._service_n = service_nursery
actor._ipc_server = ipc_server
assert (
actor._service_n
and (
actor._service_n
is
actor._ipc_server._parent_tn
is
ipc_server._stream_handler_tn
)
)
# load exposed/allowed RPC modules
# XXX: do this **after** establishing a channel to the parent
@@ -1591,11 +1535,10 @@ async def async_main(
# - root actor: the ``accept_addr`` passed to this method
# TODO: why is this not with the root nursery?
# - see above that the `._service_tn` is what's used?
try:
eps: list = await ipc_server.listen_on(
accept_addrs=accept_addrs,
stream_handler_nursery=service_tn,
stream_handler_nursery=service_nursery,
)
log.runtime(
f'Booted IPC server\n'
@@ -1603,7 +1546,7 @@ async def async_main(
)
assert (
(eps[0].listen_tn)
is not service_tn
is not service_nursery
)
except OSError as oserr:
@@ -1765,7 +1708,7 @@ async def async_main(
# XXX TODO but hard XXX
# we can't actually do this bc the debugger uses the
# _service_tn to spawn the lock task, BUT, in theory if we had
# _service_n to spawn the lock task, BUT, in theory if we had
# the root nursery surround this finally block it might be
# actually possible to debug THIS machinery in the same way
# as user task code?

View File

@@ -236,6 +236,10 @@ async def hard_kill(
# whilst also hacking on it XD
# terminate_after: int = 99999,
# NOTE: for mucking with `.pause()`-ing inside the runtime
# whilst also hacking on it XD
# terminate_after: int = 99999,
) -> None:
'''
Un-gracefully terminate an OS level `trio.Process` after timeout.
@@ -297,23 +301,6 @@ async def hard_kill(
# zombies (as a feature) we ask the OS to do send in the
# removal swad as the last resort.
if cs.cancelled_caught:
# TODO? attempt at intermediary-rent-sub
# with child in debug lock?
# |_https://github.com/goodboy/tractor/issues/320
#
# if not is_root_process():
# log.warning(
# 'Attempting to acquire debug-REPL-lock before zombie reap!'
# )
# with trio.CancelScope(shield=True):
# async with debug.acquire_debug_lock(
# subactor_uid=current_actor().uid,
# ) as _ctx:
# log.warning(
# 'Acquired debug lock, child ready to be killed ??\n'
# )
# TODO: toss in the skynet-logo face as ascii art?
log.critical(
# 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'

View File

@@ -643,9 +643,8 @@ _shutdown_msg: str = (
'Actor-runtime-shutdown'
)
@acm
# @api_frame
@acm
async def open_nursery(
*, # named params only!
hide_tb: bool = True,

View File

@@ -237,9 +237,9 @@ def enable_stack_on_sig(
try:
import stackscope
except ImportError:
log.warning(
'The `stackscope` lib is not installed!\n'
'`Ignoring enable_stack_on_sig() call!\n'
log.error(
'`stackscope` not installed for use in debug mode!\n'
'`Ignoring {enable_stack_on_sig!r} call!\n'
)
return None

View File

@@ -250,7 +250,7 @@ async def _maybe_enter_pm(
*,
tb: TracebackType|None = None,
api_frame: FrameType|None = None,
hide_tb: bool = True,
hide_tb: bool = False,
# only enter debugger REPL when returns `True`
debug_filter: Callable[

View File

@@ -58,7 +58,6 @@ from tractor._context import Context
from tractor import _state
from tractor._exceptions import (
NoRuntime,
InternalError,
)
from tractor._state import (
current_actor,
@@ -80,9 +79,6 @@ from ._sigint import (
sigint_shield as sigint_shield,
_ctlc_ignore_header as _ctlc_ignore_header
)
from ..pformat import (
ppfmt,
)
if TYPE_CHECKING:
from trio.lowlevel import Task
@@ -481,12 +477,12 @@ async def _pause(
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below:
# ```python
# actor._service_tn.cancel_scope.shield = shield
# actor._service_n.cancel_scope.shield = shield
# ```
# but not entirely sure if that's a sane way to implement it?
# NOTE currently we spawn the lock request task inside this
# subactor's global `Actor._service_tn` so that the
# subactor's global `Actor._service_n` so that the
# lifetime of the lock-request can outlive the current
# `._pause()` scope while the user steps through their
# application code and when they finally exit the
@@ -510,7 +506,7 @@ async def _pause(
f'|_{task}\n'
)
with trio.CancelScope(shield=shield):
req_ctx: Context = await actor._service_tn.start(
req_ctx: Context = await actor._service_n.start(
partial(
request_root_stdio_lock,
actor_uid=actor.uid,
@@ -544,7 +540,7 @@ async def _pause(
_repl_fail_report = None
# when the actor is mid-runtime cancellation the
# `Actor._service_tn` might get closed before we can spawn
# `Actor._service_n` might get closed before we can spawn
# the request task, so just ignore expected RTE.
elif (
isinstance(pause_err, RuntimeError)
@@ -989,7 +985,7 @@ def pause_from_sync(
# that output and assign the `repl` created above!
bg_task, _ = trio.from_thread.run(
afn=partial(
actor._service_tn.start,
actor._service_n.start,
partial(
_pause_from_bg_root_thread,
behalf_of_thread=thread,
@@ -1157,10 +1153,9 @@ def pause_from_sync(
'use_greenback',
False,
):
raise InternalError(
f'`greenback` was never initialized in this actor?\n'
f'\n'
f'{ppfmt(_state._runtime_vars)}\n'
raise RuntimeError(
'`greenback` was never initialized in this actor!?\n\n'
f'{_state._runtime_vars}\n'
) from rte
raise

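The recurring theme in these hunks is spawning the lock-request task into the longer-lived service nursery under a shielded scope; roughly, in generic `trio` terms (a sketch only, `service_tn` stands in for the actor's service nursery):

```python
import trio

async def lock_request(
    task_status: trio.TaskStatus[str] = trio.TASK_STATUS_IGNORED,
) -> None:
    # report "readiness" back to the `.start()` caller..
    task_status.started('req-ctx')
    # ..then keep running past the requester's own scope.
    await trio.sleep(1)

async def requester(service_tn: trio.Nursery) -> None:
    # shield so the spawn itself can't be cancelled mid-request.
    with trio.CancelScope(shield=True):
        req_ctx: str = await service_tn.start(lock_request)
```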
View File

@@ -17,59 +17,36 @@
Utils to tame mp non-SC madness
'''
import platform
# !TODO! in 3.13 this can be disabled (the-same/similarly) using
# a flag,
# - [ ] soo if it works like this, drop this module entirely for
# 3.13+ B)
# |_https://docs.python.org/3/library/multiprocessing.shared_memory.html
#
def disable_mantracker():
'''
Disable all `multiprocessing` "resource tracking" machinery since
it's an absolute multi-threaded mess of non-SC madness.
'''
from multiprocessing.shared_memory import SharedMemory
from multiprocessing import resource_tracker as mantracker
# Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
# 3.13+ only.. can pass `track=False` to disable
# all the resource tracker bs.
# https://docs.python.org/3/library/multiprocessing.shared_memory.html
if (_py_313 := (
platform.python_version_tuple()[:-1]
>=
('3', '13')
)
):
from functools import partial
return partial(
SharedMemory,
track=False,
)
def unregister(self, name, rtype):
pass
# !TODO, once we drop 3.12- we can obvi remove all this!
else:
from multiprocessing import (
resource_tracker as mantracker,
)
def ensure_running(self):
pass
# Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
def unregister(self, name, rtype):
pass
def ensure_running(self):
pass
# "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
# use std type verbatim
shmT = SharedMemory
return shmT
# "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd

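For reference, the 3.13+ branch above reduces to passing the (real, stdlib) `track` kwarg per segment; a tiny sketch:

```python
from multiprocessing.shared_memory import SharedMemory

# 3.13+ only: `track=False` skips the resource-tracker entirely,
# so no spurious tracker-driven unlink at interpreter exit.
shm = SharedMemory(create=True, size=4096, track=False)
try:
    shm.buf[:5] = b'hello'
finally:
    shm.close()
    shm.unlink()  # explicit removal is now fully on us
```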
View File

@@ -1001,11 +1001,7 @@ class Server(Struct):
partial(
_serve_ipc_eps,
server=self,
stream_handler_tn=(
stream_handler_nursery
or
self._stream_handler_tn
),
stream_handler_tn=stream_handler_nursery,
listen_addrs=accept_addrs,
)
)
@@ -1149,17 +1145,13 @@ async def open_ipc_server(
async with maybe_open_nursery(
nursery=parent_tn,
) as parent_tn:
) as rent_tn:
no_more_peers = trio.Event()
no_more_peers.set()
ipc_server = IPCServer(
_parent_tn=parent_tn,
_stream_handler_tn=(
stream_handler_tn
or
parent_tn
),
_parent_tn=rent_tn,
_stream_handler_tn=stream_handler_tn or rent_tn,
_no_more_peers=no_more_peers,
)
try:

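The `maybe_open_nursery()` call above implements an accept-or-allocate scoping pattern; a sketch of the idea (not the exact `tractor.trionics` impl):

```python
from contextlib import asynccontextmanager as acm
import trio

@acm
async def maybe_open_nursery(
    nursery: trio.Nursery|None = None,
):
    # use the caller's nursery when provided so they own
    # the scoping; else open (and close) our own.
    if nursery is not None:
        yield nursery
    else:
        async with trio.open_nursery() as tn:
            yield tn
```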
View File

@@ -23,15 +23,14 @@ considered optional within the context of this runtime-library.
"""
from __future__ import annotations
from multiprocessing import shared_memory as shm
from multiprocessing.shared_memory import (
# SharedMemory,
ShareableList,
)
import platform
from sys import byteorder
import time
from typing import Optional
from multiprocessing import shared_memory as shm
from multiprocessing.shared_memory import (
SharedMemory,
ShareableList,
)
from msgspec import (
Struct,
@@ -62,7 +61,7 @@ except ImportError:
log = get_logger(__name__)
SharedMemory = disable_mantracker()
disable_mantracker()
class SharedInt:
@@ -798,15 +797,8 @@ def open_shm_list(
# "close" attached shm on actor teardown
try:
actor = tractor.current_actor()
actor.lifetime_stack.callback(shml.shm.close)
# XXX on 3.13+ we don't need to call this?
# -> bc we pass `track=False` for `SharedMemory` orr?
if (
platform.python_version_tuple()[:-1] < ('3', '13')
):
actor.lifetime_stack.callback(shml.shm.unlink)
actor.lifetime_stack.callback(shml.shm.unlink)
except RuntimeError:
log.warning('tractor runtime not active, skipping teardown steps')

View File

@@ -430,25 +430,20 @@ class MsgpackTransport(MsgTransport):
return await self.stream.send_all(size + bytes_data)
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
) as _re:
trans_err = _re
) as bre:
trans_err = bre
tpt_name: str = f'{type(self).__name__!r}'
match trans_err:
# XXX, specific to UDS transport and its,
# well, "speediness".. XD
# |_ likely todo with races related to how fast
# the socket is setup/torn-down on linux
# as it pertains to rando pings from the
# `.discovery` subsys and protos.
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe'
in
trans_err.args[0]
'[Errno 32] Broken pipe' in trans_err.args[0]
# ^XXX, specific to UDS transport and its,
# well, "speediness".. XD
# |_ likely todo with races related to how fast
# the socket is setup/torn-down on linux
# as it pertains to rando pings from the
# `.discovery` subsys and protos.
):
tpt_closed = TransportClosed.from_src_exc(
raise TransportClosed.from_src_exc(
message=(
f'{tpt_name} already closed by peer\n'
),
@@ -456,31 +451,14 @@ class MsgpackTransport(MsgTransport):
src_exc=trans_err,
raise_on_report=True,
loglevel='transport',
)
raise tpt_closed from trans_err
# case trio.ClosedResourceError() if (
# 'this socket was already closed'
# in
# trans_err.args[0]
# ):
# tpt_closed = TransportClosed.from_src_exc(
# message=(
# f'{tpt_name} already closed by peer\n'
# ),
# body=f'{self}\n',
# src_exc=trans_err,
# raise_on_report=True,
# loglevel='transport',
# )
# raise tpt_closed from trans_err
) from bre
# unless the disconnect condition falls under "a
# normal operation breakage" we usually console warn
# about it.
case _:
log.exception(
f'{tpt_name} layer failed pre-send ??\n'
'{tpt_name} layer failed pre-send ??\n'
)
raise trans_err
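Distilled, the `match`-with-guard style above separates "normal operation breakage" (peer hung up first) from genuinely unexpected stream failures; a generic sketch, not the transport's real code:

```python
import trio

async def send_or_classify(
    stream: trio.abc.SendStream,
    data: bytes,
) -> None:
    try:
        await stream.send_all(data)
    except (
        trio.BrokenResourceError,
        trio.ClosedResourceError,
    ) as trans_err:
        match trans_err:
            # peer closed first; expected during teardown races.
            case trio.BrokenResourceError() if (
                '[Errno 32] Broken pipe' in str(trans_err.args[0])
            ):
                return
            # anything else is a real failure.
            case _:
                raise
```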
@@ -525,7 +503,7 @@ class MsgpackTransport(MsgTransport):
def pformat(self) -> str:
return (
f'<{type(self).__name__}(\n'
f' |_peers: 1\n'
f' |_peers: 2\n'
f' laddr: {self._laddr}\n'
f' raddr: {self._raddr}\n'
# f'\n'

View File

@@ -215,7 +215,7 @@ class LinkedTaskChannel(
val: Any = None,
) -> None:
'''
Synchronize aio-side with its trio-parent.
Synchronize aio-sde with its trio-parent.
'''
self._aio_started_val = val
@@ -459,22 +459,14 @@ def _run_asyncio_task(
f'Task exited with final result: {result!r}\n'
)
# XXX ALWAYS close the child-`asyncio`-task-side's
# `to_trio` handle which will in turn relay
# a `trio.EndOfChannel` to the `trio`-parent.
# Consequently the parent `trio` task MUST ALWAYS
# check for any `chan._aio_err` to be raised when it
# receives an EoC.
#
# NOTE, there are 2 EoC cases,
# - normal/graceful EoC due to the aio-side actually
# terminating its "streaming", but the task did not
# error and is not yet complete.
#
# - the aio-task terminated and we specially mark the
# closure as due to the `asyncio.Task`'s exit.
# only close the aio (child) side which will relay
# a `trio.EndOfChannel` to the trio (parent) side.
#
# XXX NOTE, that trio-side MUST then in such cases
# check for a `chan._aio_err` and raise it!!
to_trio.close()
# specially mark the closure as due to the
# asyncio.Task terminating!
chan._closed_by_aio_task = True
aio_task_complete.set()
@@ -854,6 +846,8 @@ async def translate_aio_errors(
chan._trio_to_raise = aio_err
trio_err = chan._trio_err = eoc
#
# await tractor.pause(shield=True)
#
# ?TODO?, raise something like a,
# chan._trio_to_raise = AsyncioErrored()
# BUT, with the tb rewritten to reflect the underlying

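Per the comments above, the parent-side contract is: an EoC may actually carry a relayed child error. A hedged usage sketch (the child-task signature and its failure are illustrative; assumes an aio-infected actor, i.e. one spawned with `infect_asyncio=True`):

```python
import asyncio
import trio
import tractor

async def raise_before_started(
    to_trio: trio.MemorySendChannel,
    from_trio: asyncio.Queue,
) -> None:
    # child `asyncio.Task` which fails *before* ever
    # signalling a first value ("started") to its parent.
    await asyncio.sleep(0.1)
    raise RuntimeError('whoops, pre-started failure!')

async def parent_task() -> None:
    async with tractor.to_asyncio.open_channel_from(
        raise_before_started,
    ) as (first, chan):
        # never reached; instead of a bare EoC the runtime
        # sees `chan._closed_by_aio_task` and re-raises the
        # child's `RuntimeError` on this `trio` side.
        await chan.receive()
```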
View File

@@ -78,6 +78,7 @@ def collapse_exception_group(
def get_collapsed_eg(
beg: BaseExceptionGroup,
bp: bool = False,
) -> BaseException|None:
'''
If the input beg can collapse to a single sub-exception which is
@@ -91,6 +92,7 @@ def get_collapsed_eg(
return maybe_exc
@acm
async def collapse_eg(
hide_tb: bool = True,
@@ -100,8 +102,6 @@ async def collapse_eg(
# trio.Cancelled,
},
add_notes: bool = True,
bp: bool = False,
):
'''
If `BaseExceptionGroup` raised in the body scope is
@@ -115,20 +115,6 @@ async def collapse_eg(
yield
except BaseExceptionGroup as _beg:
beg = _beg
if (
bp
and
len(beg.exceptions) > 1
):
import tractor
if tractor.current_actor(
err_on_no_runtime=False,
):
await tractor.pause(shield=True)
else:
breakpoint()
if (
(exc := get_collapsed_eg(beg))
and

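Usage-wise, `collapse_eg()` lets callers catch the bare sub-exception instead of a single-child eg; a hedged sketch (export path assumed):

```python
import trio
from tractor.trionics import collapse_eg  # assumed export path

async def main() -> None:
    try:
        async with (
            collapse_eg(),
            trio.open_nursery() as tn,
        ):
            tn.start_soon(trio.sleep, 1)
            raise ValueError('whoops')
    except ValueError:
        # without the collapser this arrives wrapped as a
        # single-child `ExceptionGroup` under strict-eg mode.
        print('caught the bare exc!')

trio.run(main)
```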
View File

@@ -204,7 +204,7 @@ class _Cache:
a kept-alive-while-in-use async resource.
'''
service_tn: Optional[trio.Nursery] = None
service_n: Optional[trio.Nursery] = None
locks: dict[Hashable, trio.Lock] = {}
users: int = 0
values: dict[Any, Any] = {}
@@ -294,15 +294,15 @@ async def maybe_open_context(
f'task: {task}\n'
f'task_tn: {task_tn}\n'
)
service_tn = tn
service_n = tn
else:
service_tn: trio.Nursery = current_actor()._service_tn
service_n: trio.Nursery = current_actor()._service_n
# TODO: is there any way to allocate
# a 'stays-open-till-last-task-finished' nursery?
# service_tn: trio.Nursery
# async with maybe_open_nursery(_Cache.service_tn) as service_tn:
# _Cache.service_tn = service_tn
# service_n: trio.Nursery
# async with maybe_open_nursery(_Cache.service_n) as service_n:
# _Cache.service_n = service_n
cache_miss_ke: KeyError|None = None
maybe_taskc: trio.Cancelled|None = None
@@ -324,8 +324,8 @@ async def maybe_open_context(
mngr = acm_func(**kwargs)
resources = _Cache.resources
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
resources[ctx_key] = (service_tn, trio.Event())
yielded: Any = await service_tn.start(
resources[ctx_key] = (service_n, trio.Event())
yielded: Any = await service_n.start(
_Cache.run_ctx,
mngr,
ctx_key,

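The user-facing effect of the `_Cache` plus service-nursery machinery above, sketched roughly (`open_client()` is hypothetical and this must run inside a `tractor` actor):

```python
from contextlib import asynccontextmanager as acm
import trio
from tractor.trionics import maybe_open_context

@acm
async def open_client(host: str):
    # stand-in for some expensive-to-open resource..
    yield f'client-for-{host}'

async def user_task() -> None:
    async with maybe_open_context(
        acm_func=open_client,
        kwargs={'host': 'localhost'},
    ) as (cache_hit, client):
        # the first entrant actually enters `open_client()` via
        # the service nursery; later callers with matching
        # `kwargs` get `cache_hit=True` and the same `client`
        # until the last user exits.
        assert client == 'client-for-localhost'
```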
View File

@@ -22,7 +22,10 @@ from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from typing import TYPE_CHECKING
from typing import (
Type,
TYPE_CHECKING,
)
import trio
from tractor.log import get_logger
@@ -65,7 +68,6 @@ def find_masked_excs(
#
@acm
async def maybe_raise_from_masking_exc(
tn: trio.Nursery|None = None,
unmask_from: (
BaseException|
tuple[BaseException]
@@ -74,15 +76,26 @@ async def maybe_raise_from_masking_exc(
raise_unmasked: bool = True,
extra_note: str = (
'This can occur when,\n'
' - a `trio.Nursery` scope embeds a `finally:`-block '
'which executes a checkpoint!'
'\n'
' - a `trio.Nursery/CancelScope` embeds a `finally/except:`-block '
'which execs an un-shielded checkpoint!'
#
# ^TODO? other cases?
),
always_warn_on: tuple[BaseException] = (
always_warn_on: tuple[Type[BaseException]] = (
trio.Cancelled,
),
# don't ever unmask or warn on any masking pair,
# {<masked-excT-key> -> <masking-excT-value>}
never_warn_on: dict[
Type[BaseException],
Type[BaseException],
] = {
KeyboardInterrupt: trio.Cancelled,
trio.Cancelled: trio.Cancelled,
},
# ^XXX, special case(s) where we warn-log bc likely
# there will be no operational diff since the exc
# is always expected to be consumed.
@@ -104,81 +117,91 @@ async def maybe_raise_from_masking_exc(
individual sub-excs but maintain the eg-parent's form right?
'''
if not isinstance(unmask_from, tuple):
raise ValueError(
f'Invalid unmask_from = {unmask_from!r}\n'
f'Must be a `tuple[Type[BaseException]]`.\n'
)
from tractor.devx.debug import (
BoxedMaybeException,
pause,
)
boxed_maybe_exc = BoxedMaybeException(
raise_on_exit=raise_unmasked,
)
matching: list[BaseException]|None = None
maybe_eg: ExceptionGroup|None
if tn:
try: # handle egs
yield boxed_maybe_exc
return
except* unmask_from as _maybe_eg:
maybe_eg = _maybe_eg
try:
yield boxed_maybe_exc
return
except BaseException as _bexc:
bexc = _bexc
if isinstance(bexc, BaseExceptionGroup):
matches: ExceptionGroup
matches, _ = maybe_eg.split(
unmask_from
)
if not matches:
raise
matches, _ = bexc.split(unmask_from)
if matches:
matching = matches.exceptions
matching: list[BaseException] = matches.exceptions
else:
try: # handle non-egs
yield boxed_maybe_exc
return
except unmask_from as _maybe_exc:
maybe_exc = _maybe_exc
matching: list[BaseException] = [
maybe_exc
]
# XXX, only unmask-ed for debuggin!
# TODO, remove eventually..
except BaseException as _berr:
berr = _berr
await pause(shield=True)
raise berr
elif (
unmask_from
and
type(bexc) in unmask_from
):
matching = [bexc]
if matching is None:
raise
masked: list[tuple[BaseException, BaseException]] = []
for exc_match in matching:
if exc_ctx := find_masked_excs(
maybe_masker=exc_match,
unmask_from={unmask_from},
unmask_from=set(unmask_from),
):
masked.append((exc_ctx, exc_match))
masked.append((
exc_ctx,
exc_match,
))
boxed_maybe_exc.value = exc_match
note: str = (
f'\n'
f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n'
f'^^WARNING^^\n'
f'the above {type(exc_ctx)!r} was masked by a {type(exc_match)!r}\n'
)
if extra_note:
note += (
f'\n'
f'{extra_note}\n'
)
exc_ctx.add_note(note)
if type(exc_match) in always_warn_on:
do_warn: bool = (
never_warn_on.get(
type(exc_ctx) # masking type
)
is not
type(exc_match) # masked type
)
if do_warn:
exc_ctx.add_note(note)
if (
do_warn
and
type(exc_match) in always_warn_on
):
log.warning(note)
# await tractor.pause(shield=True)
if raise_unmasked:
if (
do_warn
and
raise_unmasked
):
if len(masked) < 2:
raise exc_ctx from exc_match
else:
# ?TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
await pause(shield=True)
# ??TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
# else:
# await pause(shield=True)
else:
raise
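To close the loop, the masking scenario from the `extra_note` above, reproduced minimally; a hedged sketch (export path assumed):

```python
import trio
from tractor.trionics import maybe_raise_from_masking_exc  # assumed path

async def main() -> None:
    with trio.CancelScope() as cs:
        async with maybe_raise_from_masking_exc(
            unmask_from=(trio.Cancelled,),
        ):
            cs.cancel()
            try:
                raise ValueError('the real error')
            finally:
                # un-shielded checkpoint in a `finally:` block:
                # the `trio.Cancelled` raised here masks the
                # in-flight `ValueError`..
                await trio.lowlevel.checkpoint()

trio.run(main)  # raises the unmasked `ValueError`
```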