Compare commits

..

14 Commits

Author SHA1 Message Date
Tyler Goodlet 23809b8468 Bump "task-manager(-nursery)" naming, add logging
Namely just renaming any `trio.Nursery` instances to `tn`, the primary
`@acm`-API to `.trionics.open_taskman()`, and changing out all `print()`s
for logger instances with the 'info' level enabled by the mod-script usage.
2025-08-20 13:08:39 -04:00
Tyler Goodlet 60427329ee Add a new `.trionics._tn` for "task nursery stuff"
Since I'd like to decouple the new "task-manager-nursery" lowlevel
primitives/abstractions from the higher-level
`TaskManagerNursery`-supporting API(s) and default per-task
supervision-strat and because `._mngr` is already purposed for
higher-level "on-top-of-nursery" patterns as it is.

Deats,
- move `maybe_open_nursery()` into the new mod.
- adjust the pkg-mod's import to the new sub-mod.
- also draft up this idea for an API which stacks `._beg.collapse_eg()`
  onto a nursery with the WIP name `open_loose_tn()`, but more than
  likely I'll just discard this idea since I think the explicit `@acm`
  stacking is more pythonic/up-front-grokable despite the extra LoC.
2025-08-20 13:08:37 -04:00
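(For orientation, a minimal sketch of the two spellings weighed in the commit msg above; `collapse_eg()` is imported per the diffs further below, while `open_loose_tn()` is only the WIP/hypothetical name from this commit, not a published API.)

from contextlib import asynccontextmanager as acm

import trio
from tractor.trionics import collapse_eg


@acm
async def open_loose_tn():
    # hypothetical wrapper: a nursery with eg-collapsing pre-stacked
    async with (
        collapse_eg(),
        trio.open_nursery() as tn,
    ):
        yield tn


async def main():
    # the explicit `@acm` stacking preferred in the commit msg,
    async with (
        collapse_eg(),
        trio.open_nursery() as tn,
    ):
        tn.start_soon(trio.sleep, 0.1)

    # vs. the (likely discarded) wrapper spelling,
    async with open_loose_tn() as tn:
        tn.start_soon(trio.sleep, 0.1)


if __name__ == '__main__':
    trio.run(main)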
Tyler Goodlet f946041d44 Add `debug_mode: bool` control to task mngr
Allows dynamically importing `pdbp` when enabled and provides a way to
eventually link with `tractor`'s own debug mode flag.
2025-08-20 13:06:43 -04:00
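(A tiny sketch of the lazy-import gating described above; the helper name below is illustrative, not the actual task-mngr code.)

def maybe_import_pdbp(debug_mode: bool):
    # only load the optional debugger dep when debug-mode is requested
    if not debug_mode:
        return None
    import pdbp
    return pdbp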
Tyler Goodlet a4339d6ac6 Go all in on "task manager" naming 2025-08-20 13:06:43 -04:00
Tyler Goodlet 9123fbdbfa More refinements and proper typing
- drop unneeded (and commented) internal cs allocating bits.
- bypass all task manager stuff if no generator is provided by the
  caller; i.e. just call `.start_soon()` as normal.
- fix `Generator` typing.
- add some prints around task manager.
- wrap in `TaskOutcome.lowlevel_task: Task`.
2025-08-20 13:06:43 -04:00
Tyler Goodlet e7b3254b7b Ensure user-allocated cancel scope just works!
Turns out the nursery doesn't have to care about allocating a per-task
`CancelScope` since the user can just do that in the
`@task_scope_manager` if desired B) So just mask all the nursery's
cs-allocating code with the intention of removing it.

Also add a test for per-task cancellation by starting the crash task as
a `trio.sleep_forever()`, then cancelling it via the user-allocated cs
and ensuring the crash propagates as expected 💥
2025-08-20 13:06:43 -04:00
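(Hedged sketch of the per-task-cancellation pattern this commit enables; the exact `@task_scope_manager` generator protocol is WIP on this branch, so the shape below is illustrative only.)

import trio
from outcome import Outcome  # ships as a `trio` dependency


def user_scope_manager():
    # user-side "task scope manager": allocate our own per-task
    # `CancelScope` (instead of the nursery doing it), yield it back to
    # the `.start_soon()` machinery, then get `.send()`-ed the task's
    # final `Outcome` (assumed protocol, per the later commit msg).
    cs = trio.CancelScope()
    outcome: Outcome = yield cs
    # cancelling `cs` from the caller side is what the new test
    # exercises; any error/cancel outcome still lands here to be
    # unwrapped and thus propagated.
    outcome.unwrap()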
Tyler Goodlet e468f62c26 Facepalm, don't pass in unnecessary cancel scope 2025-08-20 13:06:43 -04:00
Tyler Goodlet 6c65729c20 Do renaming, implement lowlevel `Outcome` sending
As was listed in the many todos, this changes the `.start_soon()` impl
to instead (manually) `.send()` into the user-defined
`@task_scope_manager` an `Outcome` from the spawned task. In this case
the task manager wraps that in a user-defined (and renamed)
`TaskOutcome` and delivers that + a containing `trio.CancelScope` to the
`.start_soon()` caller. Here the user-defined `TaskOutcome` defines
a `.wait_for_result()` method that can be used to await the task's exit
and handle its underlying returned value or raised error; the
implementation could be different and subject to the user's own whims.

Note that if this were added to `trio`'s core, the
`@task_scope_manager` would by default either be implemented as a
`None`-yielding, single-yield generator or, more likely, be ignored
entirely by the runtime (as in no manual task-outcome collecting,
generator calling or sending would be done at all) whenever the user
does not provide a `task_scope_manager` to the nursery at open time.
2025-08-20 13:06:43 -04:00
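(A hedged, self-contained sketch of the caller-facing piece described above; only the names `TaskOutcome` and `.wait_for_result()` come from the commit msg, the rest is an assumed shape, not the WIP implementation.)

from __future__ import annotations
from dataclasses import dataclass, field

import trio
from outcome import Outcome


@dataclass
class TaskOutcome:
    # handle delivered to the `.start_soon()` caller alongside
    # a `trio.CancelScope` for the spawned task.
    _exited: trio.Event = field(default_factory=trio.Event)
    _outcome: Outcome | None = None

    async def wait_for_result(self):
        # wait for the task to exit, then return its value or re-raise
        # its error via the lowlevel `Outcome`.
        await self._exited.wait()
        return self._outcome.unwrap()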
Tyler Goodlet 94fbbe0b05 Alias to `@acm` in broadcaster mod 2025-08-20 13:06:43 -04:00
Tyler Goodlet d5b54f3f5e Initial prototype for a one-cancels-one style supervisor, nursery thing.. 2025-08-20 13:06:43 -04:00
Tyler Goodlet fd314deecb Use shorthand nursery var-names per convention in codebase 2025-08-20 13:06:10 -04:00
Tyler Goodlet dd011c0b2f Better separate service tasks vs. ctxs via methods
Namely splitting the handles for each into 2 separate tables and adding
a `.cancel_service_task()`.

Also,
- move `_open_and_supervise_service_ctx()` to mod level.
- rename `target` -> `ctx_fn` params throughout.
- fill out method doc strings.
2025-08-20 13:06:10 -04:00
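(Sketch of the 2-table split named above, with assumed field/key types; not the actual service-mngr code.)

from dataclasses import dataclass, field

import trio


@dataclass
class _ServiceTablesSketch:
    # local `trio.Task` services: name -> (cancel scope, completion event)
    service_tasks: dict[str, tuple[trio.CancelScope, trio.Event]] = field(
        default_factory=dict,
    )
    # IPC-context backed services get their own, separate table
    service_ctxs: dict[str, object] = field(default_factory=dict)

    async def cancel_service_task(self, name: str) -> None:
        # only touches the local-task table; ctx-backed services are
        # cancelled via their own handles.
        cs, complete = self.service_tasks.pop(name)
        cs.cancel()
        await complete.wait()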
Tyler Goodlet 087aaa1c36 Mv over `ServiceMngr` from `piker` with mods
Namely distinguishing service "IPC contexts" (opened in a
subactor via a `Portal`) from just local `trio.Task`s started
and managed under the `.service_n` (more or less wrapping them in the
interface of a "task-manager" style nursery - aka a one-cancels-one
supervision strat).

API changes from original (`piker`) impl,
- mk `.start_service_task()` do ONLY that, start a task with a wrapping
  cancel-scope and completion event.
  |_ ideally this gets factored-out/re-implemented using the
    task-manager/OCO-style-nursery from GH #363.
- change what was the impl of `.start_service_task()` to `.start_service_ctx()`
  since it more explicitly defines the functionality of entering
  `Portal.open_context()` with a wrapping cs and completion event inside
  a bg task (which syncs the ctx's lifetime with termination of the
  remote actor runtime).
- factor out what was a `.start_service_ctx()` closure to a new
  `_open_and_supervise_service_ctx()` mod-func holding the meat of
  the supervision logic.

`ServiceMngr` API brief,
- use `open_service_mngr()` and `get_service_mngr()` to acquire the
  actor-global singleton.
- `ServiceMngr.start_service()` and `.cancel_service()` which allow for
  straightforward mgmt of "service subactor daemons".
2025-08-20 13:06:10 -04:00
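(Hedged, self-contained sketch of the singleton-acquisition pattern in the API brief above; the real `ServiceMngr` lives in the moved-over module, so every shape below is a stand-in with assumed signatures.)

from __future__ import annotations
from contextlib import asynccontextmanager as acm
from dataclasses import dataclass

import trio


@dataclass
class ServiceMngr:
    service_n: trio.Nursery


_mngr: ServiceMngr | None = None


def get_service_mngr() -> ServiceMngr:
    # grab the already-opened actor-global singleton
    assert _mngr is not None, 'call `open_service_mngr()` first!'
    return _mngr


@acm
async def open_service_mngr():
    # allocate the singleton (once per actor) around a service nursery
    global _mngr
    async with trio.open_nursery() as tn:
        _mngr = ServiceMngr(service_n=tn)
        try:
            yield _mngr
        finally:
            _mngr = None


async def main():
    async with open_service_mngr() as mngr:
        assert get_service_mngr() is mngr
        # `.start_service()`/`.cancel_service()` would manage subactor
        # daemons here (their signatures aren't shown in this compare view).


if __name__ == '__main__':
    trio.run(main)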
Tyler Goodlet 66b7410eab Initial idea-notes dump and @singleton factory idea from `trio`-gitter 2025-08-20 13:06:10 -04:00
12 changed files with 160 additions and 1008 deletions

View File

@ -1,85 +0,0 @@
from contextlib import (
asynccontextmanager as acm,
)
from functools import partial
import tractor
import trio
log = tractor.log.get_logger(
name=__name__
)
_lock: trio.Lock|None = None
@acm
async def acquire_singleton_lock(
) -> None:
global _lock
if _lock is None:
log.info('Allocating LOCK')
_lock = trio.Lock()
log.info('TRYING TO LOCK ACQUIRE')
async with _lock:
log.info('ACQUIRED')
yield _lock
log.info('RELEASED')
async def hold_lock_forever(
task_status=trio.TASK_STATUS_IGNORED
):
async with (
tractor.trionics.maybe_raise_from_masking_exc(),
acquire_singleton_lock() as lock,
):
task_status.started(lock)
await trio.sleep_forever()
async def main(
ignore_special_cases: bool,
loglevel: str = 'info',
debug_mode: bool = True,
):
async with (
trio.open_nursery() as tn,
# tractor.trionics.maybe_raise_from_masking_exc()
# ^^^ XXX NOTE, interestingly putting the unmasker
# here does not exhibit the same behaviour ??
):
if not ignore_special_cases:
from tractor.trionics import _taskc
_taskc._mask_cases.clear()
_lock = await tn.start(
hold_lock_forever,
)
with trio.move_on_after(0.2):
await tn.start(
hold_lock_forever,
)
tn.cancel_scope.cancel()
# XXX, manual test as script
if __name__ == '__main__':
tractor.log.get_console_log(level='info')
for case in [True, False]:
log.info(
f'\n'
f'------ RUNNING SCRIPT TRIAL ------\n'
f'ignore_special_cases: {case!r}\n'
)
trio.run(partial(
main,
ignore_special_cases=case,
loglevel='info',
))

View File

@ -1,195 +0,0 @@
from contextlib import (
contextmanager as cm,
# TODO, any diff in async case(s)??
# asynccontextmanager as acm,
)
from functools import partial
import tractor
import trio
log = tractor.log.get_logger(
name=__name__
)
@cm
def teardown_on_exc(
raise_from_handler: bool = False,
):
'''
You could also have a teardown handler which catches any exc and
does some required teardown. In this case the problem is
compounded UNLESS you ensure the handler's scope is OUTSIDE the
`ux.aclose()`.. that is in the caller's enclosing scope.
'''
try:
yield
except BaseException as _berr:
berr = _berr
log.exception(
f'Handling termination teardown in child due to,\n'
f'{berr!r}\n'
)
if raise_from_handler:
# XXX teardown ops XXX
# on termination these steps say need to be run to
# ensure wider system consistency (like the state of
# remote connections/services).
#
# HOWEVER, any bug in this teardown code is also
# masked by the `tx.aclose()`!
# this is also true if `_tn.cancel_scope` is
# `.cancel_called` by the parent in a graceful
# request case..
# simulate a bug in teardown handler.
raise RuntimeError(
'woopsie teardown bug!'
)
raise # no teardown bug.
async def finite_stream_to_rent(
tx: trio.abc.SendChannel,
child_errors_mid_stream: bool,
raise_unmasked: bool,
task_status: trio.TaskStatus[
trio.CancelScope,
] = trio.TASK_STATUS_IGNORED,
):
async with (
# XXX without this unmasker the mid-streaming RTE is never
# reported since it is masked by the `tx.aclose()`
# call which in turn raises `Cancelled`!
#
# NOTE, this is WITHOUT doing any exception handling
# inside the child task!
#
# TODO, uncomment next LoC to see the suppressed beg[RTE]!
tractor.trionics.maybe_raise_from_masking_exc(
raise_unmasked=raise_unmasked,
),
tx as tx, # .aclose() is the guilty masker chkpt!
# XXX, this ONLY matters in the
# `child_errors_mid_stream=False` case oddly!?
# THAT IS, if no tn is opened in that case then the
# test will not fail; it raises the RTE correctly?
#
# -> so it seems this new scope somehow affects the form of
# eventual in the parent EG?
tractor.trionics.maybe_open_nursery(
nursery=(
None
if not child_errors_mid_stream
else True
),
) as _tn,
):
# pass our scope back to parent for supervision\
# control.
cs: trio.CancelScope|None = (
None
if _tn is True
else _tn.cancel_scope
)
task_status.started(cs)
with teardown_on_exc(
raise_from_handler=not child_errors_mid_stream,
):
for i in range(100):
log.debug(
f'Child tx {i!r}\n'
)
if (
child_errors_mid_stream
and
i == 66
):
# oh wait but WOOPS there's a bug
# in that teardown code!?
raise RuntimeError(
'woopsie, a mid-streaming bug!?'
)
await tx.send(i)
async def main(
# TODO! toggle this for the 2 cases!
# 1. child errors mid-stream while parent is also requesting
# (graceful) cancel of that child streamer.
#
# 2. child contains a teardown handler which contains a
# bug and raises.
#
child_errors_mid_stream: bool,
raise_unmasked: bool = False,
loglevel: str = 'info',
):
tractor.log.get_console_log(level=loglevel)
# the `.aclose()` being checkpoints on these
# is the source of the problem..
tx, rx = trio.open_memory_channel(1)
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
rx as rx,
):
_child_cs = await tn.start(
partial(
finite_stream_to_rent,
child_errors_mid_stream=child_errors_mid_stream,
raise_unmasked=raise_unmasked,
tx=tx,
)
)
async for msg in rx:
log.debug(
f'Rent rx {msg!r}\n'
)
# simulate some external cancellation
# request **JUST BEFORE** the child errors.
if msg == 65:
log.cancel(
f'Cancelling parent on,\n'
f'msg={msg}\n'
f'\n'
f'Simulates OOB cancel request!\n'
)
tn.cancel_scope.cancel()
# XXX, manual test as script
if __name__ == '__main__':
tractor.log.get_console_log(level='info')
for case in [True, False]:
log.info(
f'\n'
f'------ RUNNING SCRIPT TRIAL ------\n'
f'child_errors_midstream: {case!r}\n'
)
try:
trio.run(partial(
main,
child_errors_mid_stream=case,
# raise_unmasked=True,
loglevel='info',
))
except Exception as _exc:
exc = _exc
log.exception(
'Should have raised an RTE or Cancelled?\n'
)
breakpoint()

View File

@ -95,7 +95,6 @@ def run_example_in_subproc(
and 'integration' not in p[0]
and 'advanced_faults' not in p[0]
and 'multihost' not in p[0]
and 'trio' not in p[0]
)
],
ids=lambda t: t[1],

View File

@ -24,10 +24,14 @@ from tractor._testing import (
)
# XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - [x] WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
# - [x] INTER-PEER: some arbitrary remote peer cancels via
# Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
@ -40,6 +44,16 @@ from tractor._testing import (
# that also spawned a remote task task in that same peer-parent.
# def test_self_cancel():
# '''
# 2 cases:
# - calls `Actor.cancel()` locally in some task
# - calls LocalPortal.cancel_actor()` ?
# '''
# ...
@tractor.context
async def open_stream_then_sleep_forever(
ctx: Context,
@ -792,7 +806,7 @@ async def basic_echo_server(
ctx: Context,
peer_name: str = 'wittle_bruv',
err_after_imsg: int|None = None,
err_after: int|None = None,
) -> None:
'''
@ -821,9 +835,8 @@ async def basic_echo_server(
await ipc.send(resp)
if (
err_after_imsg
and
i > err_after_imsg
err_after
and i > err_after
):
raise RuntimeError(
f'Simulated error in `{peer_name}`'
@ -965,8 +978,7 @@ async def tell_little_bro(
actor_name: str,
caller: str = '',
err_after: float|None = None,
rng_seed: int = 50,
err_after: int|None = None,
):
# contact target actor, do a stream dialog.
async with (
@ -977,18 +989,14 @@ async def tell_little_bro(
basic_echo_server,
# XXX proxy any delayed err condition
err_after_imsg=(
err_after * rng_seed
if err_after is not None
else None
),
err_after=err_after,
) as (sub_ctx, first),
sub_ctx.open_stream() as echo_ipc,
):
actor: Actor = current_actor()
uid: tuple = actor.uid
for i in range(rng_seed):
for i in range(100):
msg: tuple = (
uid,
i,
@ -1013,13 +1021,13 @@ async def tell_little_bro(
)
@pytest.mark.parametrize(
'raise_sub_spawn_error_after',
[None, 0.5],
[None, 50],
)
def test_peer_spawns_and_cancels_service_subactor(
debug_mode: bool,
raise_client_error: str,
reg_addr: tuple[str, int],
raise_sub_spawn_error_after: float|None,
raise_sub_spawn_error_after: int|None,
):
# NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx
@ -1033,7 +1041,6 @@ def test_peer_spawns_and_cancels_service_subactor(
# and the server's spawned child should cancel and terminate!
peer_name: str = 'little_bro'
def check_inner_rte(rae: RemoteActorError):
'''
Validate the little_bro's relayed inception!
@ -1127,7 +1134,8 @@ def test_peer_spawns_and_cancels_service_subactor(
)
try:
res = await client_ctx.wait_for_result(hide_tb=False)
res = await client_ctx.result(hide_tb=False)
# in remote (relayed inception) error
# case, we should error on the line above!
if raise_sub_spawn_error_after:
@ -1138,23 +1146,6 @@ def test_peer_spawns_and_cancels_service_subactor(
assert isinstance(res, ContextCancelled)
assert client_ctx.cancel_acked
assert res.canceller == root.uid
assert not raise_sub_spawn_error_after
# cancelling the spawner sub should
# transitively cancel it's sub, the little
# bruv.
print('root cancelling server/client sub-actors')
await spawn_ctx.cancel()
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
# XXX, only for tracing
# except BaseException as _berr:
# berr = _berr
# await tractor.pause(shield=True)
# raise berr
except RemoteActorError as rae:
_err = rae
@ -1183,8 +1174,19 @@ def test_peer_spawns_and_cancels_service_subactor(
raise
# await tractor.pause()
else:
assert not raise_sub_spawn_error_after
# cancelling the spawner sub should
# transitively cancel it's sub, the little
# bruv.
print('root cancelling server/client sub-actors')
await spawn_ctx.cancel()
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
# await tractor.pause()
# await server.cancel_actor()
except RemoteActorError as rae:
@ -1197,7 +1199,7 @@ def test_peer_spawns_and_cancels_service_subactor(
# since we called `.cancel_actor()`, `.cancel_ack`
# will not be set on the ctx bc `ctx.cancel()` was not
# called directly for this confext.
# called directly fot this confext.
except ContextCancelled as ctxc:
_ctxc = ctxc
print(
@ -1237,19 +1239,12 @@ def test_peer_spawns_and_cancels_service_subactor(
# assert spawn_ctx.cancelled_caught
async def _main():
with trio.fail_after(
3 if not debug_mode
else 999
):
await main()
if raise_sub_spawn_error_after:
with pytest.raises(RemoteActorError) as excinfo:
trio.run(_main)
trio.run(main)
rae: RemoteActorError = excinfo.value
check_inner_rte(rae)
else:
trio.run(_main)
trio.run(main)

View File

@ -1,239 +0,0 @@
'''
Define the details of inter-actor "out-of-band" (OoB) cancel
semantics, that is how cancellation works when a cancel request comes
from the different concurrency (primitive's) "layer" then where the
eventual `trio.Task` actually raises a signal.
'''
from functools import partial
# from contextlib import asynccontextmanager as acm
# import itertools
import pytest
import trio
import tractor
from tractor import ( # typing
ActorNursery,
Portal,
Context,
# ContextCancelled,
# RemoteActorError,
)
# from tractor._testing import (
# tractor_test,
# expect_ctxc,
# )
# XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# def test_self_cancel():
# '''
# 2 cases:
# - calls `Actor.cancel()` locally in some task
# - calls LocalPortal.cancel_actor()` ?
#
# things to ensure!
# -[ ] the ctxc raised in a child should ideally show the tb of the
# underlying `Cancelled` checkpoint, i.e.
# `raise scope_error from ctxc`?
#
# -[ ] a self-cancelled context, if not allowed to block on
# `ctx.result()` at some point will hang since the `ctx._scope`
# is never `.cancel_called`; cases for this include,
# - an `open_ctx()` which never starteds before being OoB actor
# cancelled.
# |_ parent task will be blocked in `.open_context()` for the
# `Started` msg, and when the OoB ctxc arrives `ctx._scope`
# will never have been signalled..
# '''
# ...
# TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py`
# but with the `Lock.acquire()` from a `@context` to ensure the
# implicit ignore-case-non-unmasking.
#
# @tractor.context
# async def acquire_actor_global_lock(
# ctx: tractor.Context,
# ignore_special_cases: bool,
# ):
# async with maybe_unmask_excs(
# ignore_special_cases=ignore_special_cases,
# ):
# await ctx.started('locked')
# # block til cancelled
# await trio.sleep_forever()
@tractor.context
async def sleep_forever(
ctx: tractor.Context,
# ignore_special_cases: bool,
do_started: bool,
):
# async with maybe_unmask_excs(
# ignore_special_cases=ignore_special_cases,
# ):
# await ctx.started('locked')
if do_started:
await ctx.started()
# block til cancelled
print('sleepin on child-side..')
await trio.sleep_forever()
@pytest.mark.parametrize(
'cancel_ctx',
[True, False],
)
def test_cancel_ctx_with_parent_side_entered_in_bg_task(
debug_mode: bool,
loglevel: str,
cancel_ctx: bool,
):
'''
The most "basic" out-of-band-task self-cancellation case where
`Portal.open_context()` is entered in a bg task and the
parent-task (of the containing nursery) calls `Context.cancel()`
without the child knowing; the `Context._scope` should be
`.cancel_called` when the IPC ctx's child-side relays
a `ContextCancelled` with a `.canceller` set to the parent
actor('s task).
'''
async def main():
with trio.fail_after(
2 if not debug_mode else 999,
):
an: ActorNursery
async with (
tractor.open_nursery(
debug_mode=debug_mode,
loglevel='devx',
enable_stack_on_sig=True,
) as an,
trio.open_nursery() as tn,
):
ptl: Portal = await an.start_actor(
'sub',
enable_modules=[__name__],
)
async def _open_ctx_async(
do_started: bool = True,
task_status=trio.TASK_STATUS_IGNORED,
):
# do we expect to never enter the
# `.open_context()` below.
if not do_started:
task_status.started()
async with ptl.open_context(
sleep_forever,
do_started=do_started,
) as (ctx, first):
task_status.started(ctx)
await trio.sleep_forever()
# XXX, this is the key OoB part!
#
# - start the `.open_context()` in a bg task which
# blocks inside the embedded scope-body,
#
# - when we call `Context.cancel()` it **is
# not** from the same task which eventually runs
# `.__aexit__()`,
#
# - since the bg "opener" task will be in
# a `trio.sleep_forever()`, it must be interrupted
# by the `ContextCancelled` delivered from the
# child-side; `Context._scope: CancelScope` MUST
# be `.cancel_called`!
#
print('ASYNC opening IPC context in subtask..')
maybe_ctx: Context|None = await tn.start(partial(
_open_ctx_async,
))
if (
maybe_ctx
and
cancel_ctx
):
print('cancelling first IPC ctx!')
await maybe_ctx.cancel()
# XXX, note that despite `maybe_context.cancel()`
# being called above, it's the parent (bg) task
# which was originally never interrupted in
# the `ctx._scope` body due to missing case logic in
# `ctx._maybe_cancel_and_set_remote_error()`.
#
# It didn't matter that the subactor process was
# already terminated and reaped, nothing was
# cancelling the ctx-parent task's scope!
#
print('cancelling subactor!')
await ptl.cancel_actor()
if maybe_ctx:
try:
await maybe_ctx.wait_for_result()
except tractor.ContextCancelled as ctxc:
assert not cancel_ctx
assert (
ctxc.canceller
==
tractor.current_actor().aid.uid
)
# don't re-raise since it'll trigger
# an EG from the above tn.
if cancel_ctx:
# graceful self-cancel
trio.run(main)
else:
# ctx parent task should see OoB ctxc due to
# `ptl.cancel_actor()`.
with pytest.raises(tractor.ContextCancelled) as excinfo:
trio.run(main)
assert 'root' in excinfo.value.canceller[0]
# def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it(
# debug_mode: bool,
# loglevel: str,
# ):
# '''
# Demos OoB cancellation from the perspective of a ctx opened with
# a child subactor where the parent cancels the child at the "actor
# layer" using `Portal.cancel_actor()` and thus the
# `ContextCancelled.canceller` received by the ctx's parent-side
# task will appear to be a "self cancellation" even though that
# specific task itself was not cancelled and thus
# `Context.cancel_called ==False`.
# '''
# TODO, do we have an existing implied ctx
# cancel test like this?
# with trio.move_on_after(0.5):# as cs:
# await _open_ctx_async(
# do_started=False,
# )
# in-line ctx scope should definitely raise
# a ctxc with `.canceller = 'root'`
# async with ptl.open_context(
# sleep_forever,
# do_started=True,
# ) as pair:

View File

@ -6,18 +6,11 @@ want to see changed.
from contextlib import (
asynccontextmanager as acm,
)
from types import ModuleType
from functools import partial
import pytest
from _pytest import pathlib
from tractor.trionics import collapse_eg
import trio
from trio import TaskStatus
from tractor._testing import (
examples_dir,
)
@pytest.mark.parametrize(
@ -113,9 +106,8 @@ def test_acm_embedded_nursery_propagates_enter_err(
debug_mode: bool,
):
'''
Demo how a masking `trio.Cancelled` could be handled by unmasking
from the `.__context__` field when a user (by accident) re-raises
from a `finally:`.
Demo how a masking `trio.Cancelled` could be handled by unmasking from the
`.__context__` field when a user (by accident) re-raises from a `finally:`.
'''
import tractor
@ -125,9 +117,11 @@ def test_acm_embedded_nursery_propagates_enter_err(
async with (
trio.open_nursery() as tn,
tractor.trionics.maybe_raise_from_masking_exc(
tn=tn,
unmask_from=(
(trio.Cancelled,) if unmask_from_canc
else ()
trio.Cancelled
if unmask_from_canc
else None
),
)
):
@ -142,7 +136,8 @@ def test_acm_embedded_nursery_propagates_enter_err(
with tractor.devx.maybe_open_crash_handler(
pdb=debug_mode,
) as bxerr:
assert not bxerr.value
if bxerr:
assert not bxerr.value
async with (
wraps_tn_that_always_cancels() as tn,
@ -150,12 +145,11 @@ def test_acm_embedded_nursery_propagates_enter_err(
assert not tn.cancel_scope.cancel_called
assert 0
if debug_mode:
assert (
(err := bxerr.value)
and
type(err) is AssertionError
)
assert (
(err := bxerr.value)
and
type(err) is AssertionError
)
with pytest.raises(ExceptionGroup) as excinfo:
trio.run(_main)
@ -166,13 +160,13 @@ def test_acm_embedded_nursery_propagates_enter_err(
assert len(assert_eg.exceptions) == 1
def test_gatherctxs_with_memchan_breaks_multicancelled(
debug_mode: bool,
):
'''
Demo how a using an `async with sndchan` inside
a `.trionics.gather_contexts()` task will break a strict-eg-tn's
multi-cancelled absorption..
Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task
will break a strict-eg-tn's multi-cancelled absorption..
'''
from tractor import (
@ -198,6 +192,7 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
f'Closed {task!r}\n'
)
async def main():
async with (
# XXX should ensure ONLY the KBI
@ -218,85 +213,3 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
with pytest.raises(KeyboardInterrupt):
trio.run(main)
@pytest.mark.parametrize(
'raise_unmasked', [
True,
pytest.param(
False,
marks=pytest.mark.xfail(
reason="see examples/trio/send_chan_aclose_masks.py"
)
),
]
)
@pytest.mark.parametrize(
'child_errors_mid_stream',
[True, False],
)
def test_unmask_aclose_as_checkpoint_on_aexit(
raise_unmasked: bool,
child_errors_mid_stream: bool,
debug_mode: bool,
):
'''
Verify that our unmasker util works over the common case where
a mem-chan's `.aclose()` is included in an `@acm` stack
and it being currently a checkpoint, can `trio.Cancelled`-mask an embedded
exception from user code resulting in a silent failure which
appears like graceful cancellation.
This test suite is mostly implemented as an example script so it
could more easily be shared with `trio`-core peeps as `tractor`-less
minimum reproducing example.
'''
mod: ModuleType = pathlib.import_path(
examples_dir()
/ 'trio'
/ 'send_chan_aclose_masks_beg.py',
root=examples_dir(),
consider_namespace_packages=False,
)
with pytest.raises(RuntimeError):
trio.run(partial(
mod.main,
raise_unmasked=raise_unmasked,
child_errors_mid_stream=child_errors_mid_stream,
))
@pytest.mark.parametrize(
'ignore_special_cases', [
True,
pytest.param(
False,
marks=pytest.mark.xfail(
reason="see examples/trio/lockacquire_not_umasked.py"
)
),
]
)
def test_cancelled_lockacquire_in_ipctx_not_unmasked(
ignore_special_cases: bool,
loglevel: str,
debug_mode: bool,
):
mod: ModuleType = pathlib.import_path(
examples_dir()
/ 'trio'
/ 'lockacquire_not_unmasked.py',
root=examples_dir(),
consider_namespace_packages=False,
)
async def _main():
with trio.fail_after(2):
await mod.main(
ignore_special_cases=ignore_special_cases,
loglevel=loglevel,
debug_mode=debug_mode,
)
trio.run(_main)

View File

@ -442,25 +442,25 @@ class Context:
'''
Records whether cancellation has been requested for this context
by a call to `.cancel()` either due to,
- an explicit call by some local task,
- either an explicit call by some local task,
- or an implicit call due to an error caught inside
the `Portal.open_context()` block.
the ``Portal.open_context()`` block.
'''
return self._cancel_called
# XXX, to debug who frickin sets it..
# @cancel_called.setter
# def cancel_called(self, val: bool) -> None:
# '''
# Set the self-cancelled request `bool` value.
@cancel_called.setter
def cancel_called(self, val: bool) -> None:
'''
Set the self-cancelled request `bool` value.
# '''
# if val:
# from .devx import pause_from_sync
# pause_from_sync()
'''
# to debug who frickin sets it..
# if val:
# from .devx import pause_from_sync
# pause_from_sync()
# self._cancel_called = val
self._cancel_called = val
@property
def canceller(self) -> tuple[str, str]|None:
@ -635,71 +635,6 @@ class Context:
'''
await self.chan.send(Stop(cid=self.cid))
@property
def parent_task(self) -> trio.Task:
'''
This IPC context's "owning task" which is a `trio.Task`
on one of the "sides" of the IPC.
Note that the "parent_" prefix here refers to the local
`trio` task tree using the same interface as
`trio.Nursery.parent_task` whereas for IPC contexts,
a different cross-actor task hierarchy exists:
- a "parent"-side which originally entered
`Portal.open_context()`,
- the "child"-side which was spawned and scheduled to invoke
a function decorated with `@tractor.context`.
This task is thus a handle to mem-domain-distinct/per-process
`Nursery.parent_task` depending on in which of the above
"sides" this context exists.
'''
return self._task
def _is_blocked_on_rx_chan(self) -> bool:
'''
Predicate to indicate whether the owner `._task: trio.Task` is
currently blocked (by `.receive()`-ing) on its underlying RPC
feeder `._rx_chan`.
This knowledge is highly useful when handling so called
"out-of-band" (OoB) cancellation conditions where a peer
actor's task transmitted some remote error/cancel-msg and we
must know whether to signal-via-cancel currently executing
"user-code" (user defined code embedded in `ctx._scope`) or
simply to forward the IPC-msg-as-error **without calling**
`._scope.cancel()`.
In the latter case it is presumed that if the owner task is
blocking for the next IPC msg, it will eventually receive,
process and raise the equivalent local error **without**
requiring `._scope.cancel()` to be explicitly called by the
*delivering OoB RPC-task* (via `_deliver_msg()`).
'''
# NOTE, see the mem-chan meth-impls for *why* this
# logic works,
# `trio._channel.MemoryReceiveChannel.receive[_nowait]()`
#
# XXX realize that this is NOT an
# official/will-be-loudly-deprecated API:
# - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled
#
# orig repo intro in the mem-chan change over patch:
# - https://github.com/python-trio/trio/pull/586#issuecomment-414039117
# |_https://github.com/python-trio/trio/pull/616
# |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0
#
return (
self._task.custom_sleep_data
is
self._rx_chan
)
def _maybe_cancel_and_set_remote_error(
self,
error: BaseException,
@ -852,27 +787,13 @@ class Context:
if self._canceller is None:
log.error('Ctx has no canceller set!?')
cs: trio.CancelScope = self._scope
# ?TODO? see comment @ .start_remote_task()`
#
# if not cs:
# from .devx import mk_pdb
# mk_pdb().set_trace()
# raise RuntimeError(
# f'IPC ctx was not be opened prior to remote error delivery !?\n'
# f'{self}\n'
# f'\n'
# f'`Portal.open_context()` must be entered (somewhere) beforehand!\n'
# )
# Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error
# once exiting (or manually calling `.wait_for_result()`) the
# `.open_context()` block.
cs: trio.CancelScope = self._scope
if (
cs
and not cs.cancel_called
# XXX this is an expected cancel request response
# message and we **don't need to raise it** in the
@ -881,7 +802,8 @@ class Context:
# if `._cancel_called` then `.cancel_acked and .cancel_called`
# always should be set.
and not self._is_self_cancelled()
# and not cs.cancelled_caught
and not cs.cancel_called
and not cs.cancelled_caught
):
if (
msgerr
@ -892,7 +814,7 @@ class Context:
not self._cancel_on_msgerr
):
message: str = (
f'NOT Cancelling `Context._scope` since,\n'
'NOT Cancelling `Context._scope` since,\n'
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
f'AND we got a msg-type-error!\n'
f'{error}\n'
@ -902,43 +824,13 @@ class Context:
# `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n'
cs.cancel()
# TODO, explicit condition for OoB (self-)cancellation?
# - we called `Portal.cancel_actor()` from this actor
# and the peer ctx task delivered ctxc due to it.
# - currently `self._is_self_cancelled()` will be true
# since the ctxc.canceller check will match us even though it
# wasn't from this ctx specifically!
elif (
cs
and self._is_self_cancelled()
and not cs.cancel_called
):
message: str = (
'Cancelling `ctx._scope` due to OoB self-cancel ?!\n'
'\n'
)
# from .devx import pause_from_sync
# pause_from_sync()
self._scope.cancel()
else:
message: str = 'NOT cancelling `Context._scope` !\n\n'
# from .devx import mk_pdb
# mk_pdb().set_trace()
# TODO XXX, required to fix timeout failure in
# `test_cancelled_lockacquire_in_ipctx_not_unmaskeed`
#
# XXX NOTE XXX, this is SUPER SUBTLE!
# we only want to cancel our embedded `._scope`
# if the ctx's current/using task is NOT blocked
# on `._rx_chan.receive()` and on some other
# `trio`-checkpoint since in the former case
# any `._remote_error` will be relayed through
# the rx-chan and appropriately raised by the owning
# `._task` directly. IF the owner task is however
# blocking elsewhere we need to interrupt it **now**.
if not self._is_blocked_on_rx_chan():
cs.cancel()
else:
# rx_stats = self._rx_chan.statistics()
message: str = 'NOT cancelling `Context._scope` !\n\n'
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
if (
@ -962,7 +854,6 @@ class Context:
+
cs_fmt
)
log.cancel(
message
+
@ -1055,9 +946,8 @@ class Context:
'''
side: str = self.side
self._cancel_called = True
# ^ XXX for debug via the `@.setter`
# self.cancel_called = True
# XXX for debug via the `@.setter`
self.cancel_called = True
header: str = (
f'Cancelling ctx from {side!r}-side\n'
@ -2121,9 +2011,6 @@ async def open_context_from_portal(
f'|_{portal.actor}\n'
)
# ?TODO? could we move this to inside the `tn` block?
# -> would allow doing `ctx.parent_task = tn.parent_task` ?
# -> would allow a `if not ._scope: => raise RTE` ?
ctx: Context = await portal.actor.start_remote_task(
portal.channel,
nsf=nsf,
@ -2150,7 +2037,6 @@ async def open_context_from_portal(
scope_err: BaseException|None = None
ctxc_from_child: ContextCancelled|None = None
try:
# from .devx import pause
async with (
collapse_eg(),
trio.open_nursery() as tn,
@ -2173,10 +2059,6 @@ async def open_context_from_portal(
# the dialog, the `Error` msg should be raised from the `msg`
# handling block below.
try:
log.runtime(
f'IPC ctx parent waiting on Started msg..\n'
f'ctx.cid: {ctx.cid!r}\n'
)
started_msg, first = await ctx._pld_rx.recv_msg(
ipc=ctx,
expect_msg=Started,
@ -2185,16 +2067,16 @@ async def open_context_from_portal(
)
except trio.Cancelled as taskc:
ctx_cs: trio.CancelScope = ctx._scope
log.cancel(
f'IPC ctx was cancelled during "child" task sync due to\n\n'
f'.cid: {ctx.cid!r}\n'
f'.maybe_error: {ctx.maybe_error!r}\n'
)
# await pause(shield=True)
if not ctx_cs.cancel_called:
raise
# from .devx import pause
# await pause(shield=True)
log.cancel(
'IPC ctx was cancelled during "child" task sync due to\n\n'
f'{ctx.maybe_error}\n'
)
# OW if the ctx's scope was cancelled manually,
# likely the `Context` was cancelled via a call to
# `._maybe_cancel_and_set_remote_error()` so ensure
@ -2317,7 +2199,7 @@ async def open_context_from_portal(
# documenting it as a definittive example of
# debugging the tractor-runtime itself using it's
# own `.devx.` tooling!
#
#
# await debug.pause()
# CASE 2: context was cancelled by local task calling
@ -2390,16 +2272,13 @@ async def open_context_from_portal(
match scope_err:
case trio.Cancelled():
logmeth = log.cancel
cause: str = 'cancelled'
# XXX explicitly report on any non-graceful-taskc cases
case _:
cause: str = 'errored'
logmeth = log.exception
logmeth(
f'ctx {ctx.side!r}-side {cause!r} with,\n'
f'{ctx.repr_outcome()!r}\n'
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
)
if debug_mode():
@ -2424,7 +2303,6 @@ async def open_context_from_portal(
# told us it's cancelled ;p
if ctxc_from_child is None:
try:
# await pause(shield=True)
await ctx.cancel()
except (
trio.BrokenResourceError,
@ -2581,10 +2459,8 @@ async def open_context_from_portal(
log.cancel(
f'Context cancelled by local {ctx.side!r}-side task\n'
f'c)>\n'
f' |_{ctx.parent_task}\n'
f' .cid={ctx.cid!r}\n'
f'\n'
f'{scope_err!r}\n'
f' |_{ctx._task}\n\n'
f'{repr(scope_err)}\n'
)
# TODO: should we add a `._cancel_req_received`

View File

@ -654,7 +654,8 @@ async def _invoke(
# scope ensures unmasking of the `await coro` below
# *should* never be interfered with!!
maybe_raise_from_masking_exc(
unmask_from=(Cancelled,),
tn=tn,
unmask_from=Cancelled,
) as _mbme, # maybe boxed masked exc
):
ctx._scope_nursery = tn

View File

@ -446,12 +446,12 @@ class ActorNursery:
@acm
async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor,
hide_tb: bool = True,
tb_hide: bool = False,
) -> typing.AsyncGenerator[ActorNursery, None]:
# normally don't need to show user by default
__tracebackhide__: bool = hide_tb
__tracebackhide__: bool = tb_hide
outer_err: BaseException|None = None
inner_err: BaseException|None = None

View File

@ -613,9 +613,10 @@ async def drain_to_final_msg(
# msg: dict = await ctx._rx_chan.receive()
# if res_cs.cancelled_caught:
#
# -[x] make sure pause points work here for REPLing
# -[ ] make sure pause points work here for REPLing
# the runtime itself; i.e. ensure there's no hangs!
# |_see masked code below in .cancel_called path
# |_from tractor.devx.debug import pause
# await pause()
# NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases:
@ -651,10 +652,6 @@ async def drain_to_final_msg(
f'IPC ctx cancelled externally during result drain ?\n'
f'{ctx}'
)
# XXX, for tracing `Cancelled`..
# from tractor.devx.debug import pause
# await pause(shield=True)
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is
# the source cause of this local task's

View File

@ -30,6 +30,7 @@ from typing import (
AsyncIterator,
Callable,
Hashable,
Optional,
Sequence,
TypeVar,
)
@ -175,7 +176,7 @@ class _Cache:
a kept-alive-while-in-use async resource.
'''
service_tn: trio.Nursery|None = None
service_tn: Optional[trio.Nursery] = None
locks: dict[Hashable, trio.Lock] = {}
users: int = 0
values: dict[Any, Any] = {}
@ -184,7 +185,7 @@ class _Cache:
tuple[trio.Nursery, trio.Event]
] = {}
# nurseries: dict[int, trio.Nursery] = {}
no_more_users: trio.Event|None = None
no_more_users: Optional[trio.Event] = None
@classmethod
async def run_ctx(
@ -194,18 +195,16 @@ class _Cache:
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> None:
try:
async with mng as value:
_, no_more_users = cls.resources[ctx_key]
try:
cls.values[ctx_key] = value
task_status.started(value)
await no_more_users.wait()
finally:
value = cls.values.pop(ctx_key)
finally:
# discard nursery ref so it won't be re-used (an error)?
cls.resources.pop(ctx_key)
async with mng as value:
_, no_more_users = cls.resources[ctx_key]
cls.values[ctx_key] = value
task_status.started(value)
try:
await no_more_users.wait()
finally:
# discard nursery ref so it won't be re-used (an error)?
value = cls.values.pop(ctx_key)
cls.resources.pop(ctx_key)
@acm

View File

@ -22,14 +22,7 @@ from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
import inspect
from types import (
TracebackType,
)
from typing import (
Type,
TYPE_CHECKING,
)
from typing import TYPE_CHECKING
import trio
from tractor.log import get_logger
@ -67,71 +60,12 @@ def find_masked_excs(
return None
_mask_cases: dict[
Type[Exception], # masked exc type
dict[
int, # inner-frame index into `inspect.getinnerframes()`
# `FrameInfo.function/filename: str`s to match
dict[str, str],
],
] = {
trio.WouldBlock: {
# `trio.Lock.acquire()` has a checkpoint inside the
# `WouldBlock`-no_wait path's handler..
-5: { # "5th frame up" from checkpoint
'filename': 'trio/_sync.py',
'function': 'acquire',
# 'lineno': 605, # matters?
},
}
}
def is_expected_masking_case(
cases: dict,
exc_ctx: Exception,
exc_match: BaseException,
) -> bool|inspect.FrameInfo:
'''
Determine whether the provided masked exception is from a known
bug/special/unintentional-`trio`-impl case which we do not wish
to unmask.
Return any guilty `inspect.FrameInfo` ow `False`.
'''
exc_tb: TracebackType = exc_match.__traceback__
if cases := _mask_cases.get(type(exc_ctx)):
inner: list[inspect.FrameInfo] = inspect.getinnerframes(exc_tb)
# from tractor.devx.debug import mk_pdb
# mk_pdb().set_trace()
for iframe, matchon in cases.items():
try:
masker_frame: inspect.FrameInfo = inner[iframe]
except IndexError:
continue
for field, in_field in matchon.items():
val = getattr(
masker_frame,
field,
)
if in_field not in val:
break
else:
return masker_frame
return False
# XXX, relevant discussion @ `trio`-core,
# https://github.com/python-trio/trio/issues/455
#
@acm
async def maybe_raise_from_masking_exc(
tn: trio.Nursery|None = None,
unmask_from: (
BaseException|
tuple[BaseException]
@ -140,30 +74,18 @@ async def maybe_raise_from_masking_exc(
raise_unmasked: bool = True,
extra_note: str = (
'This can occurr when,\n'
'\n'
' - a `trio.Nursery/CancelScope` embeds a `finally/except:`-block '
'which execs an un-shielded checkpoint!'
' - a `trio.Nursery` scope embeds a `finally:`-block '
'which executes a checkpoint!'
#
# ^TODO? other cases?
),
always_warn_on: tuple[Type[BaseException]] = (
always_warn_on: tuple[BaseException] = (
trio.Cancelled,
),
# don't ever unmask or warn on any masking pair,
# {<masked-excT-key> -> <masking-excT-value>}
never_warn_on: dict[
Type[BaseException],
Type[BaseException],
] = {
KeyboardInterrupt: trio.Cancelled,
trio.Cancelled: trio.Cancelled,
},
# ^XXX, special case(s) where we warn-log bc likely
# there will be no operational diff since the exc
# is always expected to be consumed.
) -> BoxedMaybeException:
'''
Maybe un-mask and re-raise exception(s) suppressed by a known
@ -182,112 +104,81 @@ async def maybe_raise_from_masking_exc(
individual sub-excs but maintain the eg-parent's form right?
'''
if not isinstance(unmask_from, tuple):
raise ValueError(
f'Invalid unmask_from = {unmask_from!r}\n'
f'Must be a `tuple[Type[BaseException]]`.\n'
)
from tractor.devx.debug import (
BoxedMaybeException,
pause,
)
boxed_maybe_exc = BoxedMaybeException(
raise_on_exit=raise_unmasked,
)
matching: list[BaseException]|None = None
try:
yield boxed_maybe_exc
return
except BaseException as _bexc:
bexc = _bexc
if isinstance(bexc, BaseExceptionGroup):
matches: ExceptionGroup
matches, _ = bexc.split(unmask_from)
if matches:
matching = matches.exceptions
maybe_eg: ExceptionGroup|None
elif (
unmask_from
and
type(bexc) in unmask_from
):
matching = [bexc]
if tn:
try: # handle egs
yield boxed_maybe_exc
return
except* unmask_from as _maybe_eg:
maybe_eg = _maybe_eg
matches: ExceptionGroup
matches, _ = maybe_eg.split(
unmask_from
)
if not matches:
raise
matching: list[BaseException] = matches.exceptions
else:
try: # handle non-egs
yield boxed_maybe_exc
return
except unmask_from as _maybe_exc:
maybe_exc = _maybe_exc
matching: list[BaseException] = [
maybe_exc
]
# XXX, only unmask-ed for debuggin!
# TODO, remove eventually..
except BaseException as _berr:
berr = _berr
await pause(shield=True)
raise berr
if matching is None:
raise
masked: list[tuple[BaseException, BaseException]] = []
for exc_match in matching:
if exc_ctx := find_masked_excs(
maybe_masker=exc_match,
unmask_from=set(unmask_from),
unmask_from={unmask_from},
):
masked.append((
exc_ctx,
exc_match,
))
masked.append((exc_ctx, exc_match))
boxed_maybe_exc.value = exc_match
note: str = (
f'\n'
f'^^WARNING^^\n'
f'the above {type(exc_ctx)!r} was masked by a {type(exc_match)!r}\n'
f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n'
)
if extra_note:
note += (
f'\n'
f'{extra_note}\n'
)
exc_ctx.add_note(note)
do_warn: bool = (
never_warn_on.get(
type(exc_ctx) # masking type
)
is not
type(exc_match) # masked type
)
if do_warn:
exc_ctx.add_note(note)
if (
do_warn
and
type(exc_match) in always_warn_on
):
if type(exc_match) in always_warn_on:
log.warning(note)
if (
do_warn
and
raise_unmasked
):
# await tractor.pause(shield=True)
if raise_unmasked:
if len(masked) < 2:
# don't unmask already known "special" cases..
if (
_mask_cases
and
(cases := _mask_cases.get(type(exc_ctx)))
and
(masker_frame := is_expected_masking_case(
cases,
exc_ctx,
exc_match,
))
):
log.warning(
f'Ignoring already-known, non-ideal-but-valid '
f'masker code @\n'
f'{masker_frame}\n'
f'\n'
f'NOT raising {exc_ctx} from masker {exc_match!r}\n'
)
raise exc_match
raise exc_ctx from exc_match
# ??TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
# else:
# await pause(shield=True)
else:
# ?TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
await pause(shield=True)
else:
raise