Compare commits


58 Commits

Author SHA1 Message Date
Tyler Goodlet da9bc1237d Change one infected-aio test to use `chan` in fn sig 2025-07-29 14:47:24 -04:00
Tyler Goodlet ab11ee4fbe Support `chan.started_nowait()` in `.open_channel_from()` target
That is, the `target` can declare a `chan: LinkedTaskChannel` instead of
`to_trio`/`from_aio`.

To support it,
- change `.started()` -> the more appropriate `.started_nowait()` which
  can be called sync from the aio child task.
- adjust the `provide_channels` assert to accept either fn sig
  declaration (for now).

Still needs test(s) obvi..
2025-07-29 14:42:15 -04:00
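
A minimal sketch of the new-style `target` signature described in the commit above; it only uses the `.started_nowait()` call and `(first, chan)` yield-shape named in this series, everything else (names, the string payload) is illustrative and it assumes an actor running with `infect_asyncio=True`:

```python
import asyncio

from tractor import to_asyncio


async def aio_child(
    # new-style sig: a single `chan` instead of `to_trio`/`from_aio`
    chan: to_asyncio.LinkedTaskChannel,
) -> None:
    # `.started_nowait()` is sync-callable from the asyncio child task
    chan.started_nowait('ready')
    await asyncio.sleep(0.1)


async def trio_side() -> None:
    # assumed to run inside an actor opened with `infect_asyncio=True`
    async with to_asyncio.open_channel_from(aio_child) as (first, chan):
        assert first == 'ready'
```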
Tyler Goodlet 466dce8aed Relay `asyncio` errors via EoC and raise from rent
Makes the newly added `test_aio_side_raises_before_started` test pass by
ensuring errors raised by any `.to_asyncio.open_channel_from()`-spawned
child-`asyncio.Task` are relayed on any caught `trio.EndOfChannel`, by
checking for a new `LinkedTaskChannel._closed_by_aio_task: bool`.

Impl deats,
- obvi add `LinkedTaskChannel._closed_by_aio_task: bool = False`
- in `translate_aio_errors()` always check for the new flag on EOC
  conditions and in such cases set `chan._trio_to_raise = aio_err` such
  that the `trio`-parent-task always raises the child's exception
  directly, OW keep original EoC passthrough in place.
- include *very* detailed per-case comments around the extended handler.
- adjust re-raising logic with a new `raise_from` where we only give the
  `aio_err` priority if it's not already set as `trio_to_raise`.

Also,
- hide the `_run_asyncio_task()` frame by def.
2025-07-29 14:30:42 -04:00
Tyler Goodlet 808dd9d73c Add "raises-pre-started" `open_channel_from()` test
Verifying that if any exc is raised pre `chan.send_nowait()` (our
currently shite version of a `chan.started()`) then that exc is indeed
raised through on the `trio`-parent task side. This case was reproduced
from a `piker.brokers.ib` issue with a similar embedded
`.trionics.maybe_open_context()` call.

Deats,
- call the suite `test_aio_side_raises_before_started`.
- mk the `@context` simply `maybe_open_context(acm_func=open_channel_from)`
  with a `target=raise_before_started` which,
- simply sleeps then immediately raises an RTE.
- expect the RTE from the aio-child-side to propagate all the way up to
  the root-actor's task right up through the `trio.run()`.
2025-07-29 13:58:48 -04:00
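
A hedged sketch of the `raise_before_started` target described above (the name and behaviour come from the commit text, the body is illustrative): the error fires before any `chan.send_nowait()`/`.started_nowait()`, so it must surface on the `trio`-parent side.

```python
import asyncio

from tractor import to_asyncio


async def raise_before_started(
    chan: to_asyncio.LinkedTaskChannel,
) -> None:
    # the channel is never "started"; the RTE should propagate up
    # through the trio-parent task and out of `trio.run()`.
    await asyncio.sleep(0.1)
    raise RuntimeError('raised before started!')
```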
Tyler Goodlet aef306465d Add `never_warn_on: dict` support to unmasker
Such that key->value pairs can be defined which should *never be*
unmasked, where
- the keys are exc-types which might be masked, and
- the values are exc-types which masked the equivalent key.

For example, the default includes:
- KBI->taskc: a kbi should never be unmasked from its masking
  `trio.Cancelled`.

For the impl, a new `do_warn: bool` in the fn-body determines the
primary guard for whether a warning or re-raising is necessary.
2025-07-28 12:57:48 -04:00
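
Illustrative-only literal of the mapping shape described above (the real default lives in the unmasker's impl): each key is an exc-type that might get masked, each value is the exc-type that masked it, and such pairs are never unmasked or warned on.

```python
import trio

# masked exc-type -> the exc-type which masked it
never_warn_on: dict[
    type[BaseException],
    type[BaseException],
] = {
    # a KBI should never be unmasked from its masking `trio.Cancelled`
    KeyboardInterrupt: trio.Cancelled,
}
```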
Tyler Goodlet 7459a4127c Accept `tn` to `gather_contexts()/maybe_open_context()`
Such that the caller can be responsible for their own (nursery) scoping
as needed and, for the latter fn's case, with
a `trio.Nursery.CancelStatus.encloses()` check to ensure the `tn` is
a valid parent-ish.

Some deats,
- in `gather_contexts()`, mv the `try/finally` outside the nursery block
  to ensure we always do the `parent_exit`.
- for `maybe_open_context()` we do a naive task-tree hierarchy audit to
  ensure the provided scope is not *too* child-ish (with what APIs `trio`
  gives us, see above), OW go with the old approach of using the actor's
  private service nursery.
  Also,
  * better report `trio.Cancelled` around the cache-miss `yield`
    cases and ensure we **never** unmask triggering key-errors.
  * report on any stale-state with the mutex in the `finally` block.
2025-07-27 13:55:11 -04:00
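
A hedged usage sketch of the caller-provided nursery described above; the `acm_func` and `tn` keyword names are taken from the commit texts here (not a verified API doc) and the snippet is assumed to run inside a tractor actor/runtime.

```python
from contextlib import asynccontextmanager as acm

import trio
import tractor


@acm
async def open_resource():
    yield 'some-shared-resource'


async def use_cached_resource() -> None:
    async with trio.open_nursery() as tn:
        # the caller now owns the (nursery) scope hosting the cached task
        async with tractor.trionics.maybe_open_context(
            acm_func=open_resource,
            tn=tn,  # assumed kwarg name, per the commit title
        ) as (cache_hit, res):
            assert res == 'some-shared-resource'
```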
Tyler Goodlet fc77e6eca5 Suppress beg tbs from `collapse_eg()`
It was originally this way; I forgot to flip it back when discarding the
`except*` handler impl..

Specially handle the `exc.__cause__` case where we raise from any
detected underlying cause and OW `from None` to suppress the eg's tb.
2025-07-25 20:05:51 -04:00
Tyler Goodlet 26526b86c3 Facepalm, actually use `.log.cancel()`-level to report parent-side taskc.. 2025-07-25 19:03:21 -04:00
Tyler Goodlet d079675dd4 UDS: implicitly create `Address.bindspace: Path`
Since it's merely a local-file-sys subdirectory there should be no
reason its creation conflicts with other bind spaces.

Also add 2 test suites to match,
- `tests/ipc/test_each_tpt::test_uds_bindspace_created_implicitly` to
  verify the dir creation when DNE.
- `..test_uds_double_listen_raises_connerr` to ensure a double bind
  raises a `ConnectionError` from the src `OSError`.
2025-07-25 13:32:23 -04:00
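
Generic sketch of the implicit bind-space creation described above: the UDS bindspace is just a subdirectory under the runtime dir, so it can be created on demand before binding (the `'doggy'` name mirrors the test fixture; the helper itself is illustrative, not the project's impl).

```python
from pathlib import Path


def ensure_bindspace(
    rt_dir: Path,
    name: str = 'doggy',  # mirrors the test fixture's subdir name
) -> Path:
    bs_dir: Path = rt_dir / name
    # no-op when it already exists; being a plain local-fs subdir there
    # is nothing to conflict with other bind spaces.
    bs_dir.mkdir(parents=True, exist_ok=True)
    return bs_dir
```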
Tyler Goodlet c2acc4f55c Rm `assert` from `Channel.from_addr()`, for UDS we re-create the addr to extract the peer PID 2025-07-25 11:27:30 -04:00
Tyler Goodlet 326b258fd5 Drop `tn` input from `maybe_raise_from_masking_exc()`
Including all caller usage throughout. Moving to a non-`except*` impl
means it's never needed as a signal from the caller - we can just catch
the beg outright (like we should have always been doing)..
2025-07-25 11:16:02 -04:00
Tyler Goodlet 4f4c7e6b67 Adjust test suites to new `maybe_raise_from_masking_exc()` changes 2025-07-25 11:02:22 -04:00
Tyler Goodlet c05d08e426 Pass `tuple` from `._invoke()` unmasker usage
Since `maybe_raise_from_masking_exc()` now requires the general (tuple)
case, explicitly pass `unmask_from=(Cancelled,)` (yes i know it's the
current default).

Also add some extra `TransportClosed`-handling for some
IPC-disconnects-during-teardown edge cases,
- in `._invoke()` around the `await chan.send(return_msg)` where we
  suppress if the underlying chan already disconnected.
- add a disjoint handler in `_errors_relayed_via_ipc()` which just
  reports the exc but raises it through (as prior).
  * I originally thought it needed to be handled specially (to avoid
    being crash handled) but turns out that isn't necessary?
  * Hence the masked-out `debug_filter` / guard expression around the
    `await debug._maybe_enter_pm()` line.
2025-07-25 10:52:06 -04:00
Tyler Goodlet 02062c5dc0 Drop `except*` usage from `._taskc` unmasker
That is, from `maybe_raise_from_masking_exc()`, thus minimizing us to
a single `except BaseException` block with logic branching for the beg
vs. `unmask_from` exc cases.

Also,
- raise val-err when `unmask_from` is not a `tuple`.
- tweak the exc-note warning format.
- drop all pausing from dev work.
2025-07-25 10:25:33 -04:00
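
Minimal sketch of the input validation mentioned above (the `unmask_from` parameter name comes from the commit text; the standalone helper is illustrative):

```python
def check_unmask_from(
    unmask_from: tuple[type[BaseException], ...],
) -> tuple[type[BaseException], ...]:
    if not isinstance(unmask_from, tuple):
        raise ValueError(
            '`unmask_from` must be a `tuple[type[BaseException], ...]`, '
            f'got {unmask_from!r}'
        )
    return unmask_from
```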
Tyler Goodlet 72c4a9d20b Rework `collapse_eg()` to NOT use `except*`..
Since it turns out the semantics are basically the inverse of normal
`except` (particularly for re-raising) which is hard to get right, and
bc it's a lot easier to just delegate to what `trio` already has behind
the `strict_exception_groups=False` setting, Bp

I added a rant here which will likely get removed shortly, but i think
going forward recommending against use of `except*` is prudent for
anything low level enough in the runtime (like trying to filter begs).

Dirty deats,
- copy `trio._core._run.collapse_exception_group()` to here with only
  a slight mod to remove the notes check and tb concatting for the
  collapse case.
- rename `maybe_collapse_eg()` -> `get_collapsed_eg()` and delegate it
  directly to the former `trio` fn; return `None` when it returns the
  same beg without collapse.
- simplify our own `collapse_eg()` to either raise the collapsed `exc`
  or original `beg`.
2025-07-25 10:23:19 -04:00
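
The replacement pattern this series converges on (also visible in the file diffs further below): wrap the nursery with `collapse_eg()` instead of passing the deprecated `strict_exception_groups=False` flag. A small runnable sketch:

```python
import trio
import tractor


async def main() -> None:
    async with (
        # collapse a single-exception group to the bare exception,
        # standing in for `strict_exception_groups=False`
        tractor.trionics.collapse_eg(),
        trio.open_nursery() as tn,
    ):
        tn.start_soon(trio.sleep, 0.1)


if __name__ == '__main__':
    trio.run(main)
```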
Tyler Goodlet ccc3b1fce1 `ipc._uds`: assign `.l/raddr` in `.connect_to()`
Using `.get_stream_addrs()` such that we always (*can*) assign the peer
end's PID in the `._raddr`.

Also factor common `ConnectionError` re-raising into
a `_reraise_as_connerr()`-@cm.
2025-07-24 23:16:30 -04:00
Tyler Goodlet 11c4e65757 Add `.trionics.maybe_open_context()` locking test
Call it `test_lock_not_corrupted_on_fast_cancel()`; it includes
a detailed doc string to explain. Implemented it "cleverly" by having
the target `@acm` cancel its parent nursery after a peer, cache-hitting
task, is already waiting on the task mutex release.
2025-07-20 15:01:18 -04:00
Tyler Goodlet 33ac3ca99f Always `finally` invoke cache-miss `lock.release()`s
Since the `await service_n.start()` on key-err can be cancel-masked
(checkpoint interrupted before `_Cache.run_ctx` completes), we need to
always `lock.release()` to avoid lock-owner-state corruption and/or
inf-hangs in peer cache-hitting tasks.

Deats,
- add a `try/except/finally` around the key-err triggered cache-miss
  `service_n.start(_Cache.run_ctx, ..)` call, reporting on any taskc
  and always `finally` unlocking.
- fill out some log msg content and use `.debug()` level.
2025-07-20 14:57:26 -04:00
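
A generic trio sketch (not the actual tractor internals) of the guard described above: even if the `start()` checkpoint is cancel-masked mid-flight, the mutex is always released so peer cache-hitting tasks can't hang forever.

```python
import trio


async def start_cache_miss_task(
    service_n: trio.Nursery,
    lock: trio.Lock,
    target,  # task fn which must call `task_status.started()`
) -> None:
    await lock.acquire()
    try:
        await service_n.start(target)
    except trio.Cancelled:
        # checkpoint interrupted before the target finished starting;
        # report it here, but always release below.
        raise
    finally:
        lock.release()
```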
Tyler Goodlet 9ada628a57 Rename all lingering ctx-side bits
As before but more thoroughly: in comments and var names, finally
changing all,
- caller -> parent
- callee -> child
2025-07-18 20:07:37 -04:00
Tyler Goodlet d2c3e32bf1 Well then, I guess it just needed, a checkpoint XD
Here I was thinking the bcaster (usage) maybe required a rework but,
NOPE it's just bc a checkpoint was needed in the parent task owning the
`tn` which spawns `get_sub_and_pull()` tasks to ensure the bg allocated
`an`/portal is eventually cancel-called..

Ah well, at least i started a patch for `MsgStream.subscribe()` to make
it multicast revertible.. XD

Anyway, I tossed in some checks & notes related to all that unnecessary
effort since I do think i'll move forward implementing it:
- for the `cache_hit` case always verify that the `bcast` clone is
  unregistered from the common state subs after
  `.subscribe().__aexit__()`.
- do a light check that the implicit `MsgStream._broadcaster` is always
  the only bcrx instance left-leaked into that state.. that is until
  i get the proper de-allocation/reversion from multicast -> unicast
  working.
- put in mega detailed note about the required parent-task checkpoint.
2025-07-18 00:36:52 -04:00
Tyler Goodlet 51944a0b99 TOSQASH 285ebba: woops still use `bcrx._state` for now.. 2025-07-18 00:36:52 -04:00
Tyler Goodlet 024e8015da Switch nursery to `CancelScope`-status properties
Been meaning to do this forever and a recent test hang finally drove me
to it Bp

Like it sounds, adopt the "cancel-status" properties on `ActorNursery`
use already on our `Context` and derived from `trio.CancelScope`:

- add new private `._cancel_called` (set in the head of `.cancel()`)
  & `._cancelled_caught` (set in the tail) instance vars with matching
  read-only `@properties`.

- drop the instance-var and instead delegate a `.cancelled: bool`
  property to `._cancel_called` and add a usage deprecation warning
  (since removing it breaks a buncha tests).
2025-07-18 00:36:52 -04:00
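
Sketch of the "cancel-status" property style being adopted, mirroring `trio.CancelScope.cancel_called`/`.cancelled_caught`; the attr names follow the commit text while the class body is purely illustrative.

```python
class SupervisorLike:
    def __init__(self) -> None:
        self._cancel_called: bool = False
        self._cancelled_caught: bool = False

    @property
    def cancel_called(self) -> bool:
        return self._cancel_called

    @property
    def cancelled_caught(self) -> bool:
        return self._cancelled_caught

    async def cancel(self) -> None:
        self._cancel_called = True      # set in the head of `.cancel()`
        # ... issue the actual cancel requests here ...
        self._cancelled_caught = True   # set in the tail
```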
Tyler Goodlet aaed3a4a37 Add `Channel.closed/.cancel_called`
I.e. the public properties for the private instance var equivs; improves
expected introspection usage.
2025-07-18 00:36:52 -04:00
Tyler Goodlet edffd5e367 Set `Channel._cancel_called` via `chan` var
In `Portal.cancel_actor()` that is, at the least to make it easier to
ref search from an editor Bp
2025-07-18 00:36:52 -04:00
Tyler Goodlet 4ca81e39e6 Never shield-wait `ipc_server.wait_for_no_more_peers()`
As mentioned in prior testing commit, it can cause the worst kind of
hangs, the SIGINT ignoring kind.. Pretty sure there was never any reason
outside some esoteric multi-actor debugging case, and pretty sure that
already was solved?
2025-07-18 00:36:52 -04:00
Tyler Goodlet dd7aca539f Tool-up `test_resource_cache.test_open_local_sub_to_stream`
Since I recently discovered a very subtle race-case that can sometimes
cause the suite to hang, seemingly due to the `an: ActorNursery`
allocated *behind* the `.trionics.maybe_open_context()` usage; this can
result in never cancelling the 'streamer' subactor despite the `main()`
timeout-guard?

This led me to dig in and find that the underlying issue was 2-fold,

- our `BroadcastReceiver` termination-mgmt semantics in
  `MsgStream.subscribe()` can result in the first subscribing task
  always keeping the `MsgStream._broadcaster` instance allocated; it's
  never `.aclose()`ed, which makes it tough to determine (and thus
  trace) when all subscriber-tasks are actually complete and
  exited-from-`.subscribe()`..

- i was shield waiting `.ipc._server.Server.wait_for_no_more_peers()` in
  `._runtime.async_main()`'s shutdown sequence which would then compound
  the issue resulting in a SIGINT-shielded hang.. the worst kind XD

Actual changes here are just styling, printing, and some mucking with
passing the `an`-ref up to the parent task in the root-actor where i was
doing a conditional `ActorNursery.cancel()` to mk sure that was actually
the problem. Presuming this is fixed, the `.pause()` i left unmasked
should never hit.
2025-07-18 00:36:52 -04:00
Tyler Goodlet 735dc9056a Go multi-line-style tuples in `maybe_enter_context()`
Allows for an inline comment of the first "cache hit" bool element.
2025-07-18 00:36:52 -04:00
Tyler Goodlet e949839edf More prep-to-reduce the `Actor` method-iface
- drop the (never/un)used `.get_chans()`.
- add #TODO for factoring many methods into a new `.rpc`-subsys/pkg
  primitive, like an `RPCMngr/Server` type eventually.
- add todo to maybe mv `.get_parent()` elsewhere?
- move masked `._hard_mofo_kill()` to bottom.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 6194ac891c Add `.ipc._shm` todo-idea for `@actor_fixture` API 2025-07-18 00:36:29 -04:00
Tyler Goodlet 6554e324f2 Update buncha log msg fmting in `.msg._ops`
Mostly just multi-line code styling again: always putting standalone
`'f\n'` on separate LOC so it reads like it renders to console. Oh and
a level drop to `.runtime()` for rx-msg reports.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 076caeb596 Couple more `._root` logging tweaks.. 2025-07-18 00:36:29 -04:00
Tyler Goodlet faa678e209 Update buncha log msg fmting in `._spawn`
Again using `Channel.aid.reprol()`, `.devx.pformat.nest_from_op()` and
converting to multi-line code style for str-report-contents. Tweak
some imports to sub-mod level as well.
2025-07-18 00:36:29 -04:00
Tyler Goodlet c5d68f6b58 Update buncha log msg fmting in `._portal`
Namely to use `Channel.aid.reprol()` and convert to our newer
multi-line code style for str-reports.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 506aefb917 Use `._supervise._shutdown_msg` in tooling test 2025-07-18 00:36:29 -04:00
Tyler Goodlet 7436d52f37 Use `nest_from_op()`/`pretty_struct` in `._rpc`
Again for nicer console logging. Also fix a double `req_chan` arg bug
when passed to `_invoke` in the `self.cancel()` rt-ep; don't update the
`kwargs: dict`, just merge in the `req_chan` input at call time.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 80b074e3e7 Use `nest_from_op()` in actor-nursery shutdown
Including a new one-line `_shutdown_msg: str` which we mod-var-set for
testing usage and some denoising at `.info()` level. Adjust `Actor()`
instantiating input to the new `.registry_addrs` wrapped addrs property.
2025-07-18 00:36:29 -04:00
Tyler Goodlet e97efb7099 Use `Address` where possible in (root) actor boot
Namely inside various bootup-sequences in `._root` and `._runtime`,
particularly in the root actor, to support both better tpt-address
denoting in our logging and clarifying the logic around setting the
root's registry addresses, which is soon to be much better factored out
of the core and into an explicit subsystem + API.

Some `_root.open_root_actor()` deats,
- set `registry_addrs` to a new `uw_reg_addrs` (uw: unwrapped) to be
  more explicit about wrapped addr types throughout.
- instead ensure `registry_addrs` are the wrapped types and pass down
  into the root `Actor` singleton-instance.
- factor the root-actor check + rt-vars update (updating the `'_root_addrs'`)
  out of `._runtime.async_main()` into this fn.
- as previous, set `trans_bind_addrs = uw_reg_addrs` in unwrapped form since it will
  be passed down both through rt-vars as `'_root_addrs'` and to
  `._runtime.async_main()` as `accept_addrs` (which is then passed to the
  IPC server).
- adjust/simplify much logging.
- shield the `await actor.cancel(None)  # self cancel` to avoid any
  finally-footguns.
- as mentioned convert the

For `_runtime.async_main()` tweaks,
- expect `registry_addrs: list[Address]|None = None` with appropriate
  unwrapping prior to setting both `.reg_addrs` and the equiv rt-var.
- add a new `.registry_addrs` prop for the wrapped form.
- convert a final loose-eg for the `service_nursery` to use
  `collapse_eg()`.
- simplify teardown report logging.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 81b11fd665 Add #TODO for `._context` to use `.msg.Aid` 2025-07-18 00:36:29 -04:00
Tyler Goodlet aa2b1fbf8b Add todo for py3.13+ `.shared_memory`'s new `track=False` support.. finally they added it XD 2025-07-18 00:36:29 -04:00
Tyler Goodlet 82c12253e5 Even more `.ipc.*` repr refinements
Mostly adjusting indentation, noise level, and clarity via `.pformat()`
tweaks and more general use of `.devx.pformat.nest_from_op()`.

Specific impl deats,
- use `pformat.ppfmt()`/`nest_from_op()` more seriously throughout
  `._server`.
- add a `._server.Endpoint.pformat()`.
- add `._server.Server.len_peers()` and `.repr_state()`.
- polish `Server.pformat()`.
- drop some redundant `log.runtime()`s from `._serve_ipc_eps()` instead
  leaving-them-only/putting-them in the caller pub meth.
- `._tcp.start_listener()` log the bound addr, not the input (which may
  be the 0-port).
2025-07-18 00:36:29 -04:00
Tyler Goodlet 7f451409ec More `.ipc.Channel`-repr related tweaks
- only generate a repr in `.from_addr()` when log level is >= 'runtime'.
 |_ add a todo about supporting this optimization more generally on our
   adapter.
- fix `Channel.pformat()` to show unknown peer field line fmt correctly.
- add a `Channel.maddr: str` which just delegates directly to the
  `._transport` like other pass-thru property fields.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 9be6f6d3e9 Mk `Aid` hashable, use pretty-`.__repr__()`
Hash on the `.uuid: str` and delegate verbatim to
`msg.pretty_struct.Struct`'s equiv method.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 9d2c7ae3cf .log: expose `at_least_level()` as `StackLevelAdapter` meth 2025-07-18 00:36:29 -04:00
Tyler Goodlet a81a1be40c Drop `actor_info: str` from `._entry` logs 2025-07-18 00:36:29 -04:00
Tyler Goodlet c85575e6ce Try `nest_from_op()` in some `._rpc` spots
To start trying out,
- using in the `Start`-msg handler-block to repr the msg coming
  *from* a `repr(Channel)` using the `<=)` sclang op.
- for a completed RPC task in `_invoke_non_context()`.
- for the msg loop task's termination report.
2025-07-18 00:36:29 -04:00
Tyler Goodlet aa98cbd848 Hide more `Channel._transport` privates for repr
Such as the `MsgTransport.stream` and `.drain` attrs since they're
rarely that important at the chan level. Also start adopting
a `.<attr>=` style for actual attrs of the type versus a `<name>:
` style for meta-field info lines.
2025-07-18 00:36:29 -04:00
Tyler Goodlet a890e9aa83 Refine `Actor` status iface, use `Aid` throughout
To simplify `.pformat()` output when the new `privates: bool` is unset
(the default) this adds new public attrs to wrap an actor's
cancellation status as well as provide a `.repr_state: str` (similar to
our equiv on `Context`). Rework `.pformat()` to render a much simplified
repr using all these new refinements.

Further, port the `.cancel()` method to use `.msg.types.Aid` for all
internal `requesting_uid` refs (now renamed with `_aid`) and in all
called downstream methods.

New cancel-state iface deats,
- rename `._cancel_called_by_remote` -> `._cancel_called_by` and expect
  it to be set as an `Aid`.
- add `.cancel_complete: bool` which flags whether `.cancel()` ran to
  completion.
- add `.cancel_called: bool` which just wraps `._cancel_called` (and
  which likely will just be dropped since we already have
  `._cancel_called_by`).
- add `.cancel_caller: Aid|None` which wraps `._cancel_called_by`.

In terms of using `Aid` in cancel methods,
- rename vars with `_aid` suffix in `.cancel()` (and wherever else).
- change `.cancel_rpc_tasks()` input param to `req_aid: msgtypes.Aid`.
- do the same for `._cancel_task()` and (for now until we adjust its
  internals as well) use the `Aid.uid` remap property when assigning
  `Context._canceller`.
- adjust all log msg refs to match obvi.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 1592f7e6be Add flag to toggle private vars in `Channel.pformat()`
Call it `privates: bool` and only show certain internal instance vars
when set in the `repr()` output.
2025-07-18 00:36:29 -04:00
Tyler Goodlet 1c9293e69d Extend `.msg.types.Aid` method interface
Providing the legacy `.uid -> tuple` style id (since still used for the
`Actor._contexts` table) and a `repr-one-line` method `.reprol() -> str`
for rendering a compact unique actor ID summary (useful in
logging/.pformat()s at the least).
2025-07-18 00:36:29 -04:00
Tyler Goodlet ec13c1b31d Enforce named-args only to `.open_nursery()` 2025-07-18 00:36:29 -04:00
Tyler Goodlet 7ce366097d Facepalm, fix `raise from` in `collapse_eg()`
I dunno what exactly I was thinking but we definitely don't want to
**ever** raise from the original exc-group; instead always raise from
any original `.__cause__` to be consistent with the embedded src-error's
context.

Also, adjust `maybe_collapse_eg()` to return `False` in the non-single
`.exceptions` case, again don't know what I was trying to do but this
simplifies caller logic and the prior return-semantic had no real
value..

This fixes some final usage in the runtime (namely top level nursery
usage in `._root`/`._runtime`) which was previously causing test suite
failures prior to this fix.
2025-07-18 00:36:03 -04:00
Tyler Goodlet 6cedda008a Just import `._runtime` ns in `._root`; be a bit more explicit 2025-07-18 00:36:02 -04:00
Tyler Goodlet 207175d78e Use collapse in `._root.open_root_actor()` too
Seems to add one more cancellation suite failure as well as now cause
the discovery test to error instead of fail?
2025-07-18 00:36:02 -04:00
Tyler Goodlet 57b5e51099 Use collapser around root tn in `.async_main()`
Seems to cause the following test suites to fail however..

- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_clustering.py::test_empty_mngrs_input_raises'

Also tweak some ctxc request logging content.
2025-07-18 00:36:02 -04:00
Tyler Goodlet b72c8dce9b Drop msging-err patt from `subactor_breakpoint` ex
Since the `bdb` module was added to the namespace lookup set in
`._exceptions.get_err_type()` we can now relay an RAE-boxed
`bdb.BdbQuit`.
2025-07-18 00:36:02 -04:00
Tyler Goodlet bfa4d71009 Switch to strict-eg nurseries almost everywhere
That is just throughout the core library, not the tests yet. Again, we
simply change over to using our (nearly equivalent?)
`.trionics.collapse_eg()` in place of the already deprecated
`strict_exception_groups=False` flag in the following internals,
- the conc-fan-out tn use in `._discovery.find_actor()`.
- `._portal.open_portal()`'s internal tn used to spawn a bg rpc-msg-loop
  task.
- the daemon and "run-in-actor" layered tn pair allocated in
  `._supervise._open_and_supervise_one_cancels_all_nursery()`.

The remaining loose-eg usage in `._root` and `._runtime` seems to be
necessary to keep the test suite green?? For the moment these are left
out.
2025-07-18 00:36:02 -04:00
Tyler Goodlet 434e22680e Use collapser in rent side of `Context` 2025-07-18 00:36:02 -04:00
Tyler Goodlet 636c19866c Add some tooling params to `collapse_eg()` 2025-07-18 00:35:45 -04:00
38 changed files with 386 additions and 1577 deletions

View File

@ -16,7 +16,6 @@ from tractor import (
ContextCancelled, ContextCancelled,
MsgStream, MsgStream,
_testing, _testing,
trionics,
) )
import trio import trio
import pytest import pytest
@ -63,8 +62,9 @@ async def recv_and_spawn_net_killers(
await ctx.started() await ctx.started()
async with ( async with (
ctx.open_stream() as stream, ctx.open_stream() as stream,
trionics.collapse_eg(), trio.open_nursery(
trio.open_nursery() as tn, strict_exception_groups=False,
) as tn,
): ):
async for i in stream: async for i in stream:
print(f'child echoing {i}') print(f'child echoing {i}')

View File

@ -1,35 +0,0 @@
import trio
import tractor
async def main():
async with tractor.open_root_actor(
debug_mode=True,
loglevel='cancel',
) as _root:
# manually trigger self-cancellation and wait
# for it to fully trigger.
_root.cancel_soon()
await _root._cancel_complete.wait()
print('root cancelled')
# now ensure we can still use the REPL
try:
await tractor.pause()
except trio.Cancelled as _taskc:
assert (root_cs := _root._root_tn.cancel_scope).cancel_called
# NOTE^^ above logic but inside `open_root_actor()` and
# passed to the `shield=` expression is effectively what
# we're testing here!
await tractor.pause(shield=root_cs.cancel_called)
# XXX, if shield logic *is wrong* inside `open_root_actor()`'s
# crash-handler block this should never be interacted,
# instead `trio.Cancelled` would be bubbled up: the original
# BUG.
assert 0
if __name__ == '__main__':
trio.run(main)

View File

@ -23,8 +23,9 @@ async def main():
modules=[__name__] modules=[__name__]
) as portal_map, ) as portal_map,
tractor.trionics.collapse_eg(), trio.open_nursery(
trio.open_nursery() as tn, strict_exception_groups=False,
) as tn,
): ):
for (name, portal) in portal_map.items(): for (name, portal) in portal_map.items():

View File

@ -1,85 +0,0 @@
from contextlib import (
asynccontextmanager as acm,
)
from functools import partial
import tractor
import trio
log = tractor.log.get_logger(
name=__name__
)
_lock: trio.Lock|None = None
@acm
async def acquire_singleton_lock(
) -> None:
global _lock
if _lock is None:
log.info('Allocating LOCK')
_lock = trio.Lock()
log.info('TRYING TO LOCK ACQUIRE')
async with _lock:
log.info('ACQUIRED')
yield _lock
log.info('RELEASED')
async def hold_lock_forever(
task_status=trio.TASK_STATUS_IGNORED
):
async with (
tractor.trionics.maybe_raise_from_masking_exc(),
acquire_singleton_lock() as lock,
):
task_status.started(lock)
await trio.sleep_forever()
async def main(
ignore_special_cases: bool,
loglevel: str = 'info',
debug_mode: bool = True,
):
async with (
trio.open_nursery() as tn,
# tractor.trionics.maybe_raise_from_masking_exc()
# ^^^ XXX NOTE, interestingly putting the unmasker
# here does not exhibit the same behaviour ??
):
if not ignore_special_cases:
from tractor.trionics import _taskc
_taskc._mask_cases.clear()
_lock = await tn.start(
hold_lock_forever,
)
with trio.move_on_after(0.2):
await tn.start(
hold_lock_forever,
)
tn.cancel_scope.cancel()
# XXX, manual test as script
if __name__ == '__main__':
tractor.log.get_console_log(level='info')
for case in [True, False]:
log.info(
f'\n'
f'------ RUNNING SCRIPT TRIAL ------\n'
f'ignore_special_cases: {case!r}\n'
)
trio.run(partial(
main,
ignore_special_cases=case,
loglevel='info',
))

View File

@ -1,195 +0,0 @@
from contextlib import (
contextmanager as cm,
# TODO, any diff in async case(s)??
# asynccontextmanager as acm,
)
from functools import partial
import tractor
import trio
log = tractor.log.get_logger(
name=__name__
)
@cm
def teardown_on_exc(
raise_from_handler: bool = False,
):
'''
You could also have a teardown handler which catches any exc and
does some required teardown. In this case the problem is
compounded UNLESS you ensure the handler's scope is OUTSIDE the
`ux.aclose()`.. that is in the caller's enclosing scope.
'''
try:
yield
except BaseException as _berr:
berr = _berr
log.exception(
f'Handling termination teardown in child due to,\n'
f'{berr!r}\n'
)
if raise_from_handler:
# XXX teardown ops XXX
# on termination these steps say need to be run to
# ensure wider system consistency (like the state of
# remote connections/services).
#
# HOWEVER, any bug in this teardown code is also
# masked by the `tx.aclose()`!
# this is also true if `_tn.cancel_scope` is
# `.cancel_called` by the parent in a graceful
# request case..
# simulate a bug in teardown handler.
raise RuntimeError(
'woopsie teardown bug!'
)
raise # no teardown bug.
async def finite_stream_to_rent(
tx: trio.abc.SendChannel,
child_errors_mid_stream: bool,
raise_unmasked: bool,
task_status: trio.TaskStatus[
trio.CancelScope,
] = trio.TASK_STATUS_IGNORED,
):
async with (
# XXX without this unmasker the mid-streaming RTE is never
# reported since it is masked by the `tx.aclose()`
# call which in turn raises `Cancelled`!
#
# NOTE, this is WITHOUT doing any exception handling
# inside the child task!
#
# TODO, uncomment next LoC to see the supprsessed beg[RTE]!
tractor.trionics.maybe_raise_from_masking_exc(
raise_unmasked=raise_unmasked,
),
tx as tx, # .aclose() is the guilty masker chkpt!
# XXX, this ONLY matters in the
# `child_errors_mid_stream=False` case oddly!?
# THAT IS, if no tn is opened in that case then the
# test will not fail; it raises the RTE correctly?
#
# -> so it seems this new scope somehow affects the form of
# eventual in the parent EG?
tractor.trionics.maybe_open_nursery(
nursery=(
None
if not child_errors_mid_stream
else True
),
) as _tn,
):
# pass our scope back to parent for supervision\
# control.
cs: trio.CancelScope|None = (
None
if _tn is True
else _tn.cancel_scope
)
task_status.started(cs)
with teardown_on_exc(
raise_from_handler=not child_errors_mid_stream,
):
for i in range(100):
log.debug(
f'Child tx {i!r}\n'
)
if (
child_errors_mid_stream
and
i == 66
):
# oh wait but WOOPS there's a bug
# in that teardown code!?
raise RuntimeError(
'woopsie, a mid-streaming bug!?'
)
await tx.send(i)
async def main(
# TODO! toggle this for the 2 cases!
# 1. child errors mid-stream while parent is also requesting
# (graceful) cancel of that child streamer.
#
# 2. child contains a teardown handler which contains a
# bug and raises.
#
child_errors_mid_stream: bool,
raise_unmasked: bool = False,
loglevel: str = 'info',
):
tractor.log.get_console_log(level=loglevel)
# the `.aclose()` being checkpoints on these
# is the source of the problem..
tx, rx = trio.open_memory_channel(1)
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
rx as rx,
):
_child_cs = await tn.start(
partial(
finite_stream_to_rent,
child_errors_mid_stream=child_errors_mid_stream,
raise_unmasked=raise_unmasked,
tx=tx,
)
)
async for msg in rx:
log.debug(
f'Rent rx {msg!r}\n'
)
# simulate some external cancellation
# request **JUST BEFORE** the child errors.
if msg == 65:
log.cancel(
f'Cancelling parent on,\n'
f'msg={msg}\n'
f'\n'
f'Simulates OOB cancel request!\n'
)
tn.cancel_scope.cancel()
# XXX, manual test as script
if __name__ == '__main__':
tractor.log.get_console_log(level='info')
for case in [True, False]:
log.info(
f'\n'
f'------ RUNNING SCRIPT TRIAL ------\n'
f'child_errors_midstream: {case!r}\n'
)
try:
trio.run(partial(
main,
child_errors_mid_stream=case,
# raise_unmasked=True,
loglevel='info',
))
except Exception as _exc:
exc = _exc
log.exception(
'Should have raised an RTE or Cancelled?\n'
)
breakpoint()

View File

@ -1,8 +1,8 @@
""" """
That "native" debug mode better work! That "native" debug mode better work!
All these tests can be understood (somewhat) by running the All these tests can be understood (somewhat) by running the equivalent
equivalent `examples/debugging/` scripts manually. `examples/debugging/` scripts manually.
TODO: TODO:
- none of these tests have been run successfully on windows yet but - none of these tests have been run successfully on windows yet but
@ -925,7 +925,6 @@ def test_post_mortem_api(
"<Task 'name_error'", "<Task 'name_error'",
"NameError", "NameError",
"('child'", "('child'",
'getattr(doggypants)', # exc-LoC
] ]
) )
if ctlc: if ctlc:
@ -942,8 +941,8 @@ def test_post_mortem_api(
"<Task '__main__.main'", "<Task '__main__.main'",
"('root'", "('root'",
"NameError", "NameError",
"tractor.post_mortem()",
"src_uid=('child'", "src_uid=('child'",
"tractor.post_mortem()", # in `main()`-LoC
] ]
) )
if ctlc: if ctlc:
@ -961,10 +960,6 @@ def test_post_mortem_api(
"('root'", "('root'",
"NameError", "NameError",
"src_uid=('child'", "src_uid=('child'",
# raising line in `main()` but from crash-handling
# in `tractor.open_nursery()`.
'async with p.open_context(name_error) as (ctx, first):',
] ]
) )
if ctlc: if ctlc:
@ -1156,54 +1151,6 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
) )
def test_crash_handling_within_cancelled_root_actor(
spawn: PexpectSpawner,
):
'''
Ensure that when only a root-actor is started via `open_root_actor()`
we can crash-handle in debug-mode despite self-cancellation.
More-or-less ensures we conditionally shield the pause in
`._root.open_root_actor()`'s `await debug._maybe_enter_pm()`
call.
'''
child = spawn('root_self_cancelled_w_error')
child.expect(PROMPT)
assert_before(
child,
[
"Actor.cancel_soon()` was called!",
"root cancelled",
_pause_msg,
"('root'", # actor name
]
)
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
[
_crash_msg,
"('root'", # actor name
"AssertionError",
"assert 0",
]
)
child.sendline('c')
child.expect(EOF)
assert_before(
child,
[
"AssertionError",
"assert 0",
]
)
# TODO: better error for "non-ideal" usage from the root actor. # TODO: better error for "non-ideal" usage from the root actor.
# -[ ] if called from an async scope emit a message that suggests # -[ ] if called from an async scope emit a message that suggests
# using `await tractor.pause()` instead since it's less overhead # using `await tractor.pause()` instead since it's less overhead

View File

@ -18,9 +18,8 @@ from tractor import (
@pytest.fixture @pytest.fixture
def bindspace_dir_str() -> str: def bindspace_dir_str() -> str:
rt_dir: Path = tractor._state.get_rt_dir() bs_dir_str: str = '/run/user/1000/doggy'
bs_dir: Path = rt_dir / 'doggy' bs_dir = Path(bs_dir_str)
bs_dir_str: str = str(bs_dir)
assert not bs_dir.is_dir() assert not bs_dir.is_dir()
yield bs_dir_str yield bs_dir_str

View File

@ -313,8 +313,9 @@ async def inf_streamer(
# `trio.EndOfChannel` doesn't propagate directly to the above # `trio.EndOfChannel` doesn't propagate directly to the above
# .open_stream() parent, resulting in it also raising instead # .open_stream() parent, resulting in it also raising instead
# of gracefully absorbing as normal.. so how to handle? # of gracefully absorbing as normal.. so how to handle?
tractor.trionics.collapse_eg(), trio.open_nursery(
trio.open_nursery() as tn, strict_exception_groups=False,
) as tn,
): ):
async def close_stream_on_sentinel(): async def close_stream_on_sentinel():
async for msg in stream: async for msg in stream:

View File

@ -236,10 +236,7 @@ async def stream_forever():
async def test_cancel_infinite_streamer(start_method): async def test_cancel_infinite_streamer(start_method):
# stream for at most 1 seconds # stream for at most 1 seconds
with ( with trio.move_on_after(1) as cancel_scope:
trio.fail_after(4),
trio.move_on_after(1) as cancel_scope
):
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.start_actor( portal = await n.start_actor(
'donny', 'donny',
@ -287,32 +284,20 @@ async def test_cancel_infinite_streamer(start_method):
], ],
) )
@tractor_test @tractor_test
async def test_some_cancels_all( async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
num_actors_and_errs: tuple, """Verify a subset of failed subactors causes all others in
start_method: str,
loglevel: str,
):
'''
Verify a subset of failed subactors causes all others in
the nursery to be cancelled just like the strategy in trio. the nursery to be cancelled just like the strategy in trio.
This is the first and only supervisory strategy at the moment. This is the first and only supervisory strategy at the moment.
"""
''' num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
(
num_actors,
first_err,
err_type,
ria_func,
da_func,
) = num_actors_and_errs
try: try:
async with tractor.open_nursery() as an: async with tractor.open_nursery() as n:
# spawn the same number of deamon actors which should be cancelled # spawn the same number of deamon actors which should be cancelled
dactor_portals = [] dactor_portals = []
for i in range(num_actors): for i in range(num_actors):
dactor_portals.append(await an.start_actor( dactor_portals.append(await n.start_actor(
f'deamon_{i}', f'deamon_{i}',
enable_modules=[__name__], enable_modules=[__name__],
)) ))
@ -322,7 +307,7 @@ async def test_some_cancels_all(
for i in range(num_actors): for i in range(num_actors):
# start actor(s) that will fail immediately # start actor(s) that will fail immediately
riactor_portals.append( riactor_portals.append(
await an.run_in_actor( await n.run_in_actor(
func, func,
name=f'actor_{i}', name=f'actor_{i}',
**kwargs **kwargs
@ -352,8 +337,7 @@ async def test_some_cancels_all(
# should error here with a ``RemoteActorError`` or ``MultiError`` # should error here with a ``RemoteActorError`` or ``MultiError``
except first_err as _err: except first_err as err:
err = _err
if isinstance(err, BaseExceptionGroup): if isinstance(err, BaseExceptionGroup):
assert len(err.exceptions) == num_actors assert len(err.exceptions) == num_actors
for exc in err.exceptions: for exc in err.exceptions:
@ -364,8 +348,8 @@ async def test_some_cancels_all(
elif isinstance(err, tractor.RemoteActorError): elif isinstance(err, tractor.RemoteActorError):
assert err.boxed_type == err_type assert err.boxed_type == err_type
assert an.cancelled is True assert n.cancelled is True
assert not an._children assert not n._children
else: else:
pytest.fail("Should have gotten a remote assertion error?") pytest.fail("Should have gotten a remote assertion error?")
@ -535,15 +519,10 @@ def test_cancel_via_SIGINT_other_task(
async def main(): async def main():
# should never timeout since SIGINT should cancel the current program # should never timeout since SIGINT should cancel the current program
with trio.fail_after(timeout): with trio.fail_after(timeout):
async with ( async with trio.open_nursery(
# XXX ?TODO? why no work!?
# tractor.trionics.collapse_eg(),
trio.open_nursery(
strict_exception_groups=False, strict_exception_groups=False,
) as tn, ) as n:
): await n.start(spawn_and_sleep_forever)
await tn.start(spawn_and_sleep_forever)
if 'mp' in spawn_backend: if 'mp' in spawn_backend:
time.sleep(0.1) time.sleep(0.1)
os.kill(pid, signal.SIGINT) os.kill(pid, signal.SIGINT)
@ -554,123 +533,38 @@ def test_cancel_via_SIGINT_other_task(
async def spin_for(period=3): async def spin_for(period=3):
"Sync sleep." "Sync sleep."
print(f'sync sleeping in sub-sub for {period}\n')
time.sleep(period) time.sleep(period)
async def spawn_sub_with_sync_blocking_task(): async def spawn():
async with tractor.open_nursery() as an: async with tractor.open_nursery() as tn:
print('starting sync blocking subactor..\n') await tn.run_in_actor(
await an.run_in_actor(
spin_for, spin_for,
name='sleeper', name='sleeper',
) )
print('exiting first subactor layer..\n')
@pytest.mark.parametrize(
'man_cancel_outer',
[
False, # passes if delay != 2
# always causes an unexpected eg-w-embedded-assert-err?
pytest.param(True,
marks=pytest.mark.xfail(
reason=(
'always causes an unexpected eg-w-embedded-assert-err?'
)
),
),
],
)
@no_windows @no_windows
def test_cancel_while_childs_child_in_sync_sleep( def test_cancel_while_childs_child_in_sync_sleep(
loglevel: str, loglevel,
start_method: str, start_method,
spawn_backend: str, spawn_backend,
debug_mode: bool,
reg_addr: tuple,
man_cancel_outer: bool,
): ):
''' """Verify that a child cancelled while executing sync code is torn
Verify that a child cancelled while executing sync code is torn
down even when that cancellation is triggered by the parent down even when that cancellation is triggered by the parent
2 nurseries "up". 2 nurseries "up".
"""
Though the grandchild should stay blocking its actor runtime, its
parent should issue a "zombie reaper" to hard kill it after
sufficient timeout.
'''
if start_method == 'forkserver': if start_method == 'forkserver':
pytest.skip("Forksever sux hard at resuming from sync sleep...") pytest.skip("Forksever sux hard at resuming from sync sleep...")
async def main(): async def main():
# with trio.fail_after(2):
# XXX BIG TODO NOTE XXX async with tractor.open_nursery() as tn:
# await tn.run_in_actor(
# it seems there's a strange race that can happen spawn,
# where where the fail-after will trigger outer scope name='spawn',
# .cancel() which then causes the inner scope to raise,
#
# BaseExceptionGroup('Exceptions from Trio nursery', [
# BaseExceptionGroup('Exceptions from Trio nursery',
# [
# Cancelled(),
# Cancelled(),
# ]
# ),
# AssertionError('assert 0')
# ])
#
# WHY THIS DOESN'T MAKE SENSE:
# ---------------------------
# - it should raise too-slow-error when too slow..
# * verified that using simple-cs and manually cancelling
# you get same outcome -> indicates that the fail-after
# can have its TooSlowError overriden!
# |_ to check this it's easy, simplly decrease the timeout
# as per the var below.
#
# - when using the manual simple-cs the outcome is different
# DESPITE the `assert 0` which means regardless of the
# inner scope effectively failing in the same way, the
# bubbling up **is NOT the same**.
#
# delays trigger diff outcomes..
# ---------------------------
# as seen by uncommenting various lines below there is from
# my POV an unexpected outcome due to the delay=2 case.
#
# delay = 1 # no AssertionError in eg, TooSlowError raised.
# delay = 2 # is AssertionError in eg AND no TooSlowError !?
delay = 4 # is AssertionError in eg AND no _cs cancellation.
with trio.fail_after(delay) as _cs:
# with trio.CancelScope() as cs:
# ^XXX^ can be used instead to see same outcome.
async with (
# tractor.trionics.collapse_eg(), # doesn't help
tractor.open_nursery(
hide_tb=False,
debug_mode=debug_mode,
registry_addrs=[reg_addr],
) as an,
):
await an.run_in_actor(
spawn_sub_with_sync_blocking_task,
name='sync_blocking_sub',
) )
await trio.sleep(1) await trio.sleep(1)
if man_cancel_outer:
print('Cancelling manually in root')
_cs.cancel()
# trigger exc-srced taskc down
# the actor tree.
print('RAISING IN ROOT')
assert 0 assert 0
with pytest.raises(AssertionError): with pytest.raises(AssertionError):

View File

@ -117,10 +117,9 @@ async def open_actor_local_nursery(
ctx: tractor.Context, ctx: tractor.Context,
): ):
global _nursery global _nursery
async with ( async with trio.open_nursery(
tractor.trionics.collapse_eg(), strict_exception_groups=False,
trio.open_nursery() as tn ) as tn:
):
_nursery = tn _nursery = tn
await ctx.started() await ctx.started()
await trio.sleep(10) await trio.sleep(10)

View File

@ -13,24 +13,26 @@ MESSAGE = 'tractoring at full speed'
def test_empty_mngrs_input_raises() -> None: def test_empty_mngrs_input_raises() -> None:
async def main(): async def main():
with trio.fail_after(3): with trio.fail_after(1):
async with ( async with (
open_actor_cluster( open_actor_cluster(
modules=[__name__], modules=[__name__],
# NOTE: ensure we can passthrough runtime opts # NOTE: ensure we can passthrough runtime opts
loglevel='cancel', loglevel='info',
debug_mode=False, # debug_mode=True,
) as portals, ) as portals,
gather_contexts(mngrs=()), gather_contexts(
# NOTE: it's the use of inline-generator syntax
# here that causes the empty input.
mngrs=(
p.open_context(worker) for p in portals.values()
),
),
): ):
# should fail before this? assert 0
assert portals
# test should fail if we mk it here!
assert 0, 'Should have raised val-err !?'
with pytest.raises(ValueError): with pytest.raises(ValueError):
trio.run(main) trio.run(main)

View File

@ -11,7 +11,6 @@ import psutil
import pytest import pytest
import subprocess import subprocess
import tractor import tractor
from tractor.trionics import collapse_eg
from tractor._testing import tractor_test from tractor._testing import tractor_test
import trio import trio
@ -194,10 +193,10 @@ async def spawn_and_check_registry(
try: try:
async with tractor.open_nursery() as an: async with tractor.open_nursery() as an:
async with ( async with trio.open_nursery(
collapse_eg(), strict_exception_groups=False,
trio.open_nursery() as trion, ) as trion:
):
portals = {} portals = {}
for i in range(3): for i in range(3):
name = f'a{i}' name = f'a{i}'
@ -339,12 +338,11 @@ async def close_chans_before_nursery(
async with portal2.open_stream_from( async with portal2.open_stream_from(
stream_forever stream_forever
) as agen2: ) as agen2:
async with ( async with trio.open_nursery(
collapse_eg(), strict_exception_groups=False,
trio.open_nursery() as tn, ) as n:
): n.start_soon(streamer, agen1)
tn.start_soon(streamer, agen1) n.start_soon(cancel, use_signal, .5)
tn.start_soon(cancel, use_signal, .5)
try: try:
await streamer(agen2) await streamer(agen2)
finally: finally:

View File

@ -95,7 +95,6 @@ def run_example_in_subproc(
and 'integration' not in p[0] and 'integration' not in p[0]
and 'advanced_faults' not in p[0] and 'advanced_faults' not in p[0]
and 'multihost' not in p[0] and 'multihost' not in p[0]
and 'trio' not in p[0]
) )
], ],
ids=lambda t: t[1], ids=lambda t: t[1],

View File

@ -234,8 +234,10 @@ async def trio_ctx(
with trio.fail_after(1 + delay): with trio.fail_after(1 + delay):
try: try:
async with ( async with (
tractor.trionics.collapse_eg(), trio.open_nursery(
trio.open_nursery() as tn, # TODO, for new `trio` / py3.13
# strict_exception_groups=False,
) as tn,
tractor.to_asyncio.open_channel_from( tractor.to_asyncio.open_channel_from(
sleep_and_err, sleep_and_err,
) as (first, chan), ) as (first, chan),

View File

@ -24,10 +24,14 @@ from tractor._testing import (
) )
# XXX TODO cases: # XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - [x] WE cancelled the peer and thus should not see any raised # - [x] WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently? # `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()` # => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case? # already covers this case?
# - [x] INTER-PEER: some arbitrary remote peer cancels via # - [x] INTER-PEER: some arbitrary remote peer cancels via
# Portal.cancel_actor(). # Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's # => all other connected peers should get that cancel requesting peer's
@ -40,6 +44,16 @@ from tractor._testing import (
# that also spawned a remote task task in that same peer-parent. # that also spawned a remote task task in that same peer-parent.
# def test_self_cancel():
# '''
# 2 cases:
# - calls `Actor.cancel()` locally in some task
# - calls LocalPortal.cancel_actor()` ?
# '''
# ...
@tractor.context @tractor.context
async def open_stream_then_sleep_forever( async def open_stream_then_sleep_forever(
ctx: Context, ctx: Context,
@ -792,7 +806,7 @@ async def basic_echo_server(
ctx: Context, ctx: Context,
peer_name: str = 'wittle_bruv', peer_name: str = 'wittle_bruv',
err_after_imsg: int|None = None, err_after: int|None = None,
) -> None: ) -> None:
''' '''
@ -821,9 +835,8 @@ async def basic_echo_server(
await ipc.send(resp) await ipc.send(resp)
if ( if (
err_after_imsg err_after
and and i > err_after
i > err_after_imsg
): ):
raise RuntimeError( raise RuntimeError(
f'Simulated error in `{peer_name}`' f'Simulated error in `{peer_name}`'
@ -965,8 +978,7 @@ async def tell_little_bro(
actor_name: str, actor_name: str,
caller: str = '', caller: str = '',
err_after: float|None = None, err_after: int|None = None,
rng_seed: int = 50,
): ):
# contact target actor, do a stream dialog. # contact target actor, do a stream dialog.
async with ( async with (
@ -977,18 +989,14 @@ async def tell_little_bro(
basic_echo_server, basic_echo_server,
# XXX proxy any delayed err condition # XXX proxy any delayed err condition
err_after_imsg=( err_after=err_after,
err_after * rng_seed
if err_after is not None
else None
),
) as (sub_ctx, first), ) as (sub_ctx, first),
sub_ctx.open_stream() as echo_ipc, sub_ctx.open_stream() as echo_ipc,
): ):
actor: Actor = current_actor() actor: Actor = current_actor()
uid: tuple = actor.uid uid: tuple = actor.uid
for i in range(rng_seed): for i in range(100):
msg: tuple = ( msg: tuple = (
uid, uid,
i, i,
@ -1013,13 +1021,13 @@ async def tell_little_bro(
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
'raise_sub_spawn_error_after', 'raise_sub_spawn_error_after',
[None, 0.5], [None, 50],
) )
def test_peer_spawns_and_cancels_service_subactor( def test_peer_spawns_and_cancels_service_subactor(
debug_mode: bool, debug_mode: bool,
raise_client_error: str, raise_client_error: str,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
raise_sub_spawn_error_after: float|None, raise_sub_spawn_error_after: int|None,
): ):
# NOTE: this tests for the modden `mod wks open piker` bug # NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx # discovered as part of implementing workspace ctx
@ -1033,7 +1041,6 @@ def test_peer_spawns_and_cancels_service_subactor(
# and the server's spawned child should cancel and terminate! # and the server's spawned child should cancel and terminate!
peer_name: str = 'little_bro' peer_name: str = 'little_bro'
def check_inner_rte(rae: RemoteActorError): def check_inner_rte(rae: RemoteActorError):
''' '''
Validate the little_bro's relayed inception! Validate the little_bro's relayed inception!
@ -1127,7 +1134,8 @@ def test_peer_spawns_and_cancels_service_subactor(
) )
try: try:
res = await client_ctx.wait_for_result(hide_tb=False) res = await client_ctx.result(hide_tb=False)
# in remote (relayed inception) error # in remote (relayed inception) error
# case, we should error on the line above! # case, we should error on the line above!
if raise_sub_spawn_error_after: if raise_sub_spawn_error_after:
@ -1138,23 +1146,6 @@ def test_peer_spawns_and_cancels_service_subactor(
assert isinstance(res, ContextCancelled) assert isinstance(res, ContextCancelled)
assert client_ctx.cancel_acked assert client_ctx.cancel_acked
assert res.canceller == root.uid assert res.canceller == root.uid
assert not raise_sub_spawn_error_after
# cancelling the spawner sub should
# transitively cancel it's sub, the little
# bruv.
print('root cancelling server/client sub-actors')
await spawn_ctx.cancel()
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
# XXX, only for tracing
# except BaseException as _berr:
# berr = _berr
# await tractor.pause(shield=True)
# raise berr
except RemoteActorError as rae: except RemoteActorError as rae:
_err = rae _err = rae
@ -1183,8 +1174,19 @@ def test_peer_spawns_and_cancels_service_subactor(
raise raise
# await tractor.pause() # await tractor.pause()
else:
assert not raise_sub_spawn_error_after
# cancelling the spawner sub should
# transitively cancel it's sub, the little
# bruv.
print('root cancelling server/client sub-actors')
await spawn_ctx.cancel()
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
# await tractor.pause()
# await server.cancel_actor() # await server.cancel_actor()
except RemoteActorError as rae: except RemoteActorError as rae:
@ -1197,7 +1199,7 @@ def test_peer_spawns_and_cancels_service_subactor(
# since we called `.cancel_actor()`, `.cancel_ack` # since we called `.cancel_actor()`, `.cancel_ack`
# will not be set on the ctx bc `ctx.cancel()` was not # will not be set on the ctx bc `ctx.cancel()` was not
# called directly for this confext. # called directly fot this confext.
except ContextCancelled as ctxc: except ContextCancelled as ctxc:
_ctxc = ctxc _ctxc = ctxc
print( print(
@ -1237,19 +1239,12 @@ def test_peer_spawns_and_cancels_service_subactor(
# assert spawn_ctx.cancelled_caught # assert spawn_ctx.cancelled_caught
async def _main():
with trio.fail_after(
3 if not debug_mode
else 999
):
await main()
if raise_sub_spawn_error_after: if raise_sub_spawn_error_after:
with pytest.raises(RemoteActorError) as excinfo: with pytest.raises(RemoteActorError) as excinfo:
trio.run(_main) trio.run(main)
rae: RemoteActorError = excinfo.value rae: RemoteActorError = excinfo.value
check_inner_rte(rae) check_inner_rte(rae)
else: else:
trio.run(_main) trio.run(main)

View File

@ -235,16 +235,10 @@ async def cancel_after(wait, reg_addr):
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def time_quad_ex( def time_quad_ex(reg_addr, ci_env, spawn_backend):
reg_addr: tuple,
ci_env: bool,
spawn_backend: str,
):
if spawn_backend == 'mp': if spawn_backend == 'mp':
''' """no idea but the mp *nix runs are flaking out here often...
no idea but the mp *nix runs are flaking out here often... """
'''
pytest.skip("Test is too flaky on mp in CI") pytest.skip("Test is too flaky on mp in CI")
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4 timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
@ -255,24 +249,12 @@ def time_quad_ex(
return results, diff return results, diff
def test_a_quadruple_example( def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
time_quad_ex: tuple, """This also serves as a kind of "we'd like to be this fast test"."""
ci_env: bool,
spawn_backend: str,
):
'''
This also serves as a kind of "we'd like to be this fast test".
'''
results, diff = time_quad_ex results, diff = time_quad_ex
assert results assert results
this_fast = ( this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
6 if platform.system() in (
'Windows',
'Darwin',
)
else 3
)
assert diff < this_fast assert diff < this_fast

View File

@ -1,239 +0,0 @@
'''
Define the details of inter-actor "out-of-band" (OoB) cancel
semantics, that is how cancellation works when a cancel request comes
from the different concurrency (primitive's) "layer" then where the
eventual `trio.Task` actually raises a signal.
'''
from functools import partial
# from contextlib import asynccontextmanager as acm
# import itertools
import pytest
import trio
import tractor
from tractor import ( # typing
ActorNursery,
Portal,
Context,
# ContextCancelled,
# RemoteActorError,
)
# from tractor._testing import (
# tractor_test,
# expect_ctxc,
# )
# XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# def test_self_cancel():
# '''
# 2 cases:
# - calls `Actor.cancel()` locally in some task
# - calls LocalPortal.cancel_actor()` ?
#
# things to ensure!
# -[ ] the ctxc raised in a child should ideally show the tb of the
# underlying `Cancelled` checkpoint, i.e.
# `raise scope_error from ctxc`?
#
# -[ ] a self-cancelled context, if not allowed to block on
# `ctx.result()` at some point will hang since the `ctx._scope`
# is never `.cancel_called`; cases for this include,
# - an `open_ctx()` which never starteds before being OoB actor
# cancelled.
# |_ parent task will be blocked in `.open_context()` for the
# `Started` msg, and when the OoB ctxc arrives `ctx._scope`
# will never have been signalled..
# '''
# ...
# TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py`
# but with the `Lock.acquire()` from a `@context` to ensure the
# implicit ignore-case-non-unmasking.
#
# @tractor.context
# async def acquire_actor_global_lock(
# ctx: tractor.Context,
# ignore_special_cases: bool,
# ):
# async with maybe_unmask_excs(
# ignore_special_cases=ignore_special_cases,
# ):
# await ctx.started('locked')
# # block til cancelled
# await trio.sleep_forever()
@tractor.context
async def sleep_forever(
ctx: tractor.Context,
# ignore_special_cases: bool,
do_started: bool,
):
# async with maybe_unmask_excs(
# ignore_special_cases=ignore_special_cases,
# ):
# await ctx.started('locked')
if do_started:
await ctx.started()
# block til cancelled
print('sleepin on child-side..')
await trio.sleep_forever()
@pytest.mark.parametrize(
'cancel_ctx',
[True, False],
)
def test_cancel_ctx_with_parent_side_entered_in_bg_task(
debug_mode: bool,
loglevel: str,
cancel_ctx: bool,
):
'''
The most "basic" out-of-band-task self-cancellation case where
`Portal.open_context()` is entered in a bg task and the
parent-task (of the containing nursery) calls `Context.cancel()`
without the child knowing; the `Context._scope` should be
`.cancel_called` when the IPC ctx's child-side relays
a `ContextCancelled` with a `.canceller` set to the parent
actor('s task).
'''
async def main():
with trio.fail_after(
2 if not debug_mode else 999,
):
an: ActorNursery
async with (
tractor.open_nursery(
debug_mode=debug_mode,
loglevel='devx',
enable_stack_on_sig=True,
) as an,
trio.open_nursery() as tn,
):
ptl: Portal = await an.start_actor(
'sub',
enable_modules=[__name__],
)
async def _open_ctx_async(
do_started: bool = True,
task_status=trio.TASK_STATUS_IGNORED,
):
# do we expect to never enter the
# `.open_context()` below.
if not do_started:
task_status.started()
async with ptl.open_context(
sleep_forever,
do_started=do_started,
) as (ctx, first):
task_status.started(ctx)
await trio.sleep_forever()
# XXX, this is the key OoB part!
#
# - start the `.open_context()` in a bg task which
# blocks inside the embedded scope-body,
#
# - when we call `Context.cancel()` it **is
# not** from the same task which eventually runs
# `.__aexit__()`,
#
# - since the bg "opener" task will be in
# a `trio.sleep_forever()`, it must be interrupted
# by the `ContextCancelled` delivered from the
# child-side; `Context._scope: CancelScope` MUST
# be `.cancel_called`!
#
print('ASYNC opening IPC context in subtask..')
maybe_ctx: Context|None = await tn.start(partial(
_open_ctx_async,
))
if (
maybe_ctx
and
cancel_ctx
):
print('cancelling first IPC ctx!')
await maybe_ctx.cancel()
# XXX, note that despite `maybe_context.cancel()`
# being called above, it's the parent (bg) task
# which was originally never interrupted in
# the `ctx._scope` body due to missing case logic in
# `ctx._maybe_cancel_and_set_remote_error()`.
#
# It didn't matter that the subactor process was
# already terminated and reaped, nothing was
# cancelling the ctx-parent task's scope!
#
print('cancelling subactor!')
await ptl.cancel_actor()
if maybe_ctx:
try:
await maybe_ctx.wait_for_result()
except tractor.ContextCancelled as ctxc:
assert not cancel_ctx
assert (
ctxc.canceller
==
tractor.current_actor().aid.uid
)
# don't re-raise since it'll trigger
# an EG from the above tn.
if cancel_ctx:
# graceful self-cancel
trio.run(main)
else:
# ctx parent task should see OoB ctxc due to
# `ptl.cancel_actor()`.
with pytest.raises(tractor.ContextCancelled) as excinfo:
trio.run(main)
assert 'root' in excinfo.value.canceller[0]
# def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it(
# debug_mode: bool,
# loglevel: str,
# ):
# '''
# Demos OoB cancellation from the perspective of a ctx opened with
# a child subactor where the parent cancels the child at the "actor
# layer" using `Portal.cancel_actor()` and thus the
# `ContextCancelled.canceller` received by the ctx's parent-side
# task will appear to be a "self cancellation" even though that
# specific task itself was not cancelled and thus
# `Context.cancel_called ==False`.
# '''
# TODO, do we have an existing implied ctx
# cancel test like this?
# with trio.move_on_after(0.5):# as cs:
# await _open_ctx_async(
# do_started=False,
# )
# in-line ctx scope should definitely raise
# a ctxc with `.canceller = 'root'`
# async with ptl.open_context(
# sleep_forever,
# do_started=True,
# ) as pair:
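
The removed suite above boils down to one "out-of-band" shape: a cancel scope handed to a task *other* than the one blocked inside it. A minimal, tractor-free sketch of that shape using only trio (all names here are illustrative, not tractor APIs):

import trio


async def open_and_block(
    task_status=trio.TASK_STATUS_IGNORED,
):
    with trio.CancelScope() as cs:
        # hand the scope back to the opener's parent task,
        # like `task_status.started(ctx)` does above.
        task_status.started(cs)
        await trio.sleep_forever()

    # reached only once some *other* task calls `cs.cancel()`
    assert cs.cancel_called


async def main():
    async with trio.open_nursery() as tn:
        cs: trio.CancelScope = await tn.start(open_and_block)
        await trio.sleep(0.1)
        # the "out-of-band" cancel: issued by a task which is NOT
        # the one sleeping inside the scope.
        cs.cancel()


trio.run(main)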

View File

@ -147,7 +147,8 @@ def test_trio_prestarted_task_bubbles(
         await trio.sleep_forever()

     async def _trio_main():
-        with trio.fail_after(2 if not debug_mode else 999):
+        # with trio.fail_after(2):
+        with trio.fail_after(999):
             first: str
             chan: to_asyncio.LinkedTaskChannel
             aio_ev = asyncio.Event()
@ -216,25 +217,32 @@ def test_trio_prestarted_task_bubbles(
         ):
             aio_ev.set()

+    with pytest.raises(
+        expected_exception=ExceptionGroup,
+    ) as excinfo:
+        tractor.to_asyncio.run_as_asyncio_guest(
+            trio_main=_trio_main,
+        )
+
+    eg = excinfo.value
+    rte_eg, rest_eg = eg.split(RuntimeError)
+
     # ensure the trio-task's error bubbled despite the aio-side
     # having (maybe) errored first.
     if aio_err_trigger in (
         'after_trio_task_starts',
         'after_start_point',
     ):
-        patt: str = 'trio-side'
-        expect_exc = TypeError
+        assert len(errs := rest_eg.exceptions) == 1
+        typerr = errs[0]
+        assert (
+            type(typerr) is TypeError
+            and
+            'trio-side' in typerr.args
+        )

     # when aio errors BEFORE (last) trio task is scheduled, we should
     # never see anythinb but the aio-side.
     else:
-        patt: str = 'asyncio-side'
-        expect_exc = RuntimeError
-
-    with pytest.raises(expect_exc) as excinfo:
-        tractor.to_asyncio.run_as_asyncio_guest(
-            trio_main=_trio_main,
-        )
-
-    caught_exc = excinfo.value
-    assert patt in caught_exc.args
+        assert len(rtes := rte_eg.exceptions) == 1
+        assert 'asyncio-side' in rtes[0].args[0]
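
The refactored assertions above lean on the stdlib `BaseExceptionGroup.split()` API (Python 3.11+); a tiny self-contained illustration of that idiom:

# split an exception group by type, then assert on each half
eg = ExceptionGroup(
    'demo',
    [RuntimeError('asyncio-side'), TypeError('trio-side')],
)
rte_eg, rest_eg = eg.split(RuntimeError)
assert len(rte_eg.exceptions) == 1
assert 'asyncio-side' in rte_eg.exceptions[0].args[0]
assert len(rest_eg.exceptions) == 1
assert type(rest_eg.exceptions[0]) is TypeError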

View File

@ -6,18 +6,10 @@ want to see changed.
 from contextlib import (
     asynccontextmanager as acm,
 )
-from types import ModuleType
-from functools import partial

 import pytest
-from _pytest import pathlib
-from tractor.trionics import collapse_eg
 import trio
 from trio import TaskStatus
-from tractor._testing import (
-    examples_dir,
-)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -72,8 +64,9 @@ def test_stashed_child_nursery(use_start_soon):
     async def main():
         async with (
-            collapse_eg(),
-            trio.open_nursery() as pn,
+            trio.open_nursery(
+                strict_exception_groups=False,
+            ) as pn,
         ):
             cn = await pn.start(mk_child_nursery)
             assert cn
@ -113,9 +106,8 @@ def test_acm_embedded_nursery_propagates_enter_err(
     debug_mode: bool,
 ):
     '''
-    Demo how a masking `trio.Cancelled` could be handled by unmasking
-    from the `.__context__` field when a user (by accident) re-raises
-    from a `finally:`.
+    Demo how a masking `trio.Cancelled` could be handled by unmasking from the
+    `.__context__` field when a user (by accident) re-raises from a `finally:`.

     '''
import tractor import tractor
@ -166,13 +158,13 @@ def test_acm_embedded_nursery_propagates_enter_err(
     assert len(assert_eg.exceptions) == 1


 def test_gatherctxs_with_memchan_breaks_multicancelled(
     debug_mode: bool,
 ):
     '''
-    Demo how a using an `async with sndchan` inside
-    a `.trionics.gather_contexts()` task will break a strict-eg-tn's
-    multi-cancelled absorption..
+    Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task
+    will break a strict-eg-tn's multi-cancelled absorption..

     '''
from tractor import ( from tractor import (
@ -198,12 +190,15 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
             f'Closed {task!r}\n'
         )

     async def main():
         async with (
             # XXX should ensure ONLY the KBI
             # is relayed upward
-            collapse_eg(),
-            trio.open_nursery(), # as tn,
+            trionics.collapse_eg(),
+            trio.open_nursery(
+                # strict_exception_groups=False,
+            ), # as tn,
             trionics.gather_contexts([
                 open_memchan(),
@ -218,85 +213,3 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
trio.run(main) trio.run(main)
@pytest.mark.parametrize(
'raise_unmasked', [
True,
pytest.param(
False,
marks=pytest.mark.xfail(
reason="see examples/trio/send_chan_aclose_masks.py"
)
),
]
)
@pytest.mark.parametrize(
'child_errors_mid_stream',
[True, False],
)
def test_unmask_aclose_as_checkpoint_on_aexit(
raise_unmasked: bool,
child_errors_mid_stream: bool,
debug_mode: bool,
):
'''
Verify that our unmasker util works over the common case where
a mem-chan's `.aclose()` is included in an `@acm` stack
and it being currently a checkpoint, can `trio.Cancelled`-mask an embedded
exception from user code resulting in a silent failure which
appears like graceful cancellation.
This test suite is mostly implemented as an example script so it
could more easily be shared with `trio`-core peeps as `tractor`-less
minimum reproducing example.
'''
mod: ModuleType = pathlib.import_path(
examples_dir()
/ 'trio'
/ 'send_chan_aclose_masks_beg.py',
root=examples_dir(),
consider_namespace_packages=False,
)
with pytest.raises(RuntimeError):
trio.run(partial(
mod.main,
raise_unmasked=raise_unmasked,
child_errors_mid_stream=child_errors_mid_stream,
))
@pytest.mark.parametrize(
'ignore_special_cases', [
True,
pytest.param(
False,
marks=pytest.mark.xfail(
reason="see examples/trio/lockacquire_not_umasked.py"
)
),
]
)
def test_cancelled_lockacquire_in_ipctx_not_unmasked(
ignore_special_cases: bool,
loglevel: str,
debug_mode: bool,
):
mod: ModuleType = pathlib.import_path(
examples_dir()
/ 'trio'
/ 'lockacquire_not_unmasked.py',
root=examples_dir(),
consider_namespace_packages=False,
)
async def _main():
with trio.fail_after(2):
await mod.main(
ignore_special_cases=ignore_special_cases,
loglevel=loglevel,
debug_mode=debug_mode,
)
trio.run(_main)
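
The "masking" these suites exercise can be reproduced without tractor at all; a minimal sketch, assuming only trio, of how a checkpoint inside a `finally:` swallows the real error into a `trio.Cancelled` that only keeps it on `.__context__`:

import trio


async def child(cs: trio.CancelScope):
    try:
        raise ValueError('real bug')
    finally:
        cs.cancel()  # something cancels the enclosing scope..
        # ..and this checkpoint raises `trio.Cancelled`, which now
        # *masks* the `ValueError` (reachable only via `.__context__`).
        await trio.lowlevel.checkpoint()


async def main():
    with trio.CancelScope() as cs:
        await child(cs)

    # the scope absorbed its own `Cancelled`; the `ValueError`
    # vanished silently -> looks like a graceful cancellation.
    assert cs.cancelled_caught
    print('no error surfaced!')


trio.run(main)

In the real suites the checkpoint is a mem-chan `.aclose()` sitting in an `@acm` exit stack rather than an explicit `checkpoint()` call, but the `__context__`-chaining is identical, which is what the unmasker inspects.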

View File

@ -55,17 +55,10 @@ async def open_actor_cluster(
         raise ValueError(
             'Number of names is {len(names)} but count it {count}')

-    async with (
-        # tractor.trionics.collapse_eg(),
-        tractor.open_nursery(
+    async with tractor.open_nursery(
         **runtime_kwargs,
-        ) as an
-    ):
-        async with (
-            # tractor.trionics.collapse_eg(),
-            trio.open_nursery() as tn,
-            tractor.trionics.maybe_raise_from_masking_exc()
-        ):
+    ) as an:
+        async with trio.open_nursery() as n:
             uid = tractor.current_actor().uid

         async def _start(name: str) -> None:
@ -76,8 +69,9 @@ async def open_actor_cluster(
                 )

         for name in names:
-            tn.start_soon(_start, name)
+            n.start_soon(_start, name)

         assert len(portals) == count
         yield portals

         await an.cancel(hard_kill=hard_kill)

View File

@ -442,25 +442,25 @@ class Context:
         '''
         Records whether cancellation has been requested for this context
         by a call to `.cancel()` either due to,
-        - an explicit call by some local task,
+        - either an explicit call by some local task,
         - or an implicit call due to an error caught inside
-        the `Portal.open_context()` block.
+        the ``Portal.open_context()`` block.

         '''
         return self._cancel_called
# XXX, to debug who frickin sets it.. @cancel_called.setter
# @cancel_called.setter def cancel_called(self, val: bool) -> None:
# def cancel_called(self, val: bool) -> None: '''
# ''' Set the self-cancelled request `bool` value.
# Set the self-cancelled request `bool` value.
# ''' '''
# to debug who frickin sets it..
# if val: # if val:
# from .devx import pause_from_sync # from .devx import pause_from_sync
# pause_from_sync() # pause_from_sync()
# self._cancel_called = val self._cancel_called = val
@property @property
def canceller(self) -> tuple[str, str]|None: def canceller(self) -> tuple[str, str]|None:
@ -635,71 +635,6 @@ class Context:
''' '''
await self.chan.send(Stop(cid=self.cid)) await self.chan.send(Stop(cid=self.cid))
@property
def parent_task(self) -> trio.Task:
'''
This IPC context's "owning task" which is a `trio.Task`
on one of the "sides" of the IPC.
Note that the "parent_" prefix here refers to the local
`trio` task tree using the same interface as
`trio.Nursery.parent_task` whereas for IPC contexts,
a different cross-actor task hierarchy exists:
- a "parent"-side which originally entered
`Portal.open_context()`,
- the "child"-side which was spawned and scheduled to invoke
a function decorated with `@tractor.context`.
This task is thus a handle to mem-domain-distinct/per-process
`Nursery.parent_task` depending on in which of the above
"sides" this context exists.
'''
return self._task
def _is_blocked_on_rx_chan(self) -> bool:
'''
Predicate to indicate whether the owner `._task: trio.Task` is
currently blocked (by `.receive()`-ing) on its underlying RPC
feeder `._rx_chan`.
This knowledge is highly useful when handling so called
"out-of-band" (OoB) cancellation conditions where a peer
actor's task transmitted some remote error/cancel-msg and we
must know whether to signal-via-cancel currently executing
"user-code" (user defined code embedded in `ctx._scope`) or
simply to forward the IPC-msg-as-error **without calling**
`._scope.cancel()`.
In the latter case it is presumed that if the owner task is
blocking for the next IPC msg, it will eventually receive,
process and raise the equivalent local error **without**
requiring `._scope.cancel()` to be explicitly called by the
*delivering OoB RPC-task* (via `_deliver_msg()`).
'''
# NOTE, see the mem-chan meth-impls for *why* this
# logic works,
# `trio._channel.MemoryReceiveChannel.receive[_nowait]()`
#
# XXX realize that this is NOT an
# official/will-be-loudly-deprecated API:
# - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled
#
# orig repo intro in the mem-chan change over patch:
# - https://github.com/python-trio/trio/pull/586#issuecomment-414039117
# |_https://github.com/python-trio/trio/pull/616
# |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0
#
return (
self._task.custom_sleep_data
is
self._rx_chan
)
def _maybe_cancel_and_set_remote_error( def _maybe_cancel_and_set_remote_error(
self, self,
error: BaseException, error: BaseException,
@ -852,27 +787,13 @@ class Context:
if self._canceller is None: if self._canceller is None:
log.error('Ctx has no canceller set!?') log.error('Ctx has no canceller set!?')
cs: trio.CancelScope = self._scope
# ?TODO? see comment @ .start_remote_task()`
#
# if not cs:
# from .devx import mk_pdb
# mk_pdb().set_trace()
# raise RuntimeError(
# f'IPC ctx was not be opened prior to remote error delivery !?\n'
# f'{self}\n'
# f'\n'
# f'`Portal.open_context()` must be entered (somewhere) beforehand!\n'
# )
# Cancel the local `._scope`, catch that # Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error # `._scope.cancelled_caught` and re-raise any remote error
# once exiting (or manually calling `.wait_for_result()`) the # once exiting (or manually calling `.wait_for_result()`) the
# `.open_context()` block. # `.open_context()` block.
cs: trio.CancelScope = self._scope
if ( if (
cs cs
and not cs.cancel_called
# XXX this is an expected cancel request response # XXX this is an expected cancel request response
# message and we **don't need to raise it** in the # message and we **don't need to raise it** in the
@ -881,7 +802,8 @@ class Context:
# if `._cancel_called` then `.cancel_acked and .cancel_called` # if `._cancel_called` then `.cancel_acked and .cancel_called`
# always should be set. # always should be set.
and not self._is_self_cancelled() and not self._is_self_cancelled()
# and not cs.cancelled_caught and not cs.cancel_called
and not cs.cancelled_caught
): ):
if ( if (
msgerr msgerr
@ -892,7 +814,7 @@ class Context:
not self._cancel_on_msgerr not self._cancel_on_msgerr
): ):
message: str = ( message: str = (
f'NOT Cancelling `Context._scope` since,\n' 'NOT Cancelling `Context._scope` since,\n'
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n' f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
f'AND we got a msg-type-error!\n' f'AND we got a msg-type-error!\n'
f'{error}\n' f'{error}\n'
@ -902,43 +824,13 @@ class Context:
# `trio.Cancelled` subtype here ;) # `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368 # https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n' message: str = 'Cancelling `Context._scope` !\n\n'
cs.cancel() # from .devx import pause_from_sync
# pause_from_sync()
# TODO, explicit condition for OoB (self-)cancellation? self._scope.cancel()
# - we called `Portal.cancel_actor()` from this actor else:
# and the peer ctx task delivered ctxc due to it. message: str = 'NOT cancelling `Context._scope` !\n\n'
# - currently `self._is_self_cancelled()` will be true
# since the ctxc.canceller check will match us even though it
# wasn't from this ctx specifically!
elif (
cs
and self._is_self_cancelled()
and not cs.cancel_called
):
message: str = (
'Cancelling `ctx._scope` due to OoB self-cancel ?!\n'
'\n'
)
# from .devx import mk_pdb # from .devx import mk_pdb
# mk_pdb().set_trace() # mk_pdb().set_trace()
# TODO XXX, required to fix timeout failure in
# `test_cancelled_lockacquire_in_ipctx_not_unmaskeed`
#
# XXX NOTE XXX, this is SUPER SUBTLE!
# we only want to cancel our embedded `._scope`
# if the ctx's current/using task is NOT blocked
# on `._rx_chan.receive()` and on some other
# `trio`-checkpoint since in the former case
# any `._remote_error` will be relayed through
# the rx-chan and appropriately raised by the owning
# `._task` directly. IF the owner task is however
# blocking elsewhere we need to interrupt it **now**.
if not self._is_blocked_on_rx_chan():
cs.cancel()
else:
# rx_stats = self._rx_chan.statistics()
message: str = 'NOT cancelling `Context._scope` !\n\n'
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n' fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
if ( if (
@ -962,7 +854,6 @@ class Context:
+ +
cs_fmt cs_fmt
) )
log.cancel( log.cancel(
message message
+ +
@ -1055,9 +946,8 @@ class Context:
''' '''
side: str = self.side side: str = self.side
self._cancel_called = True # XXX for debug via the `@.setter`
# ^ XXX for debug via the `@.setter` self.cancel_called = True
# self.cancel_called = True
header: str = ( header: str = (
f'Cancelling ctx from {side!r}-side\n' f'Cancelling ctx from {side!r}-side\n'
@ -1121,6 +1011,7 @@ class Context:
else: else:
log.cancel( log.cancel(
f'Timed out on cancel request of remote task?\n' f'Timed out on cancel request of remote task?\n'
f'\n'
f'{reminfo}' f'{reminfo}'
) )
@ -1601,12 +1492,6 @@ class Context:
): ):
status = 'peer-cancelled' status = 'peer-cancelled'
case (
Unresolved,
trio.Cancelled(), # any error-type
) if self.canceller:
status = 'actor-cancelled'
# (remote) error condition # (remote) error condition
case ( case (
Unresolved, Unresolved,
@ -2121,9 +2006,6 @@ async def open_context_from_portal(
f'|_{portal.actor}\n' f'|_{portal.actor}\n'
) )
# ?TODO? could we move this to inside the `tn` block?
# -> would allow doing `ctx.parent_task = tn.parent_task` ?
# -> would allow a `if not ._scope: => raise RTE` ?
ctx: Context = await portal.actor.start_remote_task( ctx: Context = await portal.actor.start_remote_task(
portal.channel, portal.channel,
nsf=nsf, nsf=nsf,
@ -2150,7 +2032,6 @@ async def open_context_from_portal(
scope_err: BaseException|None = None scope_err: BaseException|None = None
ctxc_from_child: ContextCancelled|None = None ctxc_from_child: ContextCancelled|None = None
try: try:
# from .devx import pause
async with ( async with (
collapse_eg(), collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
@ -2173,10 +2054,6 @@ async def open_context_from_portal(
# the dialog, the `Error` msg should be raised from the `msg` # the dialog, the `Error` msg should be raised from the `msg`
# handling block below. # handling block below.
try: try:
log.runtime(
f'IPC ctx parent waiting on Started msg..\n'
f'ctx.cid: {ctx.cid!r}\n'
)
started_msg, first = await ctx._pld_rx.recv_msg( started_msg, first = await ctx._pld_rx.recv_msg(
ipc=ctx, ipc=ctx,
expect_msg=Started, expect_msg=Started,
@ -2185,16 +2062,16 @@ async def open_context_from_portal(
) )
except trio.Cancelled as taskc: except trio.Cancelled as taskc:
ctx_cs: trio.CancelScope = ctx._scope ctx_cs: trio.CancelScope = ctx._scope
log.cancel(
f'IPC ctx was cancelled during "child" task sync due to\n\n'
f'.cid: {ctx.cid!r}\n'
f'.maybe_error: {ctx.maybe_error!r}\n'
)
# await pause(shield=True)
if not ctx_cs.cancel_called: if not ctx_cs.cancel_called:
raise raise
# from .devx import pause
# await pause(shield=True)
log.cancel(
'IPC ctx was cancelled during "child" task sync due to\n\n'
f'{ctx.maybe_error}\n'
)
# OW if the ctx's scope was cancelled manually, # OW if the ctx's scope was cancelled manually,
# likely the `Context` was cancelled via a call to # likely the `Context` was cancelled via a call to
# `._maybe_cancel_and_set_remote_error()` so ensure # `._maybe_cancel_and_set_remote_error()` so ensure
@ -2390,16 +2267,13 @@ async def open_context_from_portal(
match scope_err: match scope_err:
case trio.Cancelled(): case trio.Cancelled():
logmeth = log.cancel logmeth = log.cancel
cause: str = 'cancelled'
# XXX explicitly report on any non-graceful-taskc cases # XXX explicitly report on any non-graceful-taskc cases
case _: case _:
cause: str = 'errored'
logmeth = log.exception logmeth = log.exception
logmeth( logmeth(
f'ctx {ctx.side!r}-side {cause!r} with,\n' f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n'
f'{ctx.repr_outcome()!r}\n'
) )
if debug_mode(): if debug_mode():
@ -2424,7 +2298,6 @@ async def open_context_from_portal(
# told us it's cancelled ;p # told us it's cancelled ;p
if ctxc_from_child is None: if ctxc_from_child is None:
try: try:
# await pause(shield=True)
await ctx.cancel() await ctx.cancel()
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
@ -2581,10 +2454,8 @@ async def open_context_from_portal(
log.cancel( log.cancel(
f'Context cancelled by local {ctx.side!r}-side task\n' f'Context cancelled by local {ctx.side!r}-side task\n'
f'c)>\n' f'c)>\n'
f' |_{ctx.parent_task}\n' f' |_{ctx._task}\n\n'
f' .cid={ctx.cid!r}\n' f'{repr(scope_err)}\n'
f'\n'
f'{scope_err!r}\n'
) )
# TODO: should we add a `._cancel_req_received` # TODO: should we add a `._cancel_req_received`
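
The removed `_is_blocked_on_rx_chan()` above leans on a non-public trio detail (per the linked PRs, a task parked in `MemoryReceiveChannel.receive()` stashes the channel on `Task.custom_sleep_data`). A small probe sketch of that claim, assuming nothing beyond trio and printing rather than asserting since the attribute is explicitly not a stable API:

import trio


async def probe():
    send, recv = trio.open_memory_channel(0)
    blocked_task: trio.lowlevel.Task | None = None

    async def consumer():
        nonlocal blocked_task
        blocked_task = trio.lowlevel.current_task()
        await recv.receive()  # parks here until a value arrives

    async with trio.open_nursery() as tn:
        tn.start_soon(consumer)
        await trio.sleep(0.1)
        # per the (non-public!) detail referenced above, a task parked
        # in `.receive()` is expected to stash the channel here:
        print(blocked_task.custom_sleep_data is recv)
        await send.send('wake up')


trio.run(probe)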

View File

@ -481,11 +481,10 @@ async def open_root_actor(
collapse_eg(), collapse_eg(),
trio.open_nursery() as root_tn, trio.open_nursery() as root_tn,
# ?TODO? finally-footgun below? # XXX, finally-footgun below?
# -> see note on why shielding. # -> see note on why shielding.
# maybe_raise_from_masking_exc(), # maybe_raise_from_masking_exc(),
): ):
actor._root_tn = root_tn
# `_runtime.async_main()` creates an internal nursery # `_runtime.async_main()` creates an internal nursery
# and blocks here until any underlying actor(-process) # and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called # tree has terminated thereby conducting so called
@ -524,11 +523,6 @@ async def open_root_actor(
err, err,
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
debug_filter=debug_filter, debug_filter=debug_filter,
# XXX NOTE, required to debug root-actor
# crashes under cancellation conditions; so
# most of them!
shield=root_tn.cancel_scope.cancel_called,
) )
if ( if (
@ -568,7 +562,6 @@ async def open_root_actor(
f'{op_nested_actor_repr}' f'{op_nested_actor_repr}'
) )
# XXX, THIS IS A *finally-footgun*! # XXX, THIS IS A *finally-footgun*!
# (also mentioned in with-block above)
# -> though already shields iternally it can # -> though already shields iternally it can
# taskc here and mask underlying errors raised in # taskc here and mask underlying errors raised in
# the try-block above? # the try-block above?

View File

@ -284,6 +284,10 @@ async def _errors_relayed_via_ipc(
try: try:
yield # run RPC invoke body yield # run RPC invoke body
except TransportClosed:
log.exception('Tpt disconnect during remote-exc relay?')
raise
# box and ship RPC errors for wire-transit via # box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel. # the task's requesting parent IPC-channel.
except ( except (
@ -319,6 +323,9 @@ async def _errors_relayed_via_ipc(
and debug_kbis and debug_kbis
) )
) )
# TODO? better then `debug_filter` below?
# and
# not isinstance(err, TransportClosed)
): ):
# XXX QUESTION XXX: is there any case where we'll # XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default? # want to debug IPC disconnects as a default?
@ -327,13 +334,25 @@ async def _errors_relayed_via_ipc(
# recovery logic - the only case is some kind of # recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going # strange bug in our transport layer itself? Going
# to keep this open ended for now. # to keep this open ended for now.
log.debug(
'RPC task crashed, attempting to enter debugger\n' if _state.debug_mode():
f'|_{ctx}' log.exception(
f'RPC task crashed!\n'
f'Attempting to enter debugger\n'
f'\n'
f'{ctx}'
) )
entered_debug = await debug._maybe_enter_pm( entered_debug = await debug._maybe_enter_pm(
err, err,
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
# don't REPL any psuedo-expected tpt-disconnect
# debug_filter=lambda exc: (
# type (exc) not in {
# TransportClosed,
# }
# ),
) )
if not entered_debug: if not entered_debug:
# if we prolly should have entered the REPL but # if we prolly should have entered the REPL but
@ -384,7 +403,7 @@ async def _errors_relayed_via_ipc(
# RPC task bookeeping. # RPC task bookeeping.
# since RPC tasks are scheduled inside a flat # since RPC tasks are scheduled inside a flat
# `Actor._service_tn`, we add "handles" to each such that # `Actor._service_n`, we add "handles" to each such that
# they can be individually ccancelled. # they can be individually ccancelled.
finally: finally:
@ -450,7 +469,7 @@ async def _invoke(
kwargs: dict[str, Any], kwargs: dict[str, Any],
is_rpc: bool = True, is_rpc: bool = True,
hide_tb: bool = True, hide_tb: bool = False,
return_msg_type: Return|CancelAck = Return, return_msg_type: Return|CancelAck = Return,
task_status: TaskStatus[ task_status: TaskStatus[
@ -462,7 +481,7 @@ async def _invoke(
connected IPC channel. connected IPC channel.
This is the core "RPC" `trio.Task` scheduling machinery used to start every This is the core "RPC" `trio.Task` scheduling machinery used to start every
remotely invoked function, normally in `Actor._service_tn: Nursery`. remotely invoked function, normally in `Actor._service_n: Nursery`.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
@ -642,7 +661,7 @@ async def _invoke(
tn: Nursery tn: Nursery
rpc_ctx_cs: CancelScope rpc_ctx_cs: CancelScope
async with ( async with (
collapse_eg(hide_tb=False), collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
msgops.maybe_limit_plds( msgops.maybe_limit_plds(
ctx=ctx, ctx=ctx,
@ -674,7 +693,20 @@ async def _invoke(
f'\n' f'\n'
f'{pretty_struct.pformat(return_msg)}\n' f'{pretty_struct.pformat(return_msg)}\n'
) )
try:
await chan.send(return_msg) await chan.send(return_msg)
except TransportClosed:
log.exception(
f"Failed send final result to 'parent'-side of IPC-ctx!\n"
f'\n'
f'{chan}\n'
f'Channel already disconnected ??\n'
f'\n'
f'{pretty_struct.pformat(return_msg)}'
)
# ?TODO? will this ever be true though?
if chan.connected():
raise
# NOTE: this happens IFF `ctx._scope.cancel()` is # NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of, # called by any of,
@ -822,44 +854,24 @@ async def _invoke(
f'after having {ctx.repr_state!r}\n' f'after having {ctx.repr_state!r}\n'
) )
if merr: if merr:
logmeth: Callable = log.error logmeth: Callable = log.error
if ( if isinstance(merr, ContextCancelled):
# ctxc: by `Context.cancel()` logmeth: Callable = log.runtime
isinstance(merr, ContextCancelled)
# out-of-layer cancellation, one of: if not isinstance(merr, RemoteActorError):
# - actorc: by `Portal.cancel_actor()` tb_str: str = ''.join(traceback.format_exception(merr))
# - OSc: by SIGINT or `Process.signal()`
or (
isinstance(merr, trio.Cancelled)
and
ctx.canceller
)
):
logmeth: Callable = log.cancel
descr_str += (
f' with {merr!r}\n'
)
elif (
not isinstance(merr, RemoteActorError)
):
tb_str: str = ''.join(
traceback.format_exception(merr)
)
descr_str += ( descr_str += (
f'\n{merr!r}\n' # needed? f'\n{merr!r}\n' # needed?
f'{tb_str}\n' f'{tb_str}\n'
)
else:
descr_str += (
f'{merr!r}\n'
)
else:
descr_str += (
f'\n' f'\n'
f'with final result {ctx.outcome!r}\n' f'scope_error:\n'
f'{scope_err!r}\n'
) )
else:
descr_str += f'\n{merr!r}\n'
else:
descr_str += f'\nwith final result {ctx.outcome!r}\n'
logmeth( logmeth(
f'{message}\n' f'{message}\n'
@ -935,7 +947,7 @@ async def process_messages(
Receive (multiplexed) per-`Channel` RPC requests as msgs from Receive (multiplexed) per-`Channel` RPC requests as msgs from
remote processes; schedule target async funcs as local remote processes; schedule target async funcs as local
`trio.Task`s inside the `Actor._service_tn: Nursery`. `trio.Task`s inside the `Actor._service_n: Nursery`.
Depending on msg type, non-`cmd` (task spawning/starting) Depending on msg type, non-`cmd` (task spawning/starting)
request payloads (eg. `started`, `yield`, `return`, `error`) request payloads (eg. `started`, `yield`, `return`, `error`)
@ -960,7 +972,7 @@ async def process_messages(
''' '''
actor: Actor = _state.current_actor() actor: Actor = _state.current_actor()
assert actor._service_tn # runtime state sanity assert actor._service_n # runtime state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we # TODO: once `trio` get's an "obvious way" for req/resp we
# should use it? # should use it?
@ -1171,7 +1183,7 @@ async def process_messages(
start_status += '->( scheduling new task..\n' start_status += '->( scheduling new task..\n'
log.runtime(start_status) log.runtime(start_status)
try: try:
ctx: Context = await actor._service_tn.start( ctx: Context = await actor._service_n.start(
partial( partial(
_invoke, _invoke,
actor, actor,
@ -1311,7 +1323,7 @@ async def process_messages(
) as err: ) as err:
if nursery_cancelled_before_task: if nursery_cancelled_before_task:
sn: Nursery = actor._service_tn sn: Nursery = actor._service_n
assert sn and sn.cancel_scope.cancel_called # sanity assert sn and sn.cancel_scope.cancel_called # sanity
log.cancel( log.cancel(
f'Service nursery cancelled before it handled {funcname}' f'Service nursery cancelled before it handled {funcname}'
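
The guarded final-`send` added in the `_invoke()` hunk above reduces to a "best effort, escalate only if still connected" pattern; a hypothetical sketch where `TransportClosed` and the `chan` object are stand-ins, not the exact tractor types:

import logging

log = logging.getLogger(__name__)


class TransportClosed(Exception):
    # stand-in for tractor's transport-disconnect exc type
    ...


async def send_final_result(chan, return_msg) -> None:
    try:
        await chan.send(return_msg)
    except TransportClosed:
        log.exception(
            'Failed to send final result; channel already disconnected?\n'
            f'{chan!r}\n'
            f'{return_msg!r}'
        )
        # only escalate if the peer somehow still claims to be
        # connected, in which case something else is wrong.
        if chan.connected():
            raise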

View File

@ -35,15 +35,6 @@ for running all lower level spawning, supervision and msging layers:
SC-transitive RPC via scheduling of `trio` tasks. SC-transitive RPC via scheduling of `trio` tasks.
- registration of newly spawned actors with the discovery sys. - registration of newly spawned actors with the discovery sys.
Glossary:
--------
- tn: a `trio.Nursery` or "task nursery".
- an: an `ActorNursery` or "actor nursery".
- root: top/parent-most scope/task/process/actor (or other runtime
primitive) in a hierarchical tree.
- parent-ish: "higher-up" in the runtime-primitive hierarchy.
- child-ish: "lower-down" in the runtime-primitive hierarchy.
''' '''
from __future__ import annotations from __future__ import annotations
from contextlib import ( from contextlib import (
@ -85,7 +76,6 @@ from tractor.msg import (
) )
from .trionics import ( from .trionics import (
collapse_eg, collapse_eg,
maybe_open_nursery,
) )
from .ipc import ( from .ipc import (
Channel, Channel,
@ -183,11 +173,10 @@ class Actor:
msg_buffer_size: int = 2**6 msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()`, # nursery placeholders filled in by `async_main()` after fork
# - after fork for subactors. _root_n: Nursery|None = None
# - during boot for the root actor. _service_n: Nursery|None = None
_root_tn: Nursery|None = None
_service_tn: Nursery|None = None
_ipc_server: _server.IPCServer|None = None _ipc_server: _server.IPCServer|None = None
@property @property
@ -1021,48 +1010,12 @@ class Actor:
the RPC service nursery. the RPC service nursery.
''' '''
actor_repr: str = _pformat.nest_from_op( assert self._service_n
input_op='>c(', self._service_n.start_soon(
text=self.pformat(),
nest_indent=1,
)
log.cancel(
'Actor.cancel_soon()` was called!\n'
f'>> scheduling `Actor.cancel()`\n'
f'{actor_repr}'
)
assert self._service_tn
self._service_tn.start_soon(
self.cancel, self.cancel,
None, # self cancel all rpc tasks None, # self cancel all rpc tasks
) )
# schedule a "canceller task" in the `._root_tn` once the
# `._service_tn` is fully shutdown; task waits for child-ish
# scopes to fully exit then finally cancels its parent,
# root-most, scope.
async def cancel_root_tn_after_services():
log.runtime(
'Waiting on service-tn to cancel..\n'
f'c>)\n'
f'|_{self._service_tn.cancel_scope!r}\n'
)
await self._cancel_complete.wait()
log.cancel(
f'`._service_tn` cancelled\n'
f'>c)\n'
f'|_{self._service_tn.cancel_scope!r}\n'
f'\n'
f'>> cancelling `._root_tn`\n'
f'c>(\n'
f' |_{self._root_tn.cancel_scope!r}\n'
)
self._root_tn.cancel_scope.cancel()
self._root_tn.start_soon(
cancel_root_tn_after_services
)
@property @property
def cancel_complete(self) -> bool: def cancel_complete(self) -> bool:
return self._cancel_complete.is_set() return self._cancel_complete.is_set()
@ -1167,8 +1120,8 @@ class Actor:
await ipc_server.wait_for_shutdown() await ipc_server.wait_for_shutdown()
# cancel all rpc tasks permanently # cancel all rpc tasks permanently
if self._service_tn: if self._service_n:
self._service_tn.cancel_scope.cancel() self._service_n.cancel_scope.cancel()
log_meth(msg) log_meth(msg)
self._cancel_complete.set() self._cancel_complete.set()
@ -1305,7 +1258,7 @@ class Actor:
''' '''
Cancel all ongoing RPC tasks owned/spawned for a given Cancel all ongoing RPC tasks owned/spawned for a given
`parent_chan: Channel` or simply all tasks (inside `parent_chan: Channel` or simply all tasks (inside
`._service_tn`) when `parent_chan=None`. `._service_n`) when `parent_chan=None`.
''' '''
tasks: dict = self._rpc_tasks tasks: dict = self._rpc_tasks
@ -1517,55 +1470,46 @@ async def async_main(
accept_addrs.append(addr.unwrap()) accept_addrs.append(addr.unwrap())
assert accept_addrs assert accept_addrs
# The "root" nursery ensures the channel with the immediate
ya_root_tn: bool = bool(actor._root_tn) # parent is kept alive as a resilient service until
ya_service_tn: bool = bool(actor._service_tn) # cancellation steps have (mostly) occurred in
# a deterministic way.
# NOTE, a top-most "root" nursery in each actor-process
# enables a lifetime priority for the IPC-channel connection
# with a sub-actor's immediate parent. I.e. this connection
# is kept alive as a resilient service connection until all
# other machinery has exited, cancellation of all
# embedded/child scopes have completed. This helps ensure
# a deterministic (and thus "graceful")
# first-class-supervision style teardown where a parent actor
# (vs. say peers) is always the last to be contacted before
# disconnect.
root_tn: trio.Nursery root_tn: trio.Nursery
async with ( async with (
collapse_eg(), collapse_eg(),
maybe_open_nursery( trio.open_nursery() as root_tn,
nursery=actor._root_tn,
) as root_tn,
): ):
if ya_root_tn: actor._root_n = root_tn
assert root_tn is actor._root_tn assert actor._root_n
else:
actor._root_tn = root_tn
ipc_server: _server.IPCServer ipc_server: _server.IPCServer
async with ( async with (
collapse_eg(), collapse_eg(),
maybe_open_nursery( trio.open_nursery() as service_nursery,
nursery=actor._service_tn,
) as service_tn,
_server.open_ipc_server( _server.open_ipc_server(
parent_tn=service_tn, # ?TODO, why can't this be the root-tn parent_tn=service_nursery,
stream_handler_tn=service_tn, stream_handler_tn=service_nursery,
) as ipc_server, ) as ipc_server,
# ) as actor._ipc_server,
# ^TODO? prettier?
): ):
if ya_service_tn:
assert service_tn is actor._service_tn
else:
# This nursery is used to handle all inbound # This nursery is used to handle all inbound
# connections to us such that if the TCP server # connections to us such that if the TCP server
# is killed, connections can continue to process # is killed, connections can continue to process
# in the background until this nursery is cancelled. # in the background until this nursery is cancelled.
actor._service_tn = service_tn actor._service_n = service_nursery
# set after allocate
actor._ipc_server = ipc_server actor._ipc_server = ipc_server
assert (
actor._service_n
and (
actor._service_n
is
actor._ipc_server._parent_tn
is
ipc_server._stream_handler_tn
)
)
# load exposed/allowed RPC modules # load exposed/allowed RPC modules
# XXX: do this **after** establishing a channel to the parent # XXX: do this **after** establishing a channel to the parent
@ -1591,11 +1535,10 @@ async def async_main(
# - root actor: the ``accept_addr`` passed to this method # - root actor: the ``accept_addr`` passed to this method
# TODO: why is this not with the root nursery? # TODO: why is this not with the root nursery?
# - see above that the `._service_tn` is what's used?
try: try:
eps: list = await ipc_server.listen_on( eps: list = await ipc_server.listen_on(
accept_addrs=accept_addrs, accept_addrs=accept_addrs,
stream_handler_nursery=service_tn, stream_handler_nursery=service_nursery,
) )
log.runtime( log.runtime(
f'Booted IPC server\n' f'Booted IPC server\n'
@ -1603,7 +1546,7 @@ async def async_main(
) )
assert ( assert (
(eps[0].listen_tn) (eps[0].listen_tn)
is not service_tn is not service_nursery
) )
except OSError as oserr: except OSError as oserr:
@ -1765,7 +1708,7 @@ async def async_main(
# XXX TODO but hard XXX # XXX TODO but hard XXX
# we can't actually do this bc the debugger uses the # we can't actually do this bc the debugger uses the
# _service_tn to spawn the lock task, BUT, in theory if we had # _service_n to spawn the lock task, BUT, in theory if we had
# the root nursery surround this finally block it might be # the root nursery surround this finally block it might be
# actually possible to debug THIS machinery in the same way # actually possible to debug THIS machinery in the same way
# as user task code? # as user task code?
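
The removed glossary and `cancel_soon()` logic above describe a parent-last teardown ordering: finish (or cancel) the child-ish service scope first, then cancel the root-most scope. A trio-only sketch of that ordering, with illustrative names rather than the actual runtime machinery:

import trio


async def main():
    done = trio.Event()

    async with trio.open_nursery() as root_tn:          # outer, "parent-ish"
        async with trio.open_nursery() as service_tn:   # inner, "child-ish"

            async def service_work():
                await trio.sleep(0.1)
                done.set()

            async def cancel_root_after_service():
                # wait for the child-ish scope to finish, *then* cancel
                # the root-most scope -> deterministic, parent-last
                # teardown.
                await done.wait()
                root_tn.cancel_scope.cancel()

            service_tn.start_soon(service_work)
            root_tn.start_soon(cancel_root_after_service)

        # service nursery has fully exited here, before the root
        # scope is torn down.
        await trio.sleep_forever()  # stands in for the parent-IPC wait


trio.run(main)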

View File

@ -236,6 +236,10 @@ async def hard_kill(
# whilst also hacking on it XD # whilst also hacking on it XD
# terminate_after: int = 99999, # terminate_after: int = 99999,
# NOTE: for mucking with `.pause()`-ing inside the runtime
# whilst also hacking on it XD
# terminate_after: int = 99999,
) -> None: ) -> None:
''' '''
Un-gracefully terminate an OS level `trio.Process` after timeout. Un-gracefully terminate an OS level `trio.Process` after timeout.
@ -297,23 +301,6 @@ async def hard_kill(
# zombies (as a feature) we ask the OS to do send in the # zombies (as a feature) we ask the OS to do send in the
# removal swad as the last resort. # removal swad as the last resort.
if cs.cancelled_caught: if cs.cancelled_caught:
# TODO? attempt at intermediary-rent-sub
# with child in debug lock?
# |_https://github.com/goodboy/tractor/issues/320
#
# if not is_root_process():
# log.warning(
# 'Attempting to acquire debug-REPL-lock before zombie reap!'
# )
# with trio.CancelScope(shield=True):
# async with debug.acquire_debug_lock(
# subactor_uid=current_actor().uid,
# ) as _ctx:
# log.warning(
# 'Acquired debug lock, child ready to be killed ??\n'
# )
# TODO: toss in the skynet-logo face as ascii art? # TODO: toss in the skynet-logo face as ascii art?
log.critical( log.critical(
# 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n' # 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
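
The surrounding `hard_kill()` flow is essentially "grace period, then SIGKILL"; a minimal sketch assuming a `trio.Process`-like handle (the `proc` parameter here is a stand-in, not the runtime's actual spawn type):

import trio


async def hard_kill_sketch(
    proc,  # e.g. a `trio.Process` with `.wait()`/`.kill()`
    terminate_after: float = 3,
) -> None:
    # politely wait for exit under a timeout..
    with trio.move_on_after(terminate_after) as cs:
        await proc.wait()

    # ..then ask the OS to send in the removal squad as last resort.
    if cs.cancelled_caught:
        proc.kill()
        with trio.move_on_after(1):
            await proc.wait()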

View File

@ -446,12 +446,12 @@ class ActorNursery:
@acm @acm
async def _open_and_supervise_one_cancels_all_nursery( async def _open_and_supervise_one_cancels_all_nursery(
     actor: Actor,

-    hide_tb: bool = True,
+    tb_hide: bool = False,

 ) -> typing.AsyncGenerator[ActorNursery, None]:

     # normally don't need to show user by default
-    __tracebackhide__: bool = hide_tb
+    __tracebackhide__: bool = tb_hide

     outer_err: BaseException|None = None
     inner_err: BaseException|None = None
@ -643,9 +643,8 @@ _shutdown_msg: str = (
'Actor-runtime-shutdown' 'Actor-runtime-shutdown'
) )
@acm
# @api_frame # @api_frame
@acm
async def open_nursery( async def open_nursery(
*, # named params only! *, # named params only!
hide_tb: bool = True, hide_tb: bool = True,

View File

@ -237,9 +237,9 @@ def enable_stack_on_sig(
     try:
         import stackscope
     except ImportError:
-        log.warning(
-            'The `stackscope` lib is not installed!\n'
-            '`Ignoring enable_stack_on_sig() call!\n'
+        log.error(
+            '`stackscope` not installed for use in debug mode!\n'
+            '`Ignoring {enable_stack_on_sig!r} call!\n'
         )
         return None

View File

@ -250,7 +250,7 @@ async def _maybe_enter_pm(
*, *,
tb: TracebackType|None = None, tb: TracebackType|None = None,
api_frame: FrameType|None = None, api_frame: FrameType|None = None,
hide_tb: bool = True, hide_tb: bool = False,
# only enter debugger REPL when returns `True` # only enter debugger REPL when returns `True`
debug_filter: Callable[ debug_filter: Callable[

View File

@ -58,7 +58,6 @@ from tractor._context import Context
from tractor import _state from tractor import _state
from tractor._exceptions import ( from tractor._exceptions import (
NoRuntime, NoRuntime,
InternalError,
) )
from tractor._state import ( from tractor._state import (
current_actor, current_actor,
@ -80,9 +79,6 @@ from ._sigint import (
sigint_shield as sigint_shield, sigint_shield as sigint_shield,
_ctlc_ignore_header as _ctlc_ignore_header _ctlc_ignore_header as _ctlc_ignore_header
) )
from ..pformat import (
ppfmt,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from trio.lowlevel import Task from trio.lowlevel import Task
@ -481,12 +477,12 @@ async def _pause(
# we have to figure out how to avoid having the service nursery # we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below: # cancel on this task start? I *think* this works below:
# ```python # ```python
# actor._service_tn.cancel_scope.shield = shield # actor._service_n.cancel_scope.shield = shield
# ``` # ```
# but not entirely sure if that's a sane way to implement it? # but not entirely sure if that's a sane way to implement it?
# NOTE currently we spawn the lock request task inside this # NOTE currently we spawn the lock request task inside this
# subactor's global `Actor._service_tn` so that the # subactor's global `Actor._service_n` so that the
# lifetime of the lock-request can outlive the current # lifetime of the lock-request can outlive the current
# `._pause()` scope while the user steps through their # `._pause()` scope while the user steps through their
# application code and when they finally exit the # application code and when they finally exit the
@ -510,7 +506,7 @@ async def _pause(
f'|_{task}\n' f'|_{task}\n'
) )
with trio.CancelScope(shield=shield): with trio.CancelScope(shield=shield):
req_ctx: Context = await actor._service_tn.start( req_ctx: Context = await actor._service_n.start(
partial( partial(
request_root_stdio_lock, request_root_stdio_lock,
actor_uid=actor.uid, actor_uid=actor.uid,
@ -544,7 +540,7 @@ async def _pause(
_repl_fail_report = None _repl_fail_report = None
# when the actor is mid-runtime cancellation the # when the actor is mid-runtime cancellation the
# `Actor._service_tn` might get closed before we can spawn # `Actor._service_n` might get closed before we can spawn
# the request task, so just ignore expected RTE. # the request task, so just ignore expected RTE.
elif ( elif (
isinstance(pause_err, RuntimeError) isinstance(pause_err, RuntimeError)
@ -989,7 +985,7 @@ def pause_from_sync(
# that output and assign the `repl` created above! # that output and assign the `repl` created above!
bg_task, _ = trio.from_thread.run( bg_task, _ = trio.from_thread.run(
afn=partial( afn=partial(
actor._service_tn.start, actor._service_n.start,
partial( partial(
_pause_from_bg_root_thread, _pause_from_bg_root_thread,
behalf_of_thread=thread, behalf_of_thread=thread,
@ -1157,10 +1153,9 @@ def pause_from_sync(
'use_greenback', 'use_greenback',
False, False,
): ):
raise InternalError( raise RuntimeError(
f'`greenback` was never initialized in this actor?\n' '`greenback` was never initialized in this actor!?\n\n'
f'\n' f'{_state._runtime_vars}\n'
f'{ppfmt(_state._runtime_vars)}\n'
) from rte ) from rte
raise raise

View File

@ -17,38 +17,20 @@
Utils to tame mp non-SC madeness Utils to tame mp non-SC madeness
''' '''
import platform
# !TODO! in 3.13 this can be disabled (the-same/similarly) using
# a flag,
# - [ ] soo if it works like this, drop this module entirely for
# 3.13+ B)
# |_https://docs.python.org/3/library/multiprocessing.shared_memory.html
#
def disable_mantracker(): def disable_mantracker():
''' '''
Disable all `multiprocessing` "resource tracking" machinery since Disable all `multiprocessing` "resource tracking" machinery since
it's an absolute multi-threaded mess of non-SC madness. it's an absolute multi-threaded mess of non-SC madness.
''' '''
from multiprocessing.shared_memory import SharedMemory from multiprocessing import resource_tracker as mantracker
# 3.13+ only.. can pass `track=False` to disable
# all the resource tracker bs.
# https://docs.python.org/3/library/multiprocessing.shared_memory.html
if (_py_313 := (
platform.python_version_tuple()[:-1]
>=
('3', '13')
)
):
from functools import partial
return partial(
SharedMemory,
track=False,
)
# !TODO, once we drop 3.12- we can obvi remove all this!
else:
from multiprocessing import (
resource_tracker as mantracker,
)
# Tell the "resource tracker" thing to fuck off. # Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker): class ManTracker(mantracker.ResourceTracker):
@ -68,8 +50,3 @@ def disable_mantracker():
mantracker.ensure_running = mantracker._resource_tracker.ensure_running mantracker.ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd mantracker.getfd = mantracker._resource_tracker.getfd
# use std type verbatim
shmT = SharedMemory
return shmT
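
The version gate above hinges on 3.13's new `track=` flag for `SharedMemory`; a small stand-alone sketch of the same switch, using `sys.version_info` (a plain string-tuple compare can mis-order e.g. '9' vs '13'):

import sys
from functools import partial
from multiprocessing.shared_memory import SharedMemory

# 3.13+ grew `track=False` which avoids the resource-tracker process
# entirely; older interpreters need the monkey-patching approach
# shown in the hunk above.
if sys.version_info >= (3, 13):
    open_shm = partial(SharedMemory, track=False)
else:
    open_shm = SharedMemory

shm = open_shm(create=True, size=64)
try:
    shm.buf[:5] = b'hello'
finally:
    shm.close()
    shm.unlink()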

View File

@ -1001,11 +1001,7 @@ class Server(Struct):
partial( partial(
_serve_ipc_eps, _serve_ipc_eps,
server=self, server=self,
stream_handler_tn=( stream_handler_tn=stream_handler_nursery,
stream_handler_nursery
or
self._stream_handler_tn
),
listen_addrs=accept_addrs, listen_addrs=accept_addrs,
) )
) )
@ -1149,17 +1145,13 @@ async def open_ipc_server(
async with maybe_open_nursery( async with maybe_open_nursery(
nursery=parent_tn, nursery=parent_tn,
) as parent_tn: ) as rent_tn:
no_more_peers = trio.Event() no_more_peers = trio.Event()
no_more_peers.set() no_more_peers.set()
ipc_server = IPCServer( ipc_server = IPCServer(
_parent_tn=parent_tn, _parent_tn=rent_tn,
_stream_handler_tn=( _stream_handler_tn=stream_handler_tn or rent_tn,
stream_handler_tn
or
parent_tn
),
_no_more_peers=no_more_peers, _no_more_peers=no_more_peers,
) )
try: try:
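
The `maybe_open_nursery()` helper used in the hunk above presumably reduces to "yield the caller's nursery if given, else open and own a fresh one"; a minimal sketch of that shape (not the exact tractor implementation):

from contextlib import asynccontextmanager as acm

import trio


@acm
async def maybe_open_nursery(
    nursery: trio.Nursery | None = None,
):
    '''
    Re-use a provided (parent-ish) nursery, or open and manage
    a new one on behalf of the caller.

    '''
    if nursery is not None:
        yield nursery
    else:
        async with trio.open_nursery() as tn:
            yield tn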

View File

@ -23,15 +23,14 @@ considered optional within the context of this runtime-library.
""" """
from __future__ import annotations from __future__ import annotations
from multiprocessing import shared_memory as shm
from multiprocessing.shared_memory import (
# SharedMemory,
ShareableList,
)
import platform
from sys import byteorder from sys import byteorder
import time import time
from typing import Optional from typing import Optional
from multiprocessing import shared_memory as shm
from multiprocessing.shared_memory import (
SharedMemory,
ShareableList,
)
from msgspec import ( from msgspec import (
Struct, Struct,
@ -62,7 +61,7 @@ except ImportError:
log = get_logger(__name__) log = get_logger(__name__)
SharedMemory = disable_mantracker() disable_mantracker()
class SharedInt: class SharedInt:
@ -798,14 +797,7 @@ def open_shm_list(
# "close" attached shm on actor teardown # "close" attached shm on actor teardown
try: try:
actor = tractor.current_actor() actor = tractor.current_actor()
actor.lifetime_stack.callback(shml.shm.close) actor.lifetime_stack.callback(shml.shm.close)
# XXX on 3.13+ we don't need to call this?
# -> bc we pass `track=False` for `SharedMemeory` orr?
if (
platform.python_version_tuple()[:-1] < ('3', '13')
):
actor.lifetime_stack.callback(shml.shm.unlink) actor.lifetime_stack.callback(shml.shm.unlink)
except RuntimeError: except RuntimeError:
log.warning('tractor runtime not active, skipping teardown steps') log.warning('tractor runtime not active, skipping teardown steps')

View File

@ -430,25 +430,20 @@ class MsgpackTransport(MsgTransport):
return await self.stream.send_all(size + bytes_data) return await self.stream.send_all(size + bytes_data)
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError, ) as bre:
) as _re: trans_err = bre
trans_err = _re
tpt_name: str = f'{type(self).__name__!r}' tpt_name: str = f'{type(self).__name__!r}'
match trans_err: match trans_err:
case trio.BrokenResourceError() if (
# XXX, specifc to UDS transport and its, '[Errno 32] Broken pipe' in trans_err.args[0]
# ^XXX, specifc to UDS transport and its,
# well, "speediness".. XD # well, "speediness".. XD
# |_ likely todo with races related to how fast # |_ likely todo with races related to how fast
# the socket is setup/torn-down on linux # the socket is setup/torn-down on linux
# as it pertains to rando pings from the # as it pertains to rando pings from the
# `.discovery` subsys and protos. # `.discovery` subsys and protos.
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe'
in
trans_err.args[0]
): ):
tpt_closed = TransportClosed.from_src_exc( raise TransportClosed.from_src_exc(
message=( message=(
f'{tpt_name} already closed by peer\n' f'{tpt_name} already closed by peer\n'
), ),
@ -456,31 +451,14 @@ class MsgpackTransport(MsgTransport):
src_exc=trans_err, src_exc=trans_err,
raise_on_report=True, raise_on_report=True,
loglevel='transport', loglevel='transport',
) ) from bre
raise tpt_closed from trans_err
# case trio.ClosedResourceError() if (
# 'this socket was already closed'
# in
# trans_err.args[0]
# ):
# tpt_closed = TransportClosed.from_src_exc(
# message=(
# f'{tpt_name} already closed by peer\n'
# ),
# body=f'{self}\n',
# src_exc=trans_err,
# raise_on_report=True,
# loglevel='transport',
# )
# raise tpt_closed from trans_err
# unless the disconnect condition falls under "a # unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn # normal operation breakage" we usualy console warn
# about it. # about it.
case _: case _:
log.exception( log.exception(
f'{tpt_name} layer failed pre-send ??\n' '{tpt_name} layer failed pre-send ??\n'
) )
raise trans_err raise trans_err
@ -525,7 +503,7 @@ class MsgpackTransport(MsgTransport):
def pformat(self) -> str: def pformat(self) -> str:
return ( return (
f'<{type(self).__name__}(\n' f'<{type(self).__name__}(\n'
f' |_peers: 1\n' f' |_peers: 2\n'
f' laddr: {self._laddr}\n' f' laddr: {self._laddr}\n'
f' raddr: {self._raddr}\n' f' raddr: {self._raddr}\n'
# f'\n' # f'\n'
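
The send-failure handler above dispatches with structural pattern matching over the exception *instance* plus an `if` guard on its message; a compact illustration of that idiom:

import trio


def classify_send_failure(trans_err: BaseException) -> str:
    # match on the instance type, then narrow on the error message,
    # same shape as the `case trio.BrokenResourceError() if ...` above.
    match trans_err:
        case trio.BrokenResourceError() if (
            '[Errno 32] Broken pipe' in trans_err.args[0]
        ):
            return 'peer-closed'
        case _:
            return 'unexpected'


print(classify_send_failure(
    trio.BrokenResourceError('[Errno 32] Broken pipe'),
))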

View File

@ -613,9 +613,10 @@ async def drain_to_final_msg(
# msg: dict = await ctx._rx_chan.receive() # msg: dict = await ctx._rx_chan.receive()
# if res_cs.cancelled_caught: # if res_cs.cancelled_caught:
# #
# -[x] make sure pause points work here for REPLing # -[ ] make sure pause points work here for REPLing
# the runtime itself; i.e. ensure there's no hangs! # the runtime itself; i.e. ensure there's no hangs!
# |_see masked code below in .cancel_called path # |_from tractor.devx.debug import pause
# await pause()
# NOTE: we get here if the far end was # NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases: # `ContextCancelled` in 2 cases:
@ -651,10 +652,6 @@ async def drain_to_final_msg(
f'IPC ctx cancelled externally during result drain ?\n' f'IPC ctx cancelled externally during result drain ?\n'
f'{ctx}' f'{ctx}'
) )
# XXX, for tracing `Cancelled`..
# from tractor.devx.debug import pause
# await pause(shield=True)
# CASE 2: mask the local cancelled-error(s) # CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is # only when we are sure the remote error is
# the source cause of this local task's # the source cause of this local task's

View File

@ -215,7 +215,7 @@ class LinkedTaskChannel(
val: Any = None, val: Any = None,
) -> None: ) -> None:
''' '''
Synchronize aio-side with its trio-parent. Synchronize aio-sde with its trio-parent.
''' '''
self._aio_started_val = val self._aio_started_val = val
@ -459,22 +459,14 @@ def _run_asyncio_task(
f'Task exited with final result: {result!r}\n' f'Task exited with final result: {result!r}\n'
) )
# XXX ALWAYS close the child-`asyncio`-task-side's # only close the aio (child) side which will relay
# `to_trio` handle which will in turn relay # a `trio.EndOfChannel` to the trio (parent) side.
# a `trio.EndOfChannel` to the `trio`-parent.
# Consequently the parent `trio` task MUST ALWAYS
# check for any `chan._aio_err` to be raised when it
# receives an EoC.
#
# NOTE, there are 2 EoC cases,
# - normal/graceful EoC due to the aio-side actually
# terminating its "streaming", but the task did not
# error and is not yet complete.
#
# - the aio-task terminated and we specially mark the
# closure as due to the `asyncio.Task`'s exit.
# #
# XXX NOTE, that trio-side MUST then in such cases
# check for a `chan._aio_err` and raise it!!
to_trio.close() to_trio.close()
# specially mark the closure as due to the
# asyncio.Task terminating!
chan._closed_by_aio_task = True chan._closed_by_aio_task = True
aio_task_complete.set() aio_task_complete.set()
@ -854,6 +846,8 @@ async def translate_aio_errors(
chan._trio_to_raise = aio_err chan._trio_to_raise = aio_err
trio_err = chan._trio_err = eoc trio_err = chan._trio_err = eoc
# #
# await tractor.pause(shield=True)
#
# ?TODO?, raise something like a, # ?TODO?, raise something like a,
# chan._trio_to_raise = AsyncioErrored() # chan._trio_to_raise = AsyncioErrored()
# BUT, with the tb rewritten to reflect the underlying # BUT, with the tb rewritten to reflect the underlying
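
The EoC-relay protocol described above can be mimicked with plain trio channels: the child stashes its error, closes its end, and the parent re-raises the stashed error when it sees `EndOfChannel`. A minimal sketch where `child_err` stands in for `chan._aio_err`:

import trio


async def main():
    to_parent, from_child = trio.open_memory_channel(1)
    child_err: Exception | None = None

    async def child():
        nonlocal child_err
        try:
            raise RuntimeError('child died before sending!')
        except Exception as err:
            child_err = err  # stash it, like `chan._aio_err`
        finally:
            await to_parent.aclose()  # parent then sees `EndOfChannel`

    eoc: trio.EndOfChannel | None = None
    async with trio.open_nursery() as tn:
        tn.start_soon(child)
        try:
            await from_child.receive()
        except trio.EndOfChannel as _eoc:
            eoc = _eoc

    # an EoC alone looks like a graceful close; only the stashed error
    # tells the parent the child actually failed -> re-raise it.
    if child_err is not None:
        raise child_err from eoc


try:
    trio.run(main)
except RuntimeError as err:
    print(f'relayed from child: {err}')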

View File

@ -78,6 +78,7 @@ def collapse_exception_group(
def get_collapsed_eg( def get_collapsed_eg(
beg: BaseExceptionGroup, beg: BaseExceptionGroup,
bp: bool = False,
) -> BaseException|None: ) -> BaseException|None:
''' '''
If the input beg can collapse to a single sub-exception which is If the input beg can collapse to a single sub-exception which is
@ -91,6 +92,7 @@ def get_collapsed_eg(
return maybe_exc return maybe_exc
@acm @acm
async def collapse_eg( async def collapse_eg(
hide_tb: bool = True, hide_tb: bool = True,
@ -100,8 +102,6 @@ async def collapse_eg(
# trio.Cancelled, # trio.Cancelled,
}, },
add_notes: bool = True, add_notes: bool = True,
bp: bool = False,
): ):
''' '''
If `BaseExceptionGroup` raised in the body scope is If `BaseExceptionGroup` raised in the body scope is
@ -115,20 +115,6 @@ async def collapse_eg(
yield yield
except BaseExceptionGroup as _beg: except BaseExceptionGroup as _beg:
beg = _beg beg = _beg
if (
bp
and
len(beg.exceptions) > 1
):
import tractor
if tractor.current_actor(
err_on_no_runtime=False,
):
await tractor.pause(shield=True)
else:
breakpoint()
if ( if (
(exc := get_collapsed_eg(beg)) (exc := get_collapsed_eg(beg))
and and
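
"Collapsing" here means: if the group (recursively) wraps exactly one leaf exception, surface that leaf instead of the group. A simplified sketch of that behaviour (not the exact `collapse_eg()` implementation):

from contextlib import asynccontextmanager as acm


def get_collapsed(beg: BaseExceptionGroup) -> BaseException | None:
    # return the lone leaf exc if `beg` (recursively) wraps exactly
    # one, otherwise `None`.
    excs = beg.exceptions
    if len(excs) != 1:
        return None
    if isinstance(excs[0], BaseExceptionGroup):
        return get_collapsed(excs[0])
    return excs[0]


@acm
async def collapse_eg_sketch():
    try:
        yield
    except BaseExceptionGroup as beg:
        if (exc := get_collapsed(beg)) is not None:
            raise exc from beg
        raise

# usage sketch:
#   async with collapse_eg_sketch(), trio.open_nursery() as tn:
#       ...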

View File

@ -31,6 +31,7 @@ from typing import (
AsyncIterator, AsyncIterator,
Callable, Callable,
Hashable, Hashable,
Optional,
Sequence, Sequence,
TypeVar, TypeVar,
TYPE_CHECKING, TYPE_CHECKING,
@ -203,7 +204,7 @@ class _Cache:
a kept-alive-while-in-use async resource. a kept-alive-while-in-use async resource.
''' '''
service_tn: trio.Nursery|None = None service_n: Optional[trio.Nursery] = None
locks: dict[Hashable, trio.Lock] = {} locks: dict[Hashable, trio.Lock] = {}
users: int = 0 users: int = 0
values: dict[Any, Any] = {} values: dict[Any, Any] = {}
@ -212,7 +213,7 @@ class _Cache:
tuple[trio.Nursery, trio.Event] tuple[trio.Nursery, trio.Event]
] = {} ] = {}
# nurseries: dict[int, trio.Nursery] = {} # nurseries: dict[int, trio.Nursery] = {}
no_more_users: trio.Event|None = None no_more_users: Optional[trio.Event] = None
@classmethod @classmethod
async def run_ctx( async def run_ctx(
@ -222,17 +223,15 @@ class _Cache:
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED, task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
try:
async with mng as value: async with mng as value:
_, no_more_users = cls.resources[ctx_key] _, no_more_users = cls.resources[ctx_key]
try:
cls.values[ctx_key] = value cls.values[ctx_key] = value
task_status.started(value) task_status.started(value)
try:
await no_more_users.wait() await no_more_users.wait()
finally:
value = cls.values.pop(ctx_key)
finally: finally:
# discard nursery ref so it won't be re-used (an error)? # discard nursery ref so it won't be re-used (an error)?
value = cls.values.pop(ctx_key)
cls.resources.pop(ctx_key) cls.resources.pop(ctx_key)
@@ -295,15 +294,15 @@ async def maybe_open_context(
f'task: {task}\n'
f'task_tn: {task_tn}\n'
)
service_tn = tn
service_n = tn
else:
service_tn: trio.Nursery = current_actor()._service_tn
service_n: trio.Nursery = current_actor()._service_n
# TODO: is there any way to allocate
# a 'stays-open-till-last-task-finshed nursery?
# service_tn: trio.Nursery
# service_n: trio.Nursery
# async with maybe_open_nursery(_Cache.service_tn) as service_tn:
# async with maybe_open_nursery(_Cache.service_n) as service_n:
# _Cache.service_tn = service_tn
# _Cache.service_n = service_n
cache_miss_ke: KeyError|None = None
maybe_taskc: trio.Cancelled|None = None
@@ -325,8 +324,8 @@ async def maybe_open_context(
mngr = acm_func(**kwargs)
resources = _Cache.resources
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
resources[ctx_key] = (service_tn, trio.Event())
resources[ctx_key] = (service_n, trio.Event())
yielded: Any = await service_tn.start(
yielded: Any = await service_n.start(
_Cache.run_ctx,
mngr,
ctx_key,
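For readers new to this module, a simplified sketch of the keep-alive cache shape visible in the hunks above (hypothetical `DemoCache`/`demo_maybe_open` names, not the real `_Cache`/`maybe_open_context()` API): the first caller spawns `run_ctx()` in a service nursery which enters the acm, publishes the value via `task_status.started()`, then parks on an event; later callers reuse the cached value; the last user to exit sets the event so teardown runs in the background task.

    # hedged sketch only; omits the per-key locking and error paths
    # the real impl needs.
    from contextlib import asynccontextmanager as acm
    from typing import Any, AsyncIterator, Callable, Hashable

    import trio


    class DemoCache:
        values: dict[Hashable, Any] = {}
        resources: dict[Hashable, tuple[trio.Nursery, trio.Event]] = {}
        users: dict[Hashable, int] = {}

        @classmethod
        async def run_ctx(
            cls,
            mng,
            key: Hashable,
            task_status: trio.TaskStatus = trio.TASK_STATUS_IGNORED,
        ) -> None:
            async with mng as value:
                _, no_more_users = cls.resources[key]
                try:
                    cls.values[key] = value
                    task_status.started(value)
                    await no_more_users.wait()
                finally:
                    cls.values.pop(key)
                    cls.resources.pop(key)


    @acm
    async def demo_maybe_open(
        service_tn: trio.Nursery,
        key: Hashable,
        mk_acm: Callable[[], Any],
    ) -> AsyncIterator[Any]:
        if key not in DemoCache.values:
            # cache miss: park the acm in the service nursery.
            DemoCache.resources[key] = (service_tn, trio.Event())
            value = await service_tn.start(
                DemoCache.run_ctx,
                mk_acm(),
                key,
            )
        else:
            value = DemoCache.values[key]

        DemoCache.users[key] = DemoCache.users.get(key, 0) + 1
        try:
            yield value
        finally:
            DemoCache.users[key] -= 1
            if DemoCache.users[key] == 0:
                # last user out: let `run_ctx()` tear down the acm.
                DemoCache.resources[key][1].set()

A real version also needs a per-key `trio.Lock` around the miss path (which is what the `_Cache.locks` table above provides) so concurrent first-callers don't race to allocate the same resource.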


@@ -22,10 +22,6 @@ from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
import inspect
from types import (
TracebackType,
)
from typing import (
Type,
TYPE_CHECKING,
@@ -67,66 +63,6 @@ def find_masked_excs(
return None
_mask_cases: dict[
Type[Exception], # masked exc type
dict[
int, # inner-frame index into `inspect.getinnerframes()`
# `FrameInfo.function/filename: str`s to match
dict[str, str],
],
] = {
trio.WouldBlock: {
# `trio.Lock.acquire()` has a checkpoint inside the
# `WouldBlock`-no_wait path's handler..
-5: { # "5th frame up" from checkpoint
'filename': 'trio/_sync.py',
'function': 'acquire',
# 'lineno': 605, # matters?
},
}
}
def is_expected_masking_case(
cases: dict,
exc_ctx: Exception,
exc_match: BaseException,
) -> bool|inspect.FrameInfo:
'''
Determine whether the provided masked exception is from a known
bug/special/unintentional-`trio`-impl case which we do not wish
to unmask.
Return any guilty `inspect.FrameInfo` ow `False`.
'''
exc_tb: TracebackType = exc_match.__traceback__
if cases := _mask_cases.get(type(exc_ctx)):
inner: list[inspect.FrameInfo] = inspect.getinnerframes(exc_tb)
# from tractor.devx.debug import mk_pdb
# mk_pdb().set_trace()
for iframe, matchon in cases.items():
try:
masker_frame: inspect.FrameInfo = inner[iframe]
except IndexError:
continue
for field, in_field in matchon.items():
val = getattr(
masker_frame,
field,
)
if in_field not in val:
break
else:
return masker_frame
return False
# XXX, relevant discussion @ `trio`-core,
# https://github.com/python-trio/trio/issues/455
#
@@ -163,7 +99,6 @@ async def maybe_raise_from_masking_exc(
# ^XXX, special case(s) where we warn-log bc likely
# there will be no operational diff since the exc
# is always expected to be consumed.
) -> BoxedMaybeException:
'''
Maybe un-mask and re-raise exception(s) suppressed by a known
@@ -262,27 +197,6 @@ async def maybe_raise_from_masking_exc(
raise_unmasked
):
if len(masked) < 2:
# don't unmask already known "special" cases..
if (
_mask_cases
and
(cases := _mask_cases.get(type(exc_ctx)))
and
(masker_frame := is_expected_masking_case(
cases,
exc_ctx,
exc_match,
))
):
log.warning(
f'Ignoring already-known, non-ideal-but-valid '
f'masker code @\n'
f'{masker_frame}\n'
f'\n'
f'NOT raising {exc_ctx} from masker {exc_match!r}\n'
)
raise exc_match
raise exc_ctx from exc_match
# ??TODO, see above but, possibly unmasking sub-exc
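Finally, for context on what "masking" means here, a toy reproduction (hypothetical `leaky_teardown`/`find_masked` names; the real helper is `find_masked_excs()` with richer matching): an exception raised during teardown hides the in-flight one on `__context__`, and the un-mask step is exactly the `raise exc_ctx from exc_match` seen above.

    # toy demonstration of exception "masking" and the un-mask step.
    import trio


    async def leaky_teardown():
        try:
            raise ValueError('the real problem')
        finally:
            # teardown code that itself raises will "mask" the
            # in-flight exc, leaving it only on `__context__`.
            raise trio.ClosedResourceError('already closed')


    def find_masked(
        exc_match: BaseException,
        under: type[BaseException],
    ) -> BaseException|None:
        ctx = exc_match.__context__
        return ctx if isinstance(ctx, under) else None


    async def main():
        try:
            await leaky_teardown()
        except trio.ClosedResourceError as exc_match:
            if (exc_ctx := find_masked(exc_match, under=ValueError)):
                # the un-mask: re-raise the hidden exc, chaining the masker.
                raise exc_ctx from exc_match
            raise


    if __name__ == '__main__':
        try:
            trio.run(main)
        except ValueError as err:
            print(f'un-masked: {err!r}')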