Compare commits

...

41 Commits

Author SHA1 Message Date
Tyler Goodlet 653f23a04c Bump "task-manager(-nursery)" naming, add logging
Namely just renaming any `trio.Nursery` instances to `tn`, the primary
`@acm`-API to `.trionics.open_taskman()` and change out all `print()`s
for logger instances with 'info' level enabled by the mod-script usage.
2025-09-12 12:35:27 -04:00
Tyler Goodlet 90db6f2299 Add a new `.trionics._tn` for "task nursery stuff"
Since I'd like to decouple the new "task-manager-nursery" lowlevel
primitives/abstractions from the higher-level
`TaskManagerNursery`-supporting API(s) and default per-task
supervision-strat and because `._mngr` is already purposed for
higher-level "on-top-of-nursery" patterns as it is.

Deats,
- move `maybe_open_nursery()` into the new mod.
- adjust the pkg-mod's import to the new sub-mod.
- also draft up this idea for an API which stacks `._beg.collapse_eg()`
  onto a nursery with the WIP name `open_loose_tn()` but more then
  likely i'll just discard this idea bc i think the explicit `@acm`
  stacking is more explicit/pythonic/up-front-grokable despite the extra
  LoC.
2025-09-12 12:35:27 -04:00
Tyler Goodlet b2d63bc102 Add `debug_mode: bool` control to task mngr
Allows dynamically importing `pdbp` when enabled and a way for
eventually linking with `tractor`'s own debug mode flag.
2025-09-12 12:35:27 -04:00
Tyler Goodlet d433606a6b Go all in on "task manager" naming 2025-09-12 12:35:27 -04:00
Tyler Goodlet cb9569eace More refinements and proper typing
- drop unneeded (and commented) internal cs allocating bits.
- bypass all task manager stuff if no generator is provided by the
  caller; i.e. just call `.start_soon()` as normal.
- fix `Generator` typing.
- add some prints around task manager.
- wrap in `TaskOutcome.lowlevel_task: Task`.
2025-09-12 12:35:27 -04:00
Tyler Goodlet 9798fcd3bb Ensure user-allocated cancel scope just works!
Turns out the nursery doesn't have to care about allocating a per task
`CancelScope` since the user can just do that in the
`@task_scope_manager` if desired B) So just mask all the nursery cs
allocating with the intention of removal.

Also add a test for per-task-cancellation by starting the crash task as
a `trio.sleep_forever()` but then cancel it via the user allocated cs
and ensure the crash propagates as expected 💥
2025-09-12 12:35:27 -04:00
Tyler Goodlet 03549c51ab Facepalm, don't pass in unecessary cancel scope 2025-09-12 12:35:27 -04:00
Tyler Goodlet 256016c515 Do renaming, implement lowlevel `Outcome` sending
As was listed in the many todos, this changes the `.start_soon()` impl
to instead (manually) `.send()` into the user defined
`@task_scope_manager` an `Outcome` from the spawned task. In this case
the task manager wraps that in a user defined (and renamed)
`TaskOutcome` and delivers that + a containing `trio.CancelScope` to the
`.start_soon()` caller. Here the user defined `TaskOutcome` defines
a `.wait_for_result()` method that can be used to await the task's exit
and handle it's underlying returned value or raised error; the
implementation could be different and subject to the user's own whims.

Note that by default, if this was added to `trio`'s core, the
`@task_scope_manager` would simply be implemented as either a `None`
yielding single-yield-generator but more likely just entirely ignored
by the runtime (as in no manual task outcome collecting, generator
calling and sending is done at all) by default if the user does not provide
the `task_scope_manager` to the nursery at open time.
2025-09-12 12:35:26 -04:00
Tyler Goodlet 17b2a2cab4 Alias to `@acm` in broadcaster mod 2025-09-12 12:35:26 -04:00
Tyler Goodlet 4fbafe7ca4 Initial prototype for a one-cancels-one style supervisor, nursery thing.. 2025-09-12 12:35:26 -04:00
Tyler Goodlet 7ffdf3483a Use shorthand nursery var-names per convention in codebase 2025-09-12 12:28:04 -04:00
Tyler Goodlet 76ed0f2ef6 Better separate service tasks vs. ctxs via methods
Namely splitting the handles for each in 2 separate tables and adding
a `.cancel_service_task()`.

Also,
- move `_open_and_supervise_service_ctx()` to mod level.
- rename `target` -> `ctx_fn` params througout.
- fill out method doc strings.
2025-09-12 12:28:04 -04:00
Tyler Goodlet 2afb624c48 Mv over `ServiceMngr` from `piker` with mods
Namely distinguishing service "IPC contexts" (opened in a
subactor via a `Portal`) from just local `trio.Task`s started
and managed under the `.service_n` (more or less wrapping in the
interface of a "task-manager" style nursery - aka a one-cancels-one
supervision start).

API changes from original (`piker`) impl,
- mk `.start_service_task()` do ONLY that, start a task with a wrapping
  cancel-scope and completion event.
  |_ ideally this gets factored-out/re-implemented using the
    task-manager/OCO-style-nursery from GH #363.
- change what was the impl of `.start_service_task()` to `.start_service_ctx()`
  since it more explicitly defines the functionality of entering
  `Portal.open_context()` with a wrapping cs and completion event inside
  a bg task (which syncs the ctx's lifetime with termination of the
  remote actor runtime).
- factor out what was a `.start_service_ctx()` closure to a new
  `_open_and_supervise_service_ctx()` mod-func holding the meat of
  the supervision logic.

`ServiceMngr` API brief,
- use `open_service_mngr()` and `get_service_mngr()` to acquire the
  actor-global singleton.
- `ServiceMngr.start_service()` and `.cancel_service()` which allow for
  straight forward mgmt of "service subactor daemons".
2025-09-12 12:28:04 -04:00
Tyler Goodlet 885137ac19 Initial idea-notes dump and @singleton factory idea from `trio`-gitter 2025-09-12 12:28:04 -04:00
Bd 83ce2275b9
Merge pull request #399 from goodboy/oob_cancel_testing
OoB (out-of-band) cancellation testing, proper.
2025-09-11 14:33:52 -04:00
Tyler Goodlet 9f757ffa63 Woops, fix missing `assert` thanks to copilot 2025-09-11 13:13:18 -04:00
Tyler Goodlet 0c6d512ba4 Solve another OoB cancellation case, the bg task one
Such that we are able to (finally) detect when we should
`Context._scope.cancel()` specifically when the `.parent_task` is
**not** blocking on receiving from the underlying `._rx_chan`, since if
the task is blocking on `.receive()` it will call `.cancel()`
implicitly.

This is a lot to explain with very little code actually needed for the
implementation (are we like `trio` yet anyone?? XD) but the main jist is
that `Context._maybe_cancel_and_set_remote_error()` needed the
additional case of calling `._scope.cancel()` whenever we know that
a remote-error/ctxc won't be immediately handled, bc user code is doing
non `Context`-API things, and result in a similar outcome as if that
task was waiting on `Context.wait_for_result()` or `.__aexite__()`.

Impl details,
- add a new `._is_blocked_on_rx_chan()` method which predicates whether
  the (new) `.parent_task` is blocking on `._rx_chan.receive()`.
  * see various stipulations about the current impl and how we might
    need to adjust for the future given `trio`'s commitment to the
    `Task.custom_sleep_data` attr..
- add `.parent_task`, a pub wrapper for `._task`.
- check for `not ._is_blocked_on_rx_chan()` before manually cancelling
  the local `.parent_task`
- minimize the surrounding branch case expressions.

Other,
- tweak a couple logs.
- add a new `.cancel()` pre-started msg.
- mask the `.cancel_called` setter, it's only (been) used for tracing.
- todos around maybe moving the `._nursery` allocation "around" the
  `.start_remote_task()` call and various subsequent tweaks therein.
2025-09-11 13:12:52 -04:00
Tyler Goodlet fc130d06b8 Check off REPL-ing todo add masked usage in `drain_to_final_msg()` 2025-09-11 10:13:04 -04:00
Tyler Goodlet 73423ef2b7 Timeout on `test_peer_spawns_and_cancels_service_subactor`
While working on a fix to the hang case found from
`test_cancel_ctx_with_parent_side_entered_in_bg_task` an initial
solution caused this test to hang indefinitely; solve it with a small
wrapping `_main()` + `trio.fail_after()` entrypoint.

Further suite refinements,
- move the top-most `try:`->`else:` block
- toss in a masked base-exc block for tracing unexpected
  `ctx.wait_for_result()` outcomes.
- tweak the `raise_sub_spawn_error_after` to be an optional `float`
  which scales the `rng_seed: int = 50` msg counter to
  `tell_little_bro()` so that the abs value to the `range()` can be
  changed.
2025-09-11 10:13:04 -04:00
Tyler Goodlet b1f2a6b394 Rename var for and hide the `_open_and_supervise_one_cancels_all_nursery` frame 2025-09-11 10:13:04 -04:00
Tyler Goodlet 9489a2f84d Add timeout around `test_peer_spawns_and_cancels_service_subactor` suite 2025-09-11 10:13:04 -04:00
Tyler Goodlet 92eaed6fec Parametrize with `Portal.cancel_actor()` only case
Such that when `maybe_context.cancel()` is not called (explicitly) and
only the subactor is cancelled by its parent we expect to see a ctxc
raised both from any call to `Context.wait_for_result()` and out of
the `Portal.open_context()` scope, up to the `trio.run()`.

Deats,
- obvi call-n-catch the ctxc (in scope) for the oob-only
  subactor-cancelled case.
- add branches around `trio.run()` entry to match.
2025-09-11 10:13:04 -04:00
Tyler Goodlet 217d54b9d1 Add the minimal OoB cancel edge case from #391
Discovered while writing a `@context` sanity test to verify unmasker
ignore-cases support. Masked code is due to the process of finding the
minimal example causing the original hang discovered in the original
examples script. Details are in the test-fn doc strings and surrounding
comments; more refinement and cleanup coming obviously.

Also moved over the self-cancel todos from the inter-peer tests module.
2025-09-11 10:13:04 -04:00
Bd 34ca02ed11
Merge pull request #391 from goodboy/cancelled_masking_guards
A refined `trio.Cancelled`-unmasking helper
2025-09-11 10:10:41 -04:00
Tyler Goodlet 62a364a1d3 Tweaks from copilot, type fix, typos, language. 2025-09-11 10:01:25 -04:00
Tyler Goodlet 07781e38cd Reduce "ignore cases" script to `trio`-only
Remove all the `tractor` usage (with IPC ctxs) and just get us
a min-reproducing-example with a multi-task-single `trio.Lock`.
The wrapping test suite runs the exact same with an ignore case and
an `.xfail()` for when we let the `trio.WouldBlock` be unmasked.
2025-09-07 18:49:21 -04:00
Tyler Goodlet 9c6b90ef04 Add a ignore-masking-case script + suite
Demonstrating the guilty `trio.Lock.acquire()` impl which puts
a checkpoint inside its `trio.WouldBlock` handler and which will always
appear to mask the "sync path" case on (graceful) cancellation.

This first script draft demos the issue from within a `tractor.context`
ep bc that's where it was orig discovered, however i'm going to factor
out the `tractor` code and instead just use
a `.trionics.maybe_raise_from_masking_exc()` to demo its low-level
ignore-case feature.

Further, this script exposed a previously unhandled remote graceful
cancellation case which hangs:

- parent actor spawns child and opens a >1 ctxs with it,
- the parent then OoB (out-of-band) cancels the child actor (with
  `Portal.cancel_actor()`),
- since the open ctxs raise a ctxc with a `.canceller == parent.uid` the
  `Context._is_self_cancelled()` will eval `True`,
- the `Context._scope` will NOT be cancelled in
  `._maybe_cancel_and_set_remote_error()` resulting in any bg-task which
  is waiting on a `Portal.open_context()` to not be cancelled/unblocked.

So my plan is to factor this ^^ scenario into a standalone unit test
as well as another test which consumes from al low-level `trio`-only
version of **this** script-scenario to sanity check the interaction
of the unmasker-with-ignore-cases usage implicitly around a ctx ep.
2025-09-06 14:03:02 -04:00
Tyler Goodlet 542d4c7840 Ignore `examples/trio/` in docs-examples test suite 2025-09-06 13:39:08 -04:00
Tyler Goodlet 9aebe7d8f9 Only read `_mask_cases` if truthy, allow disabling for xfails 2025-09-05 22:23:51 -04:00
Tyler Goodlet 04c3d5e239 Wrap `send_chan_aclose_masks_beg.py` as test suite
Call it `test_trioisms::test_unmask_aclose_as_checkpoint_on_aexit` and
parametrize all script-mod`.main()` toggles including `.xfails()` for
the `raise_unmasked=False` cases.
2025-09-05 18:46:20 -04:00
Tyler Goodlet 759174729c Prep masking `.aclose()` script for test suite
So we can parametrize in various toggles to `main()` including,
- `child_errors_mid_stream: bool` which now also drives whether an
  additional, and otherwise non-affecting, `_tn` is allocated in
  the `finite_stream_to_rent()` subtask, only in the early stream
  termination case does it seem to produce a masked outcome?
  * see surrounding notes within.
- `raise_unmasked: bool` to toggle whether the embedded unmasker fn
  will actually raise the masked user RTE; this enables demoing the
  masked outcomes via simple switch and makes it easy to wrap them
  as `pytest.xfail()` outcomes.

Also in support,
- use `.trionics.collapse_eg()` around the root tn to ensure when
  unmasking we can catch the EG-unwrapped RTE easily from a test.
- flip stream `msg` logs to `.debug()` to reduce console noise.
- tweak mod's script iface to report/trace unexpected non-RTEs.
2025-09-05 18:46:20 -04:00
Tyler Goodlet e9f3689191 Add "ignore-case-handling" to exc unmasker
Since it turns out there's even case(s) in `trio` core that are guilty
(of implementing things like checkpoints in exc handlers), this adds
facility for ignoring explicit cases via `inspect.FrameInfo` field
matching from the unmasked `exc_ctx` within
`maybe_raise_from_masking_exc()`.

Impl deats,
- use `inspect.getinnerframes()/getmodule()` to extract the equivalent
  "guilty place in code" which raised the masked error which we'd like
  to ignore and **not unmask**.
- start a `_mask_cases: dict` which describes the entries to ignore
  by matching against a specific `FrameInfo`'s fields from indexed
  from `getinnerframes()`.
- describe in that table the case i hit with `trio.WouldBlock` being
  always masked by a `Cancelled` due to way `trio.Lock.acquire()`
  implements the blocking case in the would-block handler..
- always call into a new `is_expected_masking_case()` predicate (from
  `maybe_raise_from_masking_exc()`) on matching `exc_ctx` types.
2025-09-05 14:54:54 -04:00
Tyler Goodlet 93aa39db07 Always pop `._Cache.resources` AFTER `mng.__aexit__()`
The correct ordering is to de-alloc the surrounding `service_n`
+ `trio.Event` **after** the `mng` teardown ensuring the
`mng.__aexit__()` never can hit a ref-error if it touches either (like
if a `tn` is passed to `maybe_open_context()`!
2025-09-05 14:54:41 -04:00
Tyler Goodlet 5ab642bdf0 Drop more `typing.Optional` usage 2025-08-20 12:45:49 -04:00
Tyler Goodlet ed18ecd064 Drop `tn` arg to `maybe_raise_from_masking_exc()` in `._rpc` 2025-08-20 12:45:49 -04:00
Tyler Goodlet cec0282953 Add `never_warn_on: dict` support to unmasker
Such that key->value pairs can be defined which should *never be*
unmasked where values of
- the keys are exc-types which might be masked, and
- the values are exc-types which masked the equivalent key.

For example, the default includes:
- KBI->taskc: a kbi should never be unmasked from its masking
  `trio.Cancelled`.

For the impl, a new `do_warn: bool` in the fn-body determines the
primary guard for whether a warning or re-raising is necessary.
2025-08-20 12:45:49 -04:00
Tyler Goodlet 25c5847f2e Drop `tn` input from `maybe_raise_from_masking_exc()`
Including all caller usage throughout. Moving to a non-`except*` impl
means it's never needed as a signal from the caller - we can just catch
the beg outright (like we should have always been doing)..
2025-08-20 12:45:49 -04:00
Tyler Goodlet ba793fadd9 Pass `tuple` from `._invoke()` unmasker usage
To match the `maybe_raise_from_masking_exc()` sig change.
2025-08-20 12:45:49 -04:00
Tyler Goodlet d17864a432 Adjust test suites to new `maybe_raise_from_masking_exc()` changes 2025-08-20 12:45:49 -04:00
Tyler Goodlet 6c361a9564 Drop `except*` usage from `._taskc` unmasker
That is from `maybe_raise_from_masking_exc()` thus minimizing us to
a single `except BaseException` block with logic branching for the beg
vs. `unmask_from` exc cases.

Also,
- raise val-err when `unmask_from` is not a `tuple`.
- tweak the exc-note warning format.
- drop all pausing from dev work.
2025-08-20 12:45:49 -04:00
Tyler Goodlet 34ca7429c7 Add a "real-world" example of cancelled-masking with `.aclose()` 2025-08-20 12:45:49 -04:00
18 changed files with 2066 additions and 191 deletions

View File

@ -0,0 +1,85 @@
from contextlib import (
asynccontextmanager as acm,
)
from functools import partial
import tractor
import trio
log = tractor.log.get_logger(
name=__name__
)
_lock: trio.Lock|None = None
@acm
async def acquire_singleton_lock(
) -> None:
global _lock
if _lock is None:
log.info('Allocating LOCK')
_lock = trio.Lock()
log.info('TRYING TO LOCK ACQUIRE')
async with _lock:
log.info('ACQUIRED')
yield _lock
log.info('RELEASED')
async def hold_lock_forever(
task_status=trio.TASK_STATUS_IGNORED
):
async with (
tractor.trionics.maybe_raise_from_masking_exc(),
acquire_singleton_lock() as lock,
):
task_status.started(lock)
await trio.sleep_forever()
async def main(
ignore_special_cases: bool,
loglevel: str = 'info',
debug_mode: bool = True,
):
async with (
trio.open_nursery() as tn,
# tractor.trionics.maybe_raise_from_masking_exc()
# ^^^ XXX NOTE, interestingly putting the unmasker
# here does not exhibit the same behaviour ??
):
if not ignore_special_cases:
from tractor.trionics import _taskc
_taskc._mask_cases.clear()
_lock = await tn.start(
hold_lock_forever,
)
with trio.move_on_after(0.2):
await tn.start(
hold_lock_forever,
)
tn.cancel_scope.cancel()
# XXX, manual test as script
if __name__ == '__main__':
tractor.log.get_console_log(level='info')
for case in [True, False]:
log.info(
f'\n'
f'------ RUNNING SCRIPT TRIAL ------\n'
f'ignore_special_cases: {case!r}\n'
)
trio.run(partial(
main,
ignore_special_cases=case,
loglevel='info',
))

View File

@ -0,0 +1,195 @@
from contextlib import (
contextmanager as cm,
# TODO, any diff in async case(s)??
# asynccontextmanager as acm,
)
from functools import partial
import tractor
import trio
log = tractor.log.get_logger(
name=__name__
)
@cm
def teardown_on_exc(
raise_from_handler: bool = False,
):
'''
You could also have a teardown handler which catches any exc and
does some required teardown. In this case the problem is
compounded UNLESS you ensure the handler's scope is OUTSIDE the
`ux.aclose()`.. that is in the caller's enclosing scope.
'''
try:
yield
except BaseException as _berr:
berr = _berr
log.exception(
f'Handling termination teardown in child due to,\n'
f'{berr!r}\n'
)
if raise_from_handler:
# XXX teardown ops XXX
# on termination these steps say need to be run to
# ensure wider system consistency (like the state of
# remote connections/services).
#
# HOWEVER, any bug in this teardown code is also
# masked by the `tx.aclose()`!
# this is also true if `_tn.cancel_scope` is
# `.cancel_called` by the parent in a graceful
# request case..
# simulate a bug in teardown handler.
raise RuntimeError(
'woopsie teardown bug!'
)
raise # no teardown bug.
async def finite_stream_to_rent(
tx: trio.abc.SendChannel,
child_errors_mid_stream: bool,
raise_unmasked: bool,
task_status: trio.TaskStatus[
trio.CancelScope,
] = trio.TASK_STATUS_IGNORED,
):
async with (
# XXX without this unmasker the mid-streaming RTE is never
# reported since it is masked by the `tx.aclose()`
# call which in turn raises `Cancelled`!
#
# NOTE, this is WITHOUT doing any exception handling
# inside the child task!
#
# TODO, uncomment next LoC to see the supprsessed beg[RTE]!
tractor.trionics.maybe_raise_from_masking_exc(
raise_unmasked=raise_unmasked,
),
tx as tx, # .aclose() is the guilty masker chkpt!
# XXX, this ONLY matters in the
# `child_errors_mid_stream=False` case oddly!?
# THAT IS, if no tn is opened in that case then the
# test will not fail; it raises the RTE correctly?
#
# -> so it seems this new scope somehow affects the form of
# eventual in the parent EG?
tractor.trionics.maybe_open_nursery(
nursery=(
None
if not child_errors_mid_stream
else True
),
) as _tn,
):
# pass our scope back to parent for supervision\
# control.
cs: trio.CancelScope|None = (
None
if _tn is True
else _tn.cancel_scope
)
task_status.started(cs)
with teardown_on_exc(
raise_from_handler=not child_errors_mid_stream,
):
for i in range(100):
log.debug(
f'Child tx {i!r}\n'
)
if (
child_errors_mid_stream
and
i == 66
):
# oh wait but WOOPS there's a bug
# in that teardown code!?
raise RuntimeError(
'woopsie, a mid-streaming bug!?'
)
await tx.send(i)
async def main(
# TODO! toggle this for the 2 cases!
# 1. child errors mid-stream while parent is also requesting
# (graceful) cancel of that child streamer.
#
# 2. child contains a teardown handler which contains a
# bug and raises.
#
child_errors_mid_stream: bool,
raise_unmasked: bool = False,
loglevel: str = 'info',
):
tractor.log.get_console_log(level=loglevel)
# the `.aclose()` being checkpoints on these
# is the source of the problem..
tx, rx = trio.open_memory_channel(1)
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
rx as rx,
):
_child_cs = await tn.start(
partial(
finite_stream_to_rent,
child_errors_mid_stream=child_errors_mid_stream,
raise_unmasked=raise_unmasked,
tx=tx,
)
)
async for msg in rx:
log.debug(
f'Rent rx {msg!r}\n'
)
# simulate some external cancellation
# request **JUST BEFORE** the child errors.
if msg == 65:
log.cancel(
f'Cancelling parent on,\n'
f'msg={msg}\n'
f'\n'
f'Simulates OOB cancel request!\n'
)
tn.cancel_scope.cancel()
# XXX, manual test as script
if __name__ == '__main__':
tractor.log.get_console_log(level='info')
for case in [True, False]:
log.info(
f'\n'
f'------ RUNNING SCRIPT TRIAL ------\n'
f'child_errors_midstream: {case!r}\n'
)
try:
trio.run(partial(
main,
child_errors_mid_stream=case,
# raise_unmasked=True,
loglevel='info',
))
except Exception as _exc:
exc = _exc
log.exception(
'Should have raised an RTE or Cancelled?\n'
)
breakpoint()

View File

@ -95,6 +95,7 @@ def run_example_in_subproc(
and 'integration' not in p[0]
and 'advanced_faults' not in p[0]
and 'multihost' not in p[0]
and 'trio' not in p[0]
)
],
ids=lambda t: t[1],

View File

@ -24,14 +24,10 @@ from tractor._testing import (
)
# XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - [x] WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
# - [x] INTER-PEER: some arbitrary remote peer cancels via
# Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
@ -44,16 +40,6 @@ from tractor._testing import (
# that also spawned a remote task task in that same peer-parent.
# def test_self_cancel():
# '''
# 2 cases:
# - calls `Actor.cancel()` locally in some task
# - calls LocalPortal.cancel_actor()` ?
# '''
# ...
@tractor.context
async def open_stream_then_sleep_forever(
ctx: Context,
@ -806,7 +792,7 @@ async def basic_echo_server(
ctx: Context,
peer_name: str = 'wittle_bruv',
err_after: int|None = None,
err_after_imsg: int|None = None,
) -> None:
'''
@ -835,8 +821,9 @@ async def basic_echo_server(
await ipc.send(resp)
if (
err_after
and i > err_after
err_after_imsg
and
i > err_after_imsg
):
raise RuntimeError(
f'Simulated error in `{peer_name}`'
@ -978,7 +965,8 @@ async def tell_little_bro(
actor_name: str,
caller: str = '',
err_after: int|None = None,
err_after: float|None = None,
rng_seed: int = 50,
):
# contact target actor, do a stream dialog.
async with (
@ -989,14 +977,18 @@ async def tell_little_bro(
basic_echo_server,
# XXX proxy any delayed err condition
err_after=err_after,
err_after_imsg=(
err_after * rng_seed
if err_after is not None
else None
),
) as (sub_ctx, first),
sub_ctx.open_stream() as echo_ipc,
):
actor: Actor = current_actor()
uid: tuple = actor.uid
for i in range(100):
for i in range(rng_seed):
msg: tuple = (
uid,
i,
@ -1021,13 +1013,13 @@ async def tell_little_bro(
)
@pytest.mark.parametrize(
'raise_sub_spawn_error_after',
[None, 50],
[None, 0.5],
)
def test_peer_spawns_and_cancels_service_subactor(
debug_mode: bool,
raise_client_error: str,
reg_addr: tuple[str, int],
raise_sub_spawn_error_after: int|None,
raise_sub_spawn_error_after: float|None,
):
# NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx
@ -1041,6 +1033,7 @@ def test_peer_spawns_and_cancels_service_subactor(
# and the server's spawned child should cancel and terminate!
peer_name: str = 'little_bro'
def check_inner_rte(rae: RemoteActorError):
'''
Validate the little_bro's relayed inception!
@ -1134,8 +1127,7 @@ def test_peer_spawns_and_cancels_service_subactor(
)
try:
res = await client_ctx.result(hide_tb=False)
res = await client_ctx.wait_for_result(hide_tb=False)
# in remote (relayed inception) error
# case, we should error on the line above!
if raise_sub_spawn_error_after:
@ -1146,6 +1138,23 @@ def test_peer_spawns_and_cancels_service_subactor(
assert isinstance(res, ContextCancelled)
assert client_ctx.cancel_acked
assert res.canceller == root.uid
assert not raise_sub_spawn_error_after
# cancelling the spawner sub should
# transitively cancel it's sub, the little
# bruv.
print('root cancelling server/client sub-actors')
await spawn_ctx.cancel()
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
# XXX, only for tracing
# except BaseException as _berr:
# berr = _berr
# await tractor.pause(shield=True)
# raise berr
except RemoteActorError as rae:
_err = rae
@ -1174,19 +1183,8 @@ def test_peer_spawns_and_cancels_service_subactor(
raise
# await tractor.pause()
else:
assert not raise_sub_spawn_error_after
# cancelling the spawner sub should
# transitively cancel it's sub, the little
# bruv.
print('root cancelling server/client sub-actors')
await spawn_ctx.cancel()
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
# await tractor.pause()
# await server.cancel_actor()
except RemoteActorError as rae:
@ -1199,7 +1197,7 @@ def test_peer_spawns_and_cancels_service_subactor(
# since we called `.cancel_actor()`, `.cancel_ack`
# will not be set on the ctx bc `ctx.cancel()` was not
# called directly fot this confext.
# called directly for this confext.
except ContextCancelled as ctxc:
_ctxc = ctxc
print(
@ -1239,12 +1237,19 @@ def test_peer_spawns_and_cancels_service_subactor(
# assert spawn_ctx.cancelled_caught
async def _main():
with trio.fail_after(
3 if not debug_mode
else 999
):
await main()
if raise_sub_spawn_error_after:
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
trio.run(_main)
rae: RemoteActorError = excinfo.value
check_inner_rte(rae)
else:
trio.run(main)
trio.run(_main)

View File

@ -0,0 +1,239 @@
'''
Define the details of inter-actor "out-of-band" (OoB) cancel
semantics, that is how cancellation works when a cancel request comes
from the different concurrency (primitive's) "layer" then where the
eventual `trio.Task` actually raises a signal.
'''
from functools import partial
# from contextlib import asynccontextmanager as acm
# import itertools
import pytest
import trio
import tractor
from tractor import ( # typing
ActorNursery,
Portal,
Context,
# ContextCancelled,
# RemoteActorError,
)
# from tractor._testing import (
# tractor_test,
# expect_ctxc,
# )
# XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# def test_self_cancel():
# '''
# 2 cases:
# - calls `Actor.cancel()` locally in some task
# - calls LocalPortal.cancel_actor()` ?
#
# things to ensure!
# -[ ] the ctxc raised in a child should ideally show the tb of the
# underlying `Cancelled` checkpoint, i.e.
# `raise scope_error from ctxc`?
#
# -[ ] a self-cancelled context, if not allowed to block on
# `ctx.result()` at some point will hang since the `ctx._scope`
# is never `.cancel_called`; cases for this include,
# - an `open_ctx()` which never starteds before being OoB actor
# cancelled.
# |_ parent task will be blocked in `.open_context()` for the
# `Started` msg, and when the OoB ctxc arrives `ctx._scope`
# will never have been signalled..
# '''
# ...
# TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py`
# but with the `Lock.acquire()` from a `@context` to ensure the
# implicit ignore-case-non-unmasking.
#
# @tractor.context
# async def acquire_actor_global_lock(
# ctx: tractor.Context,
# ignore_special_cases: bool,
# ):
# async with maybe_unmask_excs(
# ignore_special_cases=ignore_special_cases,
# ):
# await ctx.started('locked')
# # block til cancelled
# await trio.sleep_forever()
@tractor.context
async def sleep_forever(
ctx: tractor.Context,
# ignore_special_cases: bool,
do_started: bool,
):
# async with maybe_unmask_excs(
# ignore_special_cases=ignore_special_cases,
# ):
# await ctx.started('locked')
if do_started:
await ctx.started()
# block til cancelled
print('sleepin on child-side..')
await trio.sleep_forever()
@pytest.mark.parametrize(
'cancel_ctx',
[True, False],
)
def test_cancel_ctx_with_parent_side_entered_in_bg_task(
debug_mode: bool,
loglevel: str,
cancel_ctx: bool,
):
'''
The most "basic" out-of-band-task self-cancellation case where
`Portal.open_context()` is entered in a bg task and the
parent-task (of the containing nursery) calls `Context.cancel()`
without the child knowing; the `Context._scope` should be
`.cancel_called` when the IPC ctx's child-side relays
a `ContextCancelled` with a `.canceller` set to the parent
actor('s task).
'''
async def main():
with trio.fail_after(
2 if not debug_mode else 999,
):
an: ActorNursery
async with (
tractor.open_nursery(
debug_mode=debug_mode,
loglevel='devx',
enable_stack_on_sig=True,
) as an,
trio.open_nursery() as tn,
):
ptl: Portal = await an.start_actor(
'sub',
enable_modules=[__name__],
)
async def _open_ctx_async(
do_started: bool = True,
task_status=trio.TASK_STATUS_IGNORED,
):
# do we expect to never enter the
# `.open_context()` below.
if not do_started:
task_status.started()
async with ptl.open_context(
sleep_forever,
do_started=do_started,
) as (ctx, first):
task_status.started(ctx)
await trio.sleep_forever()
# XXX, this is the key OoB part!
#
# - start the `.open_context()` in a bg task which
# blocks inside the embedded scope-body,
#
# - when we call `Context.cancel()` it **is
# not** from the same task which eventually runs
# `.__aexit__()`,
#
# - since the bg "opener" task will be in
# a `trio.sleep_forever()`, it must be interrupted
# by the `ContextCancelled` delivered from the
# child-side; `Context._scope: CancelScope` MUST
# be `.cancel_called`!
#
print('ASYNC opening IPC context in subtask..')
maybe_ctx: Context|None = await tn.start(partial(
_open_ctx_async,
))
if (
maybe_ctx
and
cancel_ctx
):
print('cancelling first IPC ctx!')
await maybe_ctx.cancel()
# XXX, note that despite `maybe_context.cancel()`
# being called above, it's the parent (bg) task
# which was originally never interrupted in
# the `ctx._scope` body due to missing case logic in
# `ctx._maybe_cancel_and_set_remote_error()`.
#
# It didn't matter that the subactor process was
# already terminated and reaped, nothing was
# cancelling the ctx-parent task's scope!
#
print('cancelling subactor!')
await ptl.cancel_actor()
if maybe_ctx:
try:
await maybe_ctx.wait_for_result()
except tractor.ContextCancelled as ctxc:
assert not cancel_ctx
assert (
ctxc.canceller
==
tractor.current_actor().aid.uid
)
# don't re-raise since it'll trigger
# an EG from the above tn.
if cancel_ctx:
# graceful self-cancel
trio.run(main)
else:
# ctx parent task should see OoB ctxc due to
# `ptl.cancel_actor()`.
with pytest.raises(tractor.ContextCancelled) as excinfo:
trio.run(main)
assert 'root' in excinfo.value.canceller[0]
# def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it(
# debug_mode: bool,
# loglevel: str,
# ):
# '''
# Demos OoB cancellation from the perspective of a ctx opened with
# a child subactor where the parent cancels the child at the "actor
# layer" using `Portal.cancel_actor()` and thus the
# `ContextCancelled.canceller` received by the ctx's parent-side
# task will appear to be a "self cancellation" even though that
# specific task itself was not cancelled and thus
# `Context.cancel_called ==False`.
# '''
# TODO, do we have an existing implied ctx
# cancel test like this?
# with trio.move_on_after(0.5):# as cs:
# await _open_ctx_async(
# do_started=False,
# )
# in-line ctx scope should definitely raise
# a ctxc with `.canceller = 'root'`
# async with ptl.open_context(
# sleep_forever,
# do_started=True,
# ) as pair:

View File

@ -6,11 +6,18 @@ want to see changed.
from contextlib import (
asynccontextmanager as acm,
)
from types import ModuleType
from functools import partial
import pytest
from _pytest import pathlib
from tractor.trionics import collapse_eg
import trio
from trio import TaskStatus
from tractor._testing import (
examples_dir,
)
@pytest.mark.parametrize(
@ -106,8 +113,9 @@ def test_acm_embedded_nursery_propagates_enter_err(
debug_mode: bool,
):
'''
Demo how a masking `trio.Cancelled` could be handled by unmasking from the
`.__context__` field when a user (by accident) re-raises from a `finally:`.
Demo how a masking `trio.Cancelled` could be handled by unmasking
from the `.__context__` field when a user (by accident) re-raises
from a `finally:`.
'''
import tractor
@ -117,11 +125,9 @@ def test_acm_embedded_nursery_propagates_enter_err(
async with (
trio.open_nursery() as tn,
tractor.trionics.maybe_raise_from_masking_exc(
tn=tn,
unmask_from=(
trio.Cancelled
if unmask_from_canc
else None
(trio.Cancelled,) if unmask_from_canc
else ()
),
)
):
@ -136,8 +142,7 @@ def test_acm_embedded_nursery_propagates_enter_err(
with tractor.devx.maybe_open_crash_handler(
pdb=debug_mode,
) as bxerr:
if bxerr:
assert not bxerr.value
assert not bxerr.value
async with (
wraps_tn_that_always_cancels() as tn,
@ -145,11 +150,12 @@ def test_acm_embedded_nursery_propagates_enter_err(
assert not tn.cancel_scope.cancel_called
assert 0
assert (
(err := bxerr.value)
and
type(err) is AssertionError
)
if debug_mode:
assert (
(err := bxerr.value)
and
type(err) is AssertionError
)
with pytest.raises(ExceptionGroup) as excinfo:
trio.run(_main)
@ -160,13 +166,13 @@ def test_acm_embedded_nursery_propagates_enter_err(
assert len(assert_eg.exceptions) == 1
def test_gatherctxs_with_memchan_breaks_multicancelled(
debug_mode: bool,
):
'''
Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task
will break a strict-eg-tn's multi-cancelled absorption..
Demo how a using an `async with sndchan` inside
a `.trionics.gather_contexts()` task will break a strict-eg-tn's
multi-cancelled absorption..
'''
from tractor import (
@ -192,7 +198,6 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
f'Closed {task!r}\n'
)
async def main():
async with (
# XXX should ensure ONLY the KBI
@ -213,3 +218,85 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
with pytest.raises(KeyboardInterrupt):
trio.run(main)
@pytest.mark.parametrize(
'raise_unmasked', [
True,
pytest.param(
False,
marks=pytest.mark.xfail(
reason="see examples/trio/send_chan_aclose_masks.py"
)
),
]
)
@pytest.mark.parametrize(
'child_errors_mid_stream',
[True, False],
)
def test_unmask_aclose_as_checkpoint_on_aexit(
raise_unmasked: bool,
child_errors_mid_stream: bool,
debug_mode: bool,
):
'''
Verify that our unmasker util works over the common case where
a mem-chan's `.aclose()` is included in an `@acm` stack
and it being currently a checkpoint, can `trio.Cancelled`-mask an embedded
exception from user code resulting in a silent failure which
appears like graceful cancellation.
This test suite is mostly implemented as an example script so it
could more easily be shared with `trio`-core peeps as `tractor`-less
minimum reproducing example.
'''
mod: ModuleType = pathlib.import_path(
examples_dir()
/ 'trio'
/ 'send_chan_aclose_masks_beg.py',
root=examples_dir(),
consider_namespace_packages=False,
)
with pytest.raises(RuntimeError):
trio.run(partial(
mod.main,
raise_unmasked=raise_unmasked,
child_errors_mid_stream=child_errors_mid_stream,
))
@pytest.mark.parametrize(
'ignore_special_cases', [
True,
pytest.param(
False,
marks=pytest.mark.xfail(
reason="see examples/trio/lockacquire_not_umasked.py"
)
),
]
)
def test_cancelled_lockacquire_in_ipctx_not_unmasked(
ignore_special_cases: bool,
loglevel: str,
debug_mode: bool,
):
mod: ModuleType = pathlib.import_path(
examples_dir()
/ 'trio'
/ 'lockacquire_not_unmasked.py',
root=examples_dir(),
consider_namespace_packages=False,
)
async def _main():
with trio.fail_after(2):
await mod.main(
ignore_special_cases=ignore_special_cases,
loglevel=loglevel,
debug_mode=debug_mode,
)
trio.run(_main)

View File

@ -442,25 +442,25 @@ class Context:
'''
Records whether cancellation has been requested for this context
by a call to `.cancel()` either due to,
- either an explicit call by some local task,
- an explicit call by some local task,
- or an implicit call due to an error caught inside
the ``Portal.open_context()`` block.
the `Portal.open_context()` block.
'''
return self._cancel_called
@cancel_called.setter
def cancel_called(self, val: bool) -> None:
'''
Set the self-cancelled request `bool` value.
# XXX, to debug who frickin sets it..
# @cancel_called.setter
# def cancel_called(self, val: bool) -> None:
# '''
# Set the self-cancelled request `bool` value.
'''
# to debug who frickin sets it..
# if val:
# from .devx import pause_from_sync
# pause_from_sync()
# '''
# if val:
# from .devx import pause_from_sync
# pause_from_sync()
self._cancel_called = val
# self._cancel_called = val
@property
def canceller(self) -> tuple[str, str]|None:
@ -635,6 +635,71 @@ class Context:
'''
await self.chan.send(Stop(cid=self.cid))
@property
def parent_task(self) -> trio.Task:
'''
This IPC context's "owning task" which is a `trio.Task`
on one of the "sides" of the IPC.
Note that the "parent_" prefix here refers to the local
`trio` task tree using the same interface as
`trio.Nursery.parent_task` whereas for IPC contexts,
a different cross-actor task hierarchy exists:
- a "parent"-side which originally entered
`Portal.open_context()`,
- the "child"-side which was spawned and scheduled to invoke
a function decorated with `@tractor.context`.
This task is thus a handle to mem-domain-distinct/per-process
`Nursery.parent_task` depending on in which of the above
"sides" this context exists.
'''
return self._task
def _is_blocked_on_rx_chan(self) -> bool:
'''
Predicate to indicate whether the owner `._task: trio.Task` is
currently blocked (by `.receive()`-ing) on its underlying RPC
feeder `._rx_chan`.
This knowledge is highly useful when handling so called
"out-of-band" (OoB) cancellation conditions where a peer
actor's task transmitted some remote error/cancel-msg and we
must know whether to signal-via-cancel currently executing
"user-code" (user defined code embedded in `ctx._scope`) or
simply to forward the IPC-msg-as-error **without calling**
`._scope.cancel()`.
In the latter case it is presumed that if the owner task is
blocking for the next IPC msg, it will eventually receive,
process and raise the equivalent local error **without**
requiring `._scope.cancel()` to be explicitly called by the
*delivering OoB RPC-task* (via `_deliver_msg()`).
'''
# NOTE, see the mem-chan meth-impls for *why* this
# logic works,
# `trio._channel.MemoryReceiveChannel.receive[_nowait]()`
#
# XXX realize that this is NOT an
# official/will-be-loudly-deprecated API:
# - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled
#
# orig repo intro in the mem-chan change over patch:
# - https://github.com/python-trio/trio/pull/586#issuecomment-414039117
# |_https://github.com/python-trio/trio/pull/616
# |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0
#
return (
self._task.custom_sleep_data
is
self._rx_chan
)
def _maybe_cancel_and_set_remote_error(
self,
error: BaseException,
@ -787,13 +852,27 @@ class Context:
if self._canceller is None:
log.error('Ctx has no canceller set!?')
cs: trio.CancelScope = self._scope
# ?TODO? see comment @ .start_remote_task()`
#
# if not cs:
# from .devx import mk_pdb
# mk_pdb().set_trace()
# raise RuntimeError(
# f'IPC ctx was not be opened prior to remote error delivery !?\n'
# f'{self}\n'
# f'\n'
# f'`Portal.open_context()` must be entered (somewhere) beforehand!\n'
# )
# Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error
# once exiting (or manually calling `.wait_for_result()`) the
# `.open_context()` block.
cs: trio.CancelScope = self._scope
if (
cs
and not cs.cancel_called
# XXX this is an expected cancel request response
# message and we **don't need to raise it** in the
@ -802,8 +881,7 @@ class Context:
# if `._cancel_called` then `.cancel_acked and .cancel_called`
# always should be set.
and not self._is_self_cancelled()
and not cs.cancel_called
and not cs.cancelled_caught
# and not cs.cancelled_caught
):
if (
msgerr
@ -814,7 +892,7 @@ class Context:
not self._cancel_on_msgerr
):
message: str = (
'NOT Cancelling `Context._scope` since,\n'
f'NOT Cancelling `Context._scope` since,\n'
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
f'AND we got a msg-type-error!\n'
f'{error}\n'
@ -824,13 +902,43 @@ class Context:
# `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n'
# from .devx import pause_from_sync
# pause_from_sync()
self._scope.cancel()
else:
message: str = 'NOT cancelling `Context._scope` !\n\n'
cs.cancel()
# TODO, explicit condition for OoB (self-)cancellation?
# - we called `Portal.cancel_actor()` from this actor
# and the peer ctx task delivered ctxc due to it.
# - currently `self._is_self_cancelled()` will be true
# since the ctxc.canceller check will match us even though it
# wasn't from this ctx specifically!
elif (
cs
and self._is_self_cancelled()
and not cs.cancel_called
):
message: str = (
'Cancelling `ctx._scope` due to OoB self-cancel ?!\n'
'\n'
)
# from .devx import mk_pdb
# mk_pdb().set_trace()
# TODO XXX, required to fix timeout failure in
# `test_cancelled_lockacquire_in_ipctx_not_unmaskeed`
#
# XXX NOTE XXX, this is SUPER SUBTLE!
# we only want to cancel our embedded `._scope`
# if the ctx's current/using task is NOT blocked
# on `._rx_chan.receive()` and on some other
# `trio`-checkpoint since in the former case
# any `._remote_error` will be relayed through
# the rx-chan and appropriately raised by the owning
# `._task` directly. IF the owner task is however
# blocking elsewhere we need to interrupt it **now**.
if not self._is_blocked_on_rx_chan():
cs.cancel()
else:
# rx_stats = self._rx_chan.statistics()
message: str = 'NOT cancelling `Context._scope` !\n\n'
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
if (
@ -854,6 +962,7 @@ class Context:
+
cs_fmt
)
log.cancel(
message
+
@ -946,8 +1055,9 @@ class Context:
'''
side: str = self.side
# XXX for debug via the `@.setter`
self.cancel_called = True
self._cancel_called = True
# ^ XXX for debug via the `@.setter`
# self.cancel_called = True
header: str = (
f'Cancelling ctx from {side!r}-side\n'
@ -2011,6 +2121,9 @@ async def open_context_from_portal(
f'|_{portal.actor}\n'
)
# ?TODO? could we move this to inside the `tn` block?
# -> would allow doing `ctx.parent_task = tn.parent_task` ?
# -> would allow a `if not ._scope: => raise RTE` ?
ctx: Context = await portal.actor.start_remote_task(
portal.channel,
nsf=nsf,
@ -2037,6 +2150,7 @@ async def open_context_from_portal(
scope_err: BaseException|None = None
ctxc_from_child: ContextCancelled|None = None
try:
# from .devx import pause
async with (
collapse_eg(),
trio.open_nursery() as tn,
@ -2059,6 +2173,10 @@ async def open_context_from_portal(
# the dialog, the `Error` msg should be raised from the `msg`
# handling block below.
try:
log.runtime(
f'IPC ctx parent waiting on Started msg..\n'
f'ctx.cid: {ctx.cid!r}\n'
)
started_msg, first = await ctx._pld_rx.recv_msg(
ipc=ctx,
expect_msg=Started,
@ -2067,16 +2185,16 @@ async def open_context_from_portal(
)
except trio.Cancelled as taskc:
ctx_cs: trio.CancelScope = ctx._scope
log.cancel(
f'IPC ctx was cancelled during "child" task sync due to\n\n'
f'.cid: {ctx.cid!r}\n'
f'.maybe_error: {ctx.maybe_error!r}\n'
)
# await pause(shield=True)
if not ctx_cs.cancel_called:
raise
# from .devx import pause
# await pause(shield=True)
log.cancel(
'IPC ctx was cancelled during "child" task sync due to\n\n'
f'{ctx.maybe_error}\n'
)
# OW if the ctx's scope was cancelled manually,
# likely the `Context` was cancelled via a call to
# `._maybe_cancel_and_set_remote_error()` so ensure
@ -2199,7 +2317,7 @@ async def open_context_from_portal(
# documenting it as a definittive example of
# debugging the tractor-runtime itself using it's
# own `.devx.` tooling!
#
#
# await debug.pause()
# CASE 2: context was cancelled by local task calling
@ -2272,13 +2390,16 @@ async def open_context_from_portal(
match scope_err:
case trio.Cancelled():
logmeth = log.cancel
cause: str = 'cancelled'
# XXX explicitly report on any non-graceful-taskc cases
case _:
cause: str = 'errored'
logmeth = log.exception
logmeth(
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
f'ctx {ctx.side!r}-side {cause!r} with,\n'
f'{ctx.repr_outcome()!r}\n'
)
if debug_mode():
@ -2303,6 +2424,7 @@ async def open_context_from_portal(
# told us it's cancelled ;p
if ctxc_from_child is None:
try:
# await pause(shield=True)
await ctx.cancel()
except (
trio.BrokenResourceError,
@ -2459,8 +2581,10 @@ async def open_context_from_portal(
log.cancel(
f'Context cancelled by local {ctx.side!r}-side task\n'
f'c)>\n'
f' |_{ctx._task}\n\n'
f'{repr(scope_err)}\n'
f' |_{ctx.parent_task}\n'
f' .cid={ctx.cid!r}\n'
f'\n'
f'{scope_err!r}\n'
)
# TODO: should we add a `._cancel_req_received`

View File

@ -654,8 +654,7 @@ async def _invoke(
# scope ensures unasking of the `await coro` below
# *should* never be interfered with!!
maybe_raise_from_masking_exc(
tn=tn,
unmask_from=Cancelled,
unmask_from=(Cancelled,),
) as _mbme, # maybe boxed masked exc
):
ctx._scope_nursery = tn

View File

@ -446,12 +446,12 @@ class ActorNursery:
@acm
async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor,
tb_hide: bool = False,
hide_tb: bool = True,
) -> typing.AsyncGenerator[ActorNursery, None]:
# normally don't need to show user by default
__tracebackhide__: bool = tb_hide
__tracebackhide__: bool = hide_tb
outer_err: BaseException|None = None
inner_err: BaseException|None = None

View File

@ -0,0 +1,26 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
High level design patterns, APIs and runtime extensions built on top
of the `tractor` runtime core.
'''
from ._service import (
open_service_mngr as open_service_mngr,
get_service_mngr as get_service_mngr,
ServiceMngr as ServiceMngr,
)

View File

@ -0,0 +1,592 @@
# tractor: structured concurrent "actors".
# Copyright 2024-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Daemon subactor as service(s) management and supervision primitives
and API.
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import (
Callable,
Any,
)
import tractor
import trio
from trio import TaskStatus
from tractor import (
log,
ActorNursery,
current_actor,
ContextCancelled,
Context,
Portal,
)
log = log.get_logger('tractor')
# TODO: implement a `@singleton` deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# -[ ] go through the options peeps on SO did?
# * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python
# * including @mikenerone's answer
# |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313
#
# -[ ] put it in `tractor.lowlevel._globals` ?
# * fits with our oustanding actor-local/global feat req?
# |_ https://github.com/goodboy/tractor/issues/55
# * how can it relate to the `Actor.lifetime_stack` that was
# silently patched in?
# |_ we could implicitly call both of these in the same
# spot in the runtime using the lifetime stack?
# - `open_singleton_cm().__exit__()`
# -`del_singleton()`
# |_ gives SC fixtue semantics to sync code oriented around
# sub-process lifetime?
# * what about with `trio.RunVar`?
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar
# - which we'll need for no-GIL cpython (right?) presuming
# multiple `trio.run()` calls in process?
#
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# a deletion API for explicit instance de-allocation?
# @open_service_mngr.deleter
# def del_service_mngr() -> None:
# mngr = open_service_mngr._singleton[0]
# open_service_mngr._singleton[0] = None
# del mngr
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# NOTE; since default values for keyword-args are effectively
# module-vars/globals as per the note from,
# https://docs.python.org/3/tutorial/controlflow.html#default-argument-values
#
# > "The default value is evaluated only once. This makes
# a difference when the default is a mutable object such as
# a list, dictionary, or instances of most classes"
#
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open an actor-global "service-manager" for supervising a tree
of subactors and/or actor-global tasks.
The delivered `ServiceMngr` is singleton instance for each
actor-process, that is, allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'an': an,
'tn': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.an = an
mngr.tn = tn
else:
assert (mngr.an and mngr.tn)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_ctxs:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
async def _open_and_supervise_service_ctx(
serman: ServiceMngr,
name: str,
ctx_fn: Callable, # TODO, type for `@tractor.context` requirement
portal: Portal,
allow_overruns: bool = False,
task_status: TaskStatus[
tuple[
trio.CancelScope,
Context,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
**ctx_kwargs,
) -> Any:
'''
Open a remote IPC-context defined by `ctx_fn` in the
(service) actor accessed via `portal` and supervise the
(local) parent task to termination at which point the remote
actor runtime is cancelled alongside it.
The main application is for allocating long-running
"sub-services" in a main daemon and explicitly controlling
their lifetimes from an actor-global singleton.
'''
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs:
try:
async with portal.open_context(
ctx_fn,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, started):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((
cs,
ctx,
complete,
started,
))
log.info(
f'`pikerd` service {name} started with value {started}'
)
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (
await portal.wait_for_result(),
ctx_res,
)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n'
f'canceller: {canceller}\n'
)
else:
raise
finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor() # terminate (remote) sub-actor
complete.set() # signal caller this task is done
serman.service_ctxs.pop(name) # remove mngr entry
# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the
# library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
@dataclass
class ServiceMngr:
'''
A multi-subactor-as-service manager.
Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
an: ActorNursery
tn: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
trio.Event,
]
] = field(default_factory=dict)
service_ctxs: dict[
str,
tuple[
trio.CancelScope,
Context,
Portal,
trio.Event,
]
] = field(default_factory=dict)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
# TODO, unify this interface with our `TaskManager` PR!
#
#
async def start_service_task(
self,
name: str,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
fn: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Any,
trio.Event,
]:
async def _task_manager_start(
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
]
] = trio.TASK_STATUS_IGNORED,
) -> Any:
task_cs = trio.CancelScope()
task_complete = trio.Event()
with task_cs as cs:
task_status.started((
cs,
task_complete,
))
try:
await fn()
except trio.Cancelled as taskc:
log.cancel(
f'Service task for `{name}` was cancelled!\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
)
raise taskc
finally:
task_complete.set()
(
cs,
complete,
) = await self.tn.start(_task_manager_start)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (
cs,
complete,
)
return (
cs,
complete,
)
async def cancel_service_task(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, complete = self.service_tasks[name]
cs.cancel()
await complete.wait()
# TODO, if we use the `TaskMngr` from #346
# we can also get the return value from the task!
if name in self.service_tasks:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service task {name!r} not terminated!?\n'
)
async def start_service_ctx(
self,
name: str,
portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
ctx_fn: Callable,
**ctx_kwargs,
) -> tuple[
trio.CancelScope,
Context,
Any,
]:
'''
Start a remote IPC-context defined by `ctx_fn` in a background
task and immediately return supervision primitives to manage it:
- a `cs: CancelScope` for the newly allocated bg task
- the `ipc_ctx: Context` to manage the remotely scheduled
`trio.Task`.
- the `started: Any` value returned by the remote endpoint
task's `Context.started(<value>)` call.
The bg task supervises the ctx such that when it terminates the supporting
actor runtime is also cancelled, see `_open_and_supervise_service_ctx()`
for details.
'''
cs, ipc_ctx, complete, started = await self.tn.start(
functools.partial(
_open_and_supervise_service_ctx,
serman=self,
name=name,
ctx_fn=ctx_fn,
portal=portal,
**ctx_kwargs,
)
)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_ctxs[name] = (cs, ipc_ctx, portal, complete)
return (
cs,
ipc_ctx,
started,
)
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
# ^TODO, type for `@tractor.context` deco-ed funcs!
debug_mode: bool = False,
**start_actor_kwargs,
) -> Context:
'''
Start new subactor and schedule a supervising "service task"
in it which explicitly defines the sub's lifetime.
"Service daemon subactors" are cancelled (and thus
terminated) using the paired `.cancel_service()`.
Effectively this API can be used to manage "service daemons"
spawned under a single parent actor with supervision
semantics equivalent to a one-cancels-one style actor-nursery
or "(subactor) task manager" where each subprocess's (and
thus its embedded actor runtime) lifetime is synced to that
of the remotely spawned task defined by `ctx_ep`.
The funcionality can be likened to a "daemonized" version of
`.hilevel.worker.run_in_actor()` but with supervision
controls offered by `tractor.Context` where the main/root
remotely scheduled `trio.Task` invoking `ctx_ep` determines
the underlying subactor's lifetime.
'''
entry: tuple|None = self.service_ctxs.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_ctxs:
portal: Portal = await self.an.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**start_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(
cs,
sub_ctx,
started,
) = await self.start_service_ctx(
name=daemon_name,
portal=portal,
ctx_fn=ctx_ep,
**ctx_kwargs,
)
return sub_ctx
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, sub_ctx, portal, complete = self.service_ctxs[name]
# cs.cancel()
await sub_ctx.cancel()
await complete.wait()
if name in self.service_ctxs:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Service actor for {name} not terminated and/or unknown?'
)
# assert name not in self.service_ctxs, \
# f'Serice task for {name} not terminated?'

View File

@ -613,10 +613,9 @@ async def drain_to_final_msg(
# msg: dict = await ctx._rx_chan.receive()
# if res_cs.cancelled_caught:
#
# -[ ] make sure pause points work here for REPLing
# -[x] make sure pause points work here for REPLing
# the runtime itself; i.e. ensure there's no hangs!
# |_from tractor.devx.debug import pause
# await pause()
# |_see masked code below in .cancel_called path
# NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases:
@ -652,6 +651,10 @@ async def drain_to_final_msg(
f'IPC ctx cancelled externally during result drain ?\n'
f'{ctx}'
)
# XXX, for tracing `Cancelled`..
# from tractor.devx.debug import pause
# await pause(shield=True)
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is
# the source cause of this local task's

View File

@ -21,7 +21,6 @@ Sugary patterns for trio + tractor designs.
from ._mngrs import (
gather_contexts as gather_contexts,
maybe_open_context as maybe_open_context,
maybe_open_nursery as maybe_open_nursery,
)
from ._broadcast import (
AsyncReceiver as AsyncReceiver,
@ -37,3 +36,6 @@ from ._beg import (
from ._taskc import (
maybe_raise_from_masking_exc as maybe_raise_from_masking_exc,
)
from ._tn import (
maybe_open_nursery as maybe_open_nursery,
)

View File

@ -22,7 +22,7 @@ https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html
from __future__ import annotations
from abc import abstractmethod
from collections import deque
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager as acm
from functools import partial
from operator import ne
from typing import (
@ -398,7 +398,7 @@ class BroadcastReceiver(ReceiveChannel):
return await self._receive_from_underlying(key, state)
@asynccontextmanager
@acm
async def subscribe(
self,
raise_on_lag: bool = True,

View File

@ -23,7 +23,6 @@ from contextlib import (
asynccontextmanager as acm,
)
import inspect
from types import ModuleType
from typing import (
Any,
AsyncContextManager,
@ -31,24 +30,20 @@ from typing import (
AsyncIterator,
Callable,
Hashable,
Optional,
Sequence,
TypeVar,
TYPE_CHECKING,
)
import trio
from tractor._state import current_actor
from tractor.log import get_logger
from ._tn import maybe_open_nursery
# from ._beg import collapse_eg
# from ._taskc import (
# maybe_raise_from_masking_exc,
# )
if TYPE_CHECKING:
from tractor import ActorNursery
log = get_logger(__name__)
@ -56,30 +51,6 @@ log = get_logger(__name__)
T = TypeVar("T")
@acm
async def maybe_open_nursery(
nursery: trio.Nursery|ActorNursery|None = None,
shield: bool = False,
lib: ModuleType = trio,
**kwargs, # proxy thru
) -> AsyncGenerator[trio.Nursery, Any]:
'''
Create a new nursery if None provided.
Blocks on exit as expected if no input nursery is provided.
'''
if nursery is not None:
yield nursery
else:
async with lib.open_nursery(**kwargs) as nursery:
if lib == trio:
nursery.cancel_scope.shield = shield
yield nursery
async def _enter_and_wait(
mngr: AsyncContextManager[T],
unwrapped: dict[int, T],
@ -204,7 +175,7 @@ class _Cache:
a kept-alive-while-in-use async resource.
'''
service_tn: Optional[trio.Nursery] = None
service_tn: trio.Nursery|None = None
locks: dict[Hashable, trio.Lock] = {}
users: int = 0
values: dict[Any, Any] = {}
@ -213,7 +184,7 @@ class _Cache:
tuple[trio.Nursery, trio.Event]
] = {}
# nurseries: dict[int, trio.Nursery] = {}
no_more_users: Optional[trio.Event] = None
no_more_users: trio.Event|None = None
@classmethod
async def run_ctx(
@ -223,16 +194,18 @@ class _Cache:
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> None:
async with mng as value:
_, no_more_users = cls.resources[ctx_key]
cls.values[ctx_key] = value
task_status.started(value)
try:
await no_more_users.wait()
finally:
# discard nursery ref so it won't be re-used (an error)?
value = cls.values.pop(ctx_key)
cls.resources.pop(ctx_key)
try:
async with mng as value:
_, no_more_users = cls.resources[ctx_key]
try:
cls.values[ctx_key] = value
task_status.started(value)
await no_more_users.wait()
finally:
value = cls.values.pop(ctx_key)
finally:
# discard nursery ref so it won't be re-used (an error)?
cls.resources.pop(ctx_key)
@acm

View File

@ -0,0 +1,341 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Erlang-style (ish) "one-cancels-one" nursery, what we just call
a "task manager".
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from functools import partial
from typing import (
Generator,
Any,
)
from outcome import (
Outcome,
acapture,
)
from msgspec import Struct
import trio
from trio import (
TaskStatus,
CancelScope,
Nursery,
)
from trio.lowlevel import (
Task,
)
from tractor.log import get_logger
log = get_logger(__name__)
class TaskOutcome(Struct):
'''
The outcome of a scheduled ``trio`` task which includes an interface
for synchronizing to the completion of the task's runtime and access
to the eventual boxed result/value or raised exception.
'''
lowlevel_task: Task
_exited = trio.Event() # as per `trio.Runner.task_exited()`
_outcome: Outcome | None = None # as per `outcome.Outcome`
_result: Any | None = None # the eventual maybe-returned-value
@property
def result(self) -> Any:
'''
Either Any or None depending on whether the Outcome has compeleted.
'''
if self._outcome is None:
raise RuntimeError(
f'Task {self.lowlevel_task.name} is not complete.\n'
'First wait on `await TaskOutcome.wait_for_result()`!'
)
return self._result
def _set_outcome(
self,
outcome: Outcome,
):
'''
Set the ``Outcome`` for this task.
This method should only ever be called by the task's supervising
nursery implemenation.
'''
self._outcome = outcome
self._result = outcome.unwrap()
self._exited.set()
async def wait_for_result(self) -> Any:
'''
Unwind the underlying task's ``Outcome`` by async waiting for
the task to first complete and then unwrap it's result-value.
'''
if self._exited.is_set():
return self._result
await self._exited.wait()
out = self._outcome
if out is None:
raise ValueError(f'{out} is not an outcome!?')
return self.result
class TaskManagerNursery(Struct):
_tn: Nursery
_scopes: dict[
Task,
tuple[CancelScope, Outcome]
] = {}
task_manager: Generator[Any, Outcome, None] | None = None
async def start_soon(
self,
async_fn,
*args,
name=None,
task_manager: Generator[Any, Outcome, None] | None = None
) -> tuple[CancelScope, Task]:
# NOTE: internals of a nursery don't let you know what
# the most recently spawned task is by order.. so we'd
# have to either change that or do set ops.
# pre_start_tasks: set[Task] = n._children.copy()
# new_tasks = n._children - pre_start_Tasks
# assert len(new_tasks) == 1
# task = new_tasks.pop()
tn: Nursery = self._tn
sm = self.task_manager
# we do default behavior of a scope-per-nursery
# if the user did not provide a task manager.
if sm is None:
return tn.start_soon(async_fn, *args, name=None)
# new_task: Task|None = None
to_return: tuple[Any] | None = None
# NOTE: what do we enforce as a signature for the
# `@task_scope_manager` here?
mngr = sm(nursery=tn)
async def _start_wrapped_in_scope(
task_status: TaskStatus[
tuple[CancelScope, Task]
] = trio.TASK_STATUS_IGNORED,
) -> None:
# TODO: this was working before?! and, do we need something
# like it to implement `.start()`?
# nonlocal to_return
# execute up to the first yield
try:
to_return: tuple[Any] = next(mngr)
except StopIteration:
raise RuntimeError("task manager didn't yield") from None
# TODO: how do we support `.start()` style?
# - relay through whatever the
# started task passes back via `.started()` ?
# seems like that won't work with also returning
# a "task handle"?
# - we were previously binding-out this `to_return` to
# the parent's lexical scope, why isn't that working
# now?
task_status.started(to_return)
# invoke underlying func now that cs is entered.
outcome = await acapture(async_fn, *args)
# execute from the 1st yield to return and expect
# generator-mngr `@task_scope_manager` thinger to
# terminate!
try:
mngr.send(outcome)
# I would presume it's better to have a handle to
# the `Outcome` entirely? This method sends *into*
# the mngr this `Outcome.value`; seems like kinda
# weird semantics for our purposes?
# outcome.send(mngr)
except StopIteration:
return
else:
raise RuntimeError(f"{mngr} didn't stop!")
to_return = await tn.start(_start_wrapped_in_scope)
assert to_return is not None
# TODO: use the fancy type-check-time type signature stuff from
# mypy i guess..to like, relay the type of whatever the
# generator yielded through? betcha that'll be un-grokable XD
return to_return
# TODO: define a decorator to runtime type check that this a generator
# with a single yield that also delivers a value (of some std type) from
# the yield expression?
# @trio.task_manager
def add_task_handle_and_crash_handling(
nursery: Nursery,
debug_mode: bool = False,
) -> Generator[
Any,
Outcome,
None,
]:
'''
A customizable, user defined "task scope manager".
With this specially crafted single-yield generator function you can
add more granular controls around every task spawned by `trio` B)
'''
# if you need it you can ask trio for the task obj
task: Task = trio.lowlevel.current_task()
log.info(f'Spawning task: {task.name}')
# User defined "task handle" for more granular supervision
# of each spawned task as needed for their particular usage.
task_outcome = TaskOutcome(task)
# NOTE: if wanted the user could wrap the output task handle however
# they want!
# class TaskHandle(Struct):
# task: Task
# cs: CancelScope
# outcome: TaskOutcome
# this yields back when the task is terminated, cancelled or returns.
try:
with CancelScope() as cs:
# the yielded value(s) here are what are returned to the
# nursery's `.start_soon()` caller B)
lowlevel_outcome: Outcome = yield (task_outcome, cs)
task_outcome._set_outcome(lowlevel_outcome)
# Adds "crash handling" from `pdbp` by entering
# a REPL on std errors.
except Exception as err:
if debug_mode:
log.exception(
f'{task.name} crashed, entering debugger!'
)
import pdbp
pdbp.xpm()
raise err
finally:
log.info(
f'Task exitted\n'
f')>\n'
f' |_{task}\n'
# ^^TODO? use sclang formatter?
# -[ ] .devx.pformat.nest_from_op()` yo!
)
@acm
async def open_taskman(
task_manager: Generator[Any, Outcome, None] | None = None,
**lowlevel_nursery_kwargs,
):
async with trio.open_nursery(**lowlevel_nursery_kwargs) as nurse:
yield TaskManagerNursery(
nurse,
task_manager=task_manager,
)
async def sleep_then_return_val(val: str):
await trio.sleep(0.2)
return val
async def ensure_cancelled():
try:
await trio.sleep_forever()
except trio.Cancelled:
task = trio.lowlevel.current_task()
log.cancel(f'heyyo ONLY {task.name} was cancelled as expected B)')
assert 0
except BaseException:
raise RuntimeError("woa woa woa this ain't right!")
if __name__ == '__main__':
from tractor.log import get_console_log
get_console_log(level='info')
async def main():
async with open_taskman(
task_manager=partial(
add_task_handle_and_crash_handling,
debug_mode=True,
),
) as tm:
for _ in range(3):
outcome, _ = await tm.start_soon(trio.sleep_forever)
# extra task we want to engage in debugger post mortem.
err_outcome, cs = await tm.start_soon(ensure_cancelled)
val: str = 'yoyoyo'
val_outcome, _ = await tm.start_soon(
sleep_then_return_val,
val,
)
res = await val_outcome.wait_for_result()
assert res == val
log.info(f'{res} -> GOT EXPECTED TASK VALUE')
await trio.sleep(0.6)
log.cancel(
f'Cancelling and waiting on {err_outcome.lowlevel_task} '
'to CRASH..'
)
cs.cancel()
trio.run(main)

View File

@ -22,7 +22,14 @@ from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from typing import TYPE_CHECKING
import inspect
from types import (
TracebackType,
)
from typing import (
Type,
TYPE_CHECKING,
)
import trio
from tractor.log import get_logger
@ -60,12 +67,71 @@ def find_masked_excs(
return None
_mask_cases: dict[
Type[Exception], # masked exc type
dict[
int, # inner-frame index into `inspect.getinnerframes()`
# `FrameInfo.function/filename: str`s to match
dict[str, str],
],
] = {
trio.WouldBlock: {
# `trio.Lock.acquire()` has a checkpoint inside the
# `WouldBlock`-no_wait path's handler..
-5: { # "5th frame up" from checkpoint
'filename': 'trio/_sync.py',
'function': 'acquire',
# 'lineno': 605, # matters?
},
}
}
def is_expected_masking_case(
cases: dict,
exc_ctx: Exception,
exc_match: BaseException,
) -> bool|inspect.FrameInfo:
'''
Determine whether the provided masked exception is from a known
bug/special/unintentional-`trio`-impl case which we do not wish
to unmask.
Return any guilty `inspect.FrameInfo` ow `False`.
'''
exc_tb: TracebackType = exc_match.__traceback__
if cases := _mask_cases.get(type(exc_ctx)):
inner: list[inspect.FrameInfo] = inspect.getinnerframes(exc_tb)
# from tractor.devx.debug import mk_pdb
# mk_pdb().set_trace()
for iframe, matchon in cases.items():
try:
masker_frame: inspect.FrameInfo = inner[iframe]
except IndexError:
continue
for field, in_field in matchon.items():
val = getattr(
masker_frame,
field,
)
if in_field not in val:
break
else:
return masker_frame
return False
# XXX, relevant discussion @ `trio`-core,
# https://github.com/python-trio/trio/issues/455
#
@acm
async def maybe_raise_from_masking_exc(
tn: trio.Nursery|None = None,
unmask_from: (
BaseException|
tuple[BaseException]
@ -74,18 +140,30 @@ async def maybe_raise_from_masking_exc(
raise_unmasked: bool = True,
extra_note: str = (
'This can occurr when,\n'
' - a `trio.Nursery` scope embeds a `finally:`-block '
'which executes a checkpoint!'
'\n'
' - a `trio.Nursery/CancelScope` embeds a `finally/except:`-block '
'which execs an un-shielded checkpoint!'
#
# ^TODO? other cases?
),
always_warn_on: tuple[BaseException] = (
always_warn_on: tuple[Type[BaseException]] = (
trio.Cancelled,
),
# don't ever unmask or warn on any masking pair,
# {<masked-excT-key> -> <masking-excT-value>}
never_warn_on: dict[
Type[BaseException],
Type[BaseException],
] = {
KeyboardInterrupt: trio.Cancelled,
trio.Cancelled: trio.Cancelled,
},
# ^XXX, special case(s) where we warn-log bc likely
# there will be no operational diff since the exc
# is always expected to be consumed.
) -> BoxedMaybeException:
'''
Maybe un-mask and re-raise exception(s) suppressed by a known
@ -104,81 +182,112 @@ async def maybe_raise_from_masking_exc(
individual sub-excs but maintain the eg-parent's form right?
'''
if not isinstance(unmask_from, tuple):
raise ValueError(
f'Invalid unmask_from = {unmask_from!r}\n'
f'Must be a `tuple[Type[BaseException]]`.\n'
)
from tractor.devx.debug import (
BoxedMaybeException,
pause,
)
boxed_maybe_exc = BoxedMaybeException(
raise_on_exit=raise_unmasked,
)
matching: list[BaseException]|None = None
maybe_eg: ExceptionGroup|None
if tn:
try: # handle egs
yield boxed_maybe_exc
return
except* unmask_from as _maybe_eg:
maybe_eg = _maybe_eg
try:
yield boxed_maybe_exc
return
except BaseException as _bexc:
bexc = _bexc
if isinstance(bexc, BaseExceptionGroup):
matches: ExceptionGroup
matches, _ = maybe_eg.split(
unmask_from
)
if not matches:
raise
matches, _ = bexc.split(unmask_from)
if matches:
matching = matches.exceptions
matching: list[BaseException] = matches.exceptions
else:
try: # handle non-egs
yield boxed_maybe_exc
return
except unmask_from as _maybe_exc:
maybe_exc = _maybe_exc
matching: list[BaseException] = [
maybe_exc
]
# XXX, only unmask-ed for debuggin!
# TODO, remove eventually..
except BaseException as _berr:
berr = _berr
await pause(shield=True)
raise berr
elif (
unmask_from
and
type(bexc) in unmask_from
):
matching = [bexc]
if matching is None:
raise
masked: list[tuple[BaseException, BaseException]] = []
for exc_match in matching:
if exc_ctx := find_masked_excs(
maybe_masker=exc_match,
unmask_from={unmask_from},
unmask_from=set(unmask_from),
):
masked.append((exc_ctx, exc_match))
masked.append((
exc_ctx,
exc_match,
))
boxed_maybe_exc.value = exc_match
note: str = (
f'\n'
f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n'
f'^^WARNING^^\n'
f'the above {type(exc_ctx)!r} was masked by a {type(exc_match)!r}\n'
)
if extra_note:
note += (
f'\n'
f'{extra_note}\n'
)
exc_ctx.add_note(note)
if type(exc_match) in always_warn_on:
do_warn: bool = (
never_warn_on.get(
type(exc_ctx) # masking type
)
is not
type(exc_match) # masked type
)
if do_warn:
exc_ctx.add_note(note)
if (
do_warn
and
type(exc_match) in always_warn_on
):
log.warning(note)
# await tractor.pause(shield=True)
if raise_unmasked:
if (
do_warn
and
raise_unmasked
):
if len(masked) < 2:
# don't unmask already known "special" cases..
if (
_mask_cases
and
(cases := _mask_cases.get(type(exc_ctx)))
and
(masker_frame := is_expected_masking_case(
cases,
exc_ctx,
exc_match,
))
):
log.warning(
f'Ignoring already-known, non-ideal-but-valid '
f'masker code @\n'
f'{masker_frame}\n'
f'\n'
f'NOT raising {exc_ctx} from masker {exc_match!r}\n'
)
raise exc_match
raise exc_ctx from exc_match
else:
# ?TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
await pause(shield=True)
# ??TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
# else:
# await pause(shield=True)
else:
raise

View File

@ -0,0 +1,94 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
`trio.Nursery` wrappers which we short-hand refer to as
`tn`: "task nursery".
(whereas we refer to `tractor.ActorNursery` as the short-hand `an`)
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from types import ModuleType
from typing import (
Any,
AsyncGenerator,
TYPE_CHECKING,
)
import trio
from tractor.log import get_logger
# from ._beg import (
# collapse_eg,
# )
if TYPE_CHECKING:
from tractor import ActorNursery
log = get_logger(__name__)
# ??TODO? is this even a good idea??
# it's an extra LoC to stack `collapse_eg()` vs.
# a new/foreign/bad-std-named very thing wrapper..?
# -[ ] is there a better/simpler name?
# @acm
# async def open_loose_tn() -> trio.Nursery:
# '''
# Implements the equivalent of the old style loose eg raising
# task-nursery from `trio<=0.25.0` ,
# .. code-block:: python
# async with trio.open_nursery(
# strict_exception_groups=False,
# ) as tn:
# ...
# '''
# async with (
# collapse_eg(),
# trio.open_nursery() as tn,
# ):
# yield tn
@acm
async def maybe_open_nursery(
nursery: trio.Nursery|ActorNursery|None = None,
shield: bool = False,
lib: ModuleType = trio,
loose: bool = False,
**kwargs, # proxy thru
) -> AsyncGenerator[trio.Nursery, Any]:
'''
Create a new nursery if None provided.
Blocks on exit as expected if no input nursery is provided.
'''
if nursery is not None:
yield nursery
else:
async with lib.open_nursery(**kwargs) as tn:
tn.cancel_scope.shield = shield
yield tn