Compare commits
41 Commits
23809b8468
...
653f23a04c
Author | SHA1 | Date |
---|---|---|
|
653f23a04c | |
|
90db6f2299 | |
|
b2d63bc102 | |
|
d433606a6b | |
|
cb9569eace | |
|
9798fcd3bb | |
|
03549c51ab | |
|
256016c515 | |
|
17b2a2cab4 | |
|
4fbafe7ca4 | |
|
7ffdf3483a | |
|
76ed0f2ef6 | |
|
2afb624c48 | |
|
885137ac19 | |
|
83ce2275b9 | |
|
9f757ffa63 | |
|
0c6d512ba4 | |
|
fc130d06b8 | |
|
73423ef2b7 | |
|
b1f2a6b394 | |
|
9489a2f84d | |
|
92eaed6fec | |
|
217d54b9d1 | |
|
34ca02ed11 | |
|
62a364a1d3 | |
|
07781e38cd | |
|
9c6b90ef04 | |
|
542d4c7840 | |
|
9aebe7d8f9 | |
|
04c3d5e239 | |
|
759174729c | |
|
e9f3689191 | |
|
93aa39db07 | |
|
5ab642bdf0 | |
|
ed18ecd064 | |
|
cec0282953 | |
|
25c5847f2e | |
|
ba793fadd9 | |
|
d17864a432 | |
|
6c361a9564 | |
|
34ca7429c7 |
|
@ -0,0 +1,85 @@
|
|||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from functools import partial
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
|
||||
log = tractor.log.get_logger(
|
||||
name=__name__
|
||||
)
|
||||
|
||||
_lock: trio.Lock|None = None
|
||||
|
||||
|
||||
@acm
|
||||
async def acquire_singleton_lock(
|
||||
) -> None:
|
||||
global _lock
|
||||
if _lock is None:
|
||||
log.info('Allocating LOCK')
|
||||
_lock = trio.Lock()
|
||||
|
||||
log.info('TRYING TO LOCK ACQUIRE')
|
||||
async with _lock:
|
||||
log.info('ACQUIRED')
|
||||
yield _lock
|
||||
|
||||
log.info('RELEASED')
|
||||
|
||||
|
||||
|
||||
async def hold_lock_forever(
|
||||
task_status=trio.TASK_STATUS_IGNORED
|
||||
):
|
||||
async with (
|
||||
tractor.trionics.maybe_raise_from_masking_exc(),
|
||||
acquire_singleton_lock() as lock,
|
||||
):
|
||||
task_status.started(lock)
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
async def main(
|
||||
ignore_special_cases: bool,
|
||||
loglevel: str = 'info',
|
||||
debug_mode: bool = True,
|
||||
):
|
||||
async with (
|
||||
trio.open_nursery() as tn,
|
||||
|
||||
# tractor.trionics.maybe_raise_from_masking_exc()
|
||||
# ^^^ XXX NOTE, interestingly putting the unmasker
|
||||
# here does not exhibit the same behaviour ??
|
||||
):
|
||||
if not ignore_special_cases:
|
||||
from tractor.trionics import _taskc
|
||||
_taskc._mask_cases.clear()
|
||||
|
||||
_lock = await tn.start(
|
||||
hold_lock_forever,
|
||||
)
|
||||
with trio.move_on_after(0.2):
|
||||
await tn.start(
|
||||
hold_lock_forever,
|
||||
)
|
||||
|
||||
tn.cancel_scope.cancel()
|
||||
|
||||
|
||||
# XXX, manual test as script
|
||||
if __name__ == '__main__':
|
||||
tractor.log.get_console_log(level='info')
|
||||
for case in [True, False]:
|
||||
log.info(
|
||||
f'\n'
|
||||
f'------ RUNNING SCRIPT TRIAL ------\n'
|
||||
f'ignore_special_cases: {case!r}\n'
|
||||
)
|
||||
trio.run(partial(
|
||||
main,
|
||||
ignore_special_cases=case,
|
||||
loglevel='info',
|
||||
))
|
|
@ -0,0 +1,195 @@
|
|||
from contextlib import (
|
||||
contextmanager as cm,
|
||||
# TODO, any diff in async case(s)??
|
||||
# asynccontextmanager as acm,
|
||||
)
|
||||
from functools import partial
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
|
||||
log = tractor.log.get_logger(
|
||||
name=__name__
|
||||
)
|
||||
|
||||
|
||||
@cm
|
||||
def teardown_on_exc(
|
||||
raise_from_handler: bool = False,
|
||||
):
|
||||
'''
|
||||
You could also have a teardown handler which catches any exc and
|
||||
does some required teardown. In this case the problem is
|
||||
compounded UNLESS you ensure the handler's scope is OUTSIDE the
|
||||
`ux.aclose()`.. that is in the caller's enclosing scope.
|
||||
|
||||
'''
|
||||
try:
|
||||
yield
|
||||
except BaseException as _berr:
|
||||
berr = _berr
|
||||
log.exception(
|
||||
f'Handling termination teardown in child due to,\n'
|
||||
f'{berr!r}\n'
|
||||
)
|
||||
if raise_from_handler:
|
||||
# XXX teardown ops XXX
|
||||
# on termination these steps say need to be run to
|
||||
# ensure wider system consistency (like the state of
|
||||
# remote connections/services).
|
||||
#
|
||||
# HOWEVER, any bug in this teardown code is also
|
||||
# masked by the `tx.aclose()`!
|
||||
# this is also true if `_tn.cancel_scope` is
|
||||
# `.cancel_called` by the parent in a graceful
|
||||
# request case..
|
||||
|
||||
# simulate a bug in teardown handler.
|
||||
raise RuntimeError(
|
||||
'woopsie teardown bug!'
|
||||
)
|
||||
|
||||
raise # no teardown bug.
|
||||
|
||||
|
||||
async def finite_stream_to_rent(
|
||||
tx: trio.abc.SendChannel,
|
||||
child_errors_mid_stream: bool,
|
||||
raise_unmasked: bool,
|
||||
|
||||
task_status: trio.TaskStatus[
|
||||
trio.CancelScope,
|
||||
] = trio.TASK_STATUS_IGNORED,
|
||||
):
|
||||
async with (
|
||||
# XXX without this unmasker the mid-streaming RTE is never
|
||||
# reported since it is masked by the `tx.aclose()`
|
||||
# call which in turn raises `Cancelled`!
|
||||
#
|
||||
# NOTE, this is WITHOUT doing any exception handling
|
||||
# inside the child task!
|
||||
#
|
||||
# TODO, uncomment next LoC to see the supprsessed beg[RTE]!
|
||||
tractor.trionics.maybe_raise_from_masking_exc(
|
||||
raise_unmasked=raise_unmasked,
|
||||
),
|
||||
|
||||
tx as tx, # .aclose() is the guilty masker chkpt!
|
||||
|
||||
# XXX, this ONLY matters in the
|
||||
# `child_errors_mid_stream=False` case oddly!?
|
||||
# THAT IS, if no tn is opened in that case then the
|
||||
# test will not fail; it raises the RTE correctly?
|
||||
#
|
||||
# -> so it seems this new scope somehow affects the form of
|
||||
# eventual in the parent EG?
|
||||
tractor.trionics.maybe_open_nursery(
|
||||
nursery=(
|
||||
None
|
||||
if not child_errors_mid_stream
|
||||
else True
|
||||
),
|
||||
) as _tn,
|
||||
):
|
||||
# pass our scope back to parent for supervision\
|
||||
# control.
|
||||
cs: trio.CancelScope|None = (
|
||||
None
|
||||
if _tn is True
|
||||
else _tn.cancel_scope
|
||||
)
|
||||
task_status.started(cs)
|
||||
|
||||
with teardown_on_exc(
|
||||
raise_from_handler=not child_errors_mid_stream,
|
||||
):
|
||||
for i in range(100):
|
||||
log.debug(
|
||||
f'Child tx {i!r}\n'
|
||||
)
|
||||
if (
|
||||
child_errors_mid_stream
|
||||
and
|
||||
i == 66
|
||||
):
|
||||
# oh wait but WOOPS there's a bug
|
||||
# in that teardown code!?
|
||||
raise RuntimeError(
|
||||
'woopsie, a mid-streaming bug!?'
|
||||
)
|
||||
|
||||
await tx.send(i)
|
||||
|
||||
|
||||
async def main(
|
||||
# TODO! toggle this for the 2 cases!
|
||||
# 1. child errors mid-stream while parent is also requesting
|
||||
# (graceful) cancel of that child streamer.
|
||||
#
|
||||
# 2. child contains a teardown handler which contains a
|
||||
# bug and raises.
|
||||
#
|
||||
child_errors_mid_stream: bool,
|
||||
|
||||
raise_unmasked: bool = False,
|
||||
loglevel: str = 'info',
|
||||
):
|
||||
tractor.log.get_console_log(level=loglevel)
|
||||
|
||||
# the `.aclose()` being checkpoints on these
|
||||
# is the source of the problem..
|
||||
tx, rx = trio.open_memory_channel(1)
|
||||
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
rx as rx,
|
||||
):
|
||||
_child_cs = await tn.start(
|
||||
partial(
|
||||
finite_stream_to_rent,
|
||||
child_errors_mid_stream=child_errors_mid_stream,
|
||||
raise_unmasked=raise_unmasked,
|
||||
tx=tx,
|
||||
)
|
||||
)
|
||||
async for msg in rx:
|
||||
log.debug(
|
||||
f'Rent rx {msg!r}\n'
|
||||
)
|
||||
|
||||
# simulate some external cancellation
|
||||
# request **JUST BEFORE** the child errors.
|
||||
if msg == 65:
|
||||
log.cancel(
|
||||
f'Cancelling parent on,\n'
|
||||
f'msg={msg}\n'
|
||||
f'\n'
|
||||
f'Simulates OOB cancel request!\n'
|
||||
)
|
||||
tn.cancel_scope.cancel()
|
||||
|
||||
|
||||
# XXX, manual test as script
|
||||
if __name__ == '__main__':
|
||||
tractor.log.get_console_log(level='info')
|
||||
for case in [True, False]:
|
||||
log.info(
|
||||
f'\n'
|
||||
f'------ RUNNING SCRIPT TRIAL ------\n'
|
||||
f'child_errors_midstream: {case!r}\n'
|
||||
)
|
||||
try:
|
||||
trio.run(partial(
|
||||
main,
|
||||
child_errors_mid_stream=case,
|
||||
# raise_unmasked=True,
|
||||
loglevel='info',
|
||||
))
|
||||
except Exception as _exc:
|
||||
exc = _exc
|
||||
log.exception(
|
||||
'Should have raised an RTE or Cancelled?\n'
|
||||
)
|
||||
breakpoint()
|
|
@ -95,6 +95,7 @@ def run_example_in_subproc(
|
|||
and 'integration' not in p[0]
|
||||
and 'advanced_faults' not in p[0]
|
||||
and 'multihost' not in p[0]
|
||||
and 'trio' not in p[0]
|
||||
)
|
||||
],
|
||||
ids=lambda t: t[1],
|
||||
|
|
|
@ -24,14 +24,10 @@ from tractor._testing import (
|
|||
)
|
||||
|
||||
# XXX TODO cases:
|
||||
# - [ ] peer cancelled itself - so other peers should
|
||||
# get errors reflecting that the peer was itself the .canceller?
|
||||
|
||||
# - [x] WE cancelled the peer and thus should not see any raised
|
||||
# `ContextCancelled` as it should be reaped silently?
|
||||
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
|
||||
# already covers this case?
|
||||
|
||||
# - [x] INTER-PEER: some arbitrary remote peer cancels via
|
||||
# Portal.cancel_actor().
|
||||
# => all other connected peers should get that cancel requesting peer's
|
||||
|
@ -44,16 +40,6 @@ from tractor._testing import (
|
|||
# that also spawned a remote task task in that same peer-parent.
|
||||
|
||||
|
||||
# def test_self_cancel():
|
||||
# '''
|
||||
# 2 cases:
|
||||
# - calls `Actor.cancel()` locally in some task
|
||||
# - calls LocalPortal.cancel_actor()` ?
|
||||
|
||||
# '''
|
||||
# ...
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def open_stream_then_sleep_forever(
|
||||
ctx: Context,
|
||||
|
@ -806,7 +792,7 @@ async def basic_echo_server(
|
|||
ctx: Context,
|
||||
peer_name: str = 'wittle_bruv',
|
||||
|
||||
err_after: int|None = None,
|
||||
err_after_imsg: int|None = None,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -835,8 +821,9 @@ async def basic_echo_server(
|
|||
await ipc.send(resp)
|
||||
|
||||
if (
|
||||
err_after
|
||||
and i > err_after
|
||||
err_after_imsg
|
||||
and
|
||||
i > err_after_imsg
|
||||
):
|
||||
raise RuntimeError(
|
||||
f'Simulated error in `{peer_name}`'
|
||||
|
@ -978,7 +965,8 @@ async def tell_little_bro(
|
|||
actor_name: str,
|
||||
|
||||
caller: str = '',
|
||||
err_after: int|None = None,
|
||||
err_after: float|None = None,
|
||||
rng_seed: int = 50,
|
||||
):
|
||||
# contact target actor, do a stream dialog.
|
||||
async with (
|
||||
|
@ -989,14 +977,18 @@ async def tell_little_bro(
|
|||
basic_echo_server,
|
||||
|
||||
# XXX proxy any delayed err condition
|
||||
err_after=err_after,
|
||||
err_after_imsg=(
|
||||
err_after * rng_seed
|
||||
if err_after is not None
|
||||
else None
|
||||
),
|
||||
) as (sub_ctx, first),
|
||||
|
||||
sub_ctx.open_stream() as echo_ipc,
|
||||
):
|
||||
actor: Actor = current_actor()
|
||||
uid: tuple = actor.uid
|
||||
for i in range(100):
|
||||
for i in range(rng_seed):
|
||||
msg: tuple = (
|
||||
uid,
|
||||
i,
|
||||
|
@ -1021,13 +1013,13 @@ async def tell_little_bro(
|
|||
)
|
||||
@pytest.mark.parametrize(
|
||||
'raise_sub_spawn_error_after',
|
||||
[None, 50],
|
||||
[None, 0.5],
|
||||
)
|
||||
def test_peer_spawns_and_cancels_service_subactor(
|
||||
debug_mode: bool,
|
||||
raise_client_error: str,
|
||||
reg_addr: tuple[str, int],
|
||||
raise_sub_spawn_error_after: int|None,
|
||||
raise_sub_spawn_error_after: float|None,
|
||||
):
|
||||
# NOTE: this tests for the modden `mod wks open piker` bug
|
||||
# discovered as part of implementing workspace ctx
|
||||
|
@ -1041,6 +1033,7 @@ def test_peer_spawns_and_cancels_service_subactor(
|
|||
# and the server's spawned child should cancel and terminate!
|
||||
peer_name: str = 'little_bro'
|
||||
|
||||
|
||||
def check_inner_rte(rae: RemoteActorError):
|
||||
'''
|
||||
Validate the little_bro's relayed inception!
|
||||
|
@ -1134,8 +1127,7 @@ def test_peer_spawns_and_cancels_service_subactor(
|
|||
)
|
||||
|
||||
try:
|
||||
res = await client_ctx.result(hide_tb=False)
|
||||
|
||||
res = await client_ctx.wait_for_result(hide_tb=False)
|
||||
# in remote (relayed inception) error
|
||||
# case, we should error on the line above!
|
||||
if raise_sub_spawn_error_after:
|
||||
|
@ -1146,6 +1138,23 @@ def test_peer_spawns_and_cancels_service_subactor(
|
|||
assert isinstance(res, ContextCancelled)
|
||||
assert client_ctx.cancel_acked
|
||||
assert res.canceller == root.uid
|
||||
assert not raise_sub_spawn_error_after
|
||||
|
||||
# cancelling the spawner sub should
|
||||
# transitively cancel it's sub, the little
|
||||
# bruv.
|
||||
print('root cancelling server/client sub-actors')
|
||||
await spawn_ctx.cancel()
|
||||
async with tractor.find_actor(
|
||||
name=peer_name,
|
||||
) as sub:
|
||||
assert not sub
|
||||
|
||||
# XXX, only for tracing
|
||||
# except BaseException as _berr:
|
||||
# berr = _berr
|
||||
# await tractor.pause(shield=True)
|
||||
# raise berr
|
||||
|
||||
except RemoteActorError as rae:
|
||||
_err = rae
|
||||
|
@ -1174,19 +1183,8 @@ def test_peer_spawns_and_cancels_service_subactor(
|
|||
raise
|
||||
# await tractor.pause()
|
||||
|
||||
else:
|
||||
assert not raise_sub_spawn_error_after
|
||||
|
||||
# cancelling the spawner sub should
|
||||
# transitively cancel it's sub, the little
|
||||
# bruv.
|
||||
print('root cancelling server/client sub-actors')
|
||||
await spawn_ctx.cancel()
|
||||
async with tractor.find_actor(
|
||||
name=peer_name,
|
||||
) as sub:
|
||||
assert not sub
|
||||
|
||||
# await tractor.pause()
|
||||
# await server.cancel_actor()
|
||||
|
||||
except RemoteActorError as rae:
|
||||
|
@ -1199,7 +1197,7 @@ def test_peer_spawns_and_cancels_service_subactor(
|
|||
|
||||
# since we called `.cancel_actor()`, `.cancel_ack`
|
||||
# will not be set on the ctx bc `ctx.cancel()` was not
|
||||
# called directly fot this confext.
|
||||
# called directly for this confext.
|
||||
except ContextCancelled as ctxc:
|
||||
_ctxc = ctxc
|
||||
print(
|
||||
|
@ -1239,12 +1237,19 @@ def test_peer_spawns_and_cancels_service_subactor(
|
|||
|
||||
# assert spawn_ctx.cancelled_caught
|
||||
|
||||
async def _main():
|
||||
with trio.fail_after(
|
||||
3 if not debug_mode
|
||||
else 999
|
||||
):
|
||||
await main()
|
||||
|
||||
if raise_sub_spawn_error_after:
|
||||
with pytest.raises(RemoteActorError) as excinfo:
|
||||
trio.run(main)
|
||||
trio.run(_main)
|
||||
|
||||
rae: RemoteActorError = excinfo.value
|
||||
check_inner_rte(rae)
|
||||
|
||||
else:
|
||||
trio.run(main)
|
||||
trio.run(_main)
|
||||
|
|
|
@ -0,0 +1,239 @@
|
|||
'''
|
||||
Define the details of inter-actor "out-of-band" (OoB) cancel
|
||||
semantics, that is how cancellation works when a cancel request comes
|
||||
from the different concurrency (primitive's) "layer" then where the
|
||||
eventual `trio.Task` actually raises a signal.
|
||||
|
||||
'''
|
||||
from functools import partial
|
||||
# from contextlib import asynccontextmanager as acm
|
||||
# import itertools
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
import tractor
|
||||
from tractor import ( # typing
|
||||
ActorNursery,
|
||||
Portal,
|
||||
Context,
|
||||
# ContextCancelled,
|
||||
# RemoteActorError,
|
||||
)
|
||||
# from tractor._testing import (
|
||||
# tractor_test,
|
||||
# expect_ctxc,
|
||||
# )
|
||||
|
||||
# XXX TODO cases:
|
||||
# - [ ] peer cancelled itself - so other peers should
|
||||
# get errors reflecting that the peer was itself the .canceller?
|
||||
|
||||
# def test_self_cancel():
|
||||
# '''
|
||||
# 2 cases:
|
||||
# - calls `Actor.cancel()` locally in some task
|
||||
# - calls LocalPortal.cancel_actor()` ?
|
||||
#
|
||||
# things to ensure!
|
||||
# -[ ] the ctxc raised in a child should ideally show the tb of the
|
||||
# underlying `Cancelled` checkpoint, i.e.
|
||||
# `raise scope_error from ctxc`?
|
||||
#
|
||||
# -[ ] a self-cancelled context, if not allowed to block on
|
||||
# `ctx.result()` at some point will hang since the `ctx._scope`
|
||||
# is never `.cancel_called`; cases for this include,
|
||||
# - an `open_ctx()` which never starteds before being OoB actor
|
||||
# cancelled.
|
||||
# |_ parent task will be blocked in `.open_context()` for the
|
||||
# `Started` msg, and when the OoB ctxc arrives `ctx._scope`
|
||||
# will never have been signalled..
|
||||
|
||||
# '''
|
||||
# ...
|
||||
|
||||
# TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py`
|
||||
# but with the `Lock.acquire()` from a `@context` to ensure the
|
||||
# implicit ignore-case-non-unmasking.
|
||||
#
|
||||
# @tractor.context
|
||||
# async def acquire_actor_global_lock(
|
||||
# ctx: tractor.Context,
|
||||
# ignore_special_cases: bool,
|
||||
# ):
|
||||
|
||||
# async with maybe_unmask_excs(
|
||||
# ignore_special_cases=ignore_special_cases,
|
||||
# ):
|
||||
# await ctx.started('locked')
|
||||
|
||||
# # block til cancelled
|
||||
# await trio.sleep_forever()
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def sleep_forever(
|
||||
ctx: tractor.Context,
|
||||
# ignore_special_cases: bool,
|
||||
do_started: bool,
|
||||
):
|
||||
|
||||
# async with maybe_unmask_excs(
|
||||
# ignore_special_cases=ignore_special_cases,
|
||||
# ):
|
||||
# await ctx.started('locked')
|
||||
if do_started:
|
||||
await ctx.started()
|
||||
|
||||
# block til cancelled
|
||||
print('sleepin on child-side..')
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'cancel_ctx',
|
||||
[True, False],
|
||||
)
|
||||
def test_cancel_ctx_with_parent_side_entered_in_bg_task(
|
||||
debug_mode: bool,
|
||||
loglevel: str,
|
||||
cancel_ctx: bool,
|
||||
):
|
||||
'''
|
||||
The most "basic" out-of-band-task self-cancellation case where
|
||||
`Portal.open_context()` is entered in a bg task and the
|
||||
parent-task (of the containing nursery) calls `Context.cancel()`
|
||||
without the child knowing; the `Context._scope` should be
|
||||
`.cancel_called` when the IPC ctx's child-side relays
|
||||
a `ContextCancelled` with a `.canceller` set to the parent
|
||||
actor('s task).
|
||||
|
||||
'''
|
||||
async def main():
|
||||
with trio.fail_after(
|
||||
2 if not debug_mode else 999,
|
||||
):
|
||||
an: ActorNursery
|
||||
async with (
|
||||
tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
loglevel='devx',
|
||||
enable_stack_on_sig=True,
|
||||
) as an,
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
ptl: Portal = await an.start_actor(
|
||||
'sub',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
async def _open_ctx_async(
|
||||
do_started: bool = True,
|
||||
task_status=trio.TASK_STATUS_IGNORED,
|
||||
):
|
||||
# do we expect to never enter the
|
||||
# `.open_context()` below.
|
||||
if not do_started:
|
||||
task_status.started()
|
||||
|
||||
async with ptl.open_context(
|
||||
sleep_forever,
|
||||
do_started=do_started,
|
||||
) as (ctx, first):
|
||||
task_status.started(ctx)
|
||||
await trio.sleep_forever()
|
||||
|
||||
# XXX, this is the key OoB part!
|
||||
#
|
||||
# - start the `.open_context()` in a bg task which
|
||||
# blocks inside the embedded scope-body,
|
||||
#
|
||||
# - when we call `Context.cancel()` it **is
|
||||
# not** from the same task which eventually runs
|
||||
# `.__aexit__()`,
|
||||
#
|
||||
# - since the bg "opener" task will be in
|
||||
# a `trio.sleep_forever()`, it must be interrupted
|
||||
# by the `ContextCancelled` delivered from the
|
||||
# child-side; `Context._scope: CancelScope` MUST
|
||||
# be `.cancel_called`!
|
||||
#
|
||||
print('ASYNC opening IPC context in subtask..')
|
||||
maybe_ctx: Context|None = await tn.start(partial(
|
||||
_open_ctx_async,
|
||||
))
|
||||
|
||||
if (
|
||||
maybe_ctx
|
||||
and
|
||||
cancel_ctx
|
||||
):
|
||||
print('cancelling first IPC ctx!')
|
||||
await maybe_ctx.cancel()
|
||||
|
||||
# XXX, note that despite `maybe_context.cancel()`
|
||||
# being called above, it's the parent (bg) task
|
||||
# which was originally never interrupted in
|
||||
# the `ctx._scope` body due to missing case logic in
|
||||
# `ctx._maybe_cancel_and_set_remote_error()`.
|
||||
#
|
||||
# It didn't matter that the subactor process was
|
||||
# already terminated and reaped, nothing was
|
||||
# cancelling the ctx-parent task's scope!
|
||||
#
|
||||
print('cancelling subactor!')
|
||||
await ptl.cancel_actor()
|
||||
|
||||
if maybe_ctx:
|
||||
try:
|
||||
await maybe_ctx.wait_for_result()
|
||||
except tractor.ContextCancelled as ctxc:
|
||||
assert not cancel_ctx
|
||||
assert (
|
||||
ctxc.canceller
|
||||
==
|
||||
tractor.current_actor().aid.uid
|
||||
)
|
||||
# don't re-raise since it'll trigger
|
||||
# an EG from the above tn.
|
||||
|
||||
if cancel_ctx:
|
||||
# graceful self-cancel
|
||||
trio.run(main)
|
||||
|
||||
else:
|
||||
# ctx parent task should see OoB ctxc due to
|
||||
# `ptl.cancel_actor()`.
|
||||
with pytest.raises(tractor.ContextCancelled) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
assert 'root' in excinfo.value.canceller[0]
|
||||
|
||||
|
||||
# def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it(
|
||||
# debug_mode: bool,
|
||||
# loglevel: str,
|
||||
# ):
|
||||
# '''
|
||||
# Demos OoB cancellation from the perspective of a ctx opened with
|
||||
# a child subactor where the parent cancels the child at the "actor
|
||||
# layer" using `Portal.cancel_actor()` and thus the
|
||||
# `ContextCancelled.canceller` received by the ctx's parent-side
|
||||
# task will appear to be a "self cancellation" even though that
|
||||
# specific task itself was not cancelled and thus
|
||||
# `Context.cancel_called ==False`.
|
||||
# '''
|
||||
# TODO, do we have an existing implied ctx
|
||||
# cancel test like this?
|
||||
# with trio.move_on_after(0.5):# as cs:
|
||||
# await _open_ctx_async(
|
||||
# do_started=False,
|
||||
# )
|
||||
|
||||
|
||||
# in-line ctx scope should definitely raise
|
||||
# a ctxc with `.canceller = 'root'`
|
||||
# async with ptl.open_context(
|
||||
# sleep_forever,
|
||||
# do_started=True,
|
||||
# ) as pair:
|
||||
|
|
@ -6,11 +6,18 @@ want to see changed.
|
|||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from types import ModuleType
|
||||
|
||||
from functools import partial
|
||||
|
||||
import pytest
|
||||
from _pytest import pathlib
|
||||
from tractor.trionics import collapse_eg
|
||||
import trio
|
||||
from trio import TaskStatus
|
||||
from tractor._testing import (
|
||||
examples_dir,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -106,8 +113,9 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
|||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Demo how a masking `trio.Cancelled` could be handled by unmasking from the
|
||||
`.__context__` field when a user (by accident) re-raises from a `finally:`.
|
||||
Demo how a masking `trio.Cancelled` could be handled by unmasking
|
||||
from the `.__context__` field when a user (by accident) re-raises
|
||||
from a `finally:`.
|
||||
|
||||
'''
|
||||
import tractor
|
||||
|
@ -117,11 +125,9 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
|||
async with (
|
||||
trio.open_nursery() as tn,
|
||||
tractor.trionics.maybe_raise_from_masking_exc(
|
||||
tn=tn,
|
||||
unmask_from=(
|
||||
trio.Cancelled
|
||||
if unmask_from_canc
|
||||
else None
|
||||
(trio.Cancelled,) if unmask_from_canc
|
||||
else ()
|
||||
),
|
||||
)
|
||||
):
|
||||
|
@ -136,8 +142,7 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
|||
with tractor.devx.maybe_open_crash_handler(
|
||||
pdb=debug_mode,
|
||||
) as bxerr:
|
||||
if bxerr:
|
||||
assert not bxerr.value
|
||||
assert not bxerr.value
|
||||
|
||||
async with (
|
||||
wraps_tn_that_always_cancels() as tn,
|
||||
|
@ -145,11 +150,12 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
|||
assert not tn.cancel_scope.cancel_called
|
||||
assert 0
|
||||
|
||||
assert (
|
||||
(err := bxerr.value)
|
||||
and
|
||||
type(err) is AssertionError
|
||||
)
|
||||
if debug_mode:
|
||||
assert (
|
||||
(err := bxerr.value)
|
||||
and
|
||||
type(err) is AssertionError
|
||||
)
|
||||
|
||||
with pytest.raises(ExceptionGroup) as excinfo:
|
||||
trio.run(_main)
|
||||
|
@ -160,13 +166,13 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
|||
assert len(assert_eg.exceptions) == 1
|
||||
|
||||
|
||||
|
||||
def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task
|
||||
will break a strict-eg-tn's multi-cancelled absorption..
|
||||
Demo how a using an `async with sndchan` inside
|
||||
a `.trionics.gather_contexts()` task will break a strict-eg-tn's
|
||||
multi-cancelled absorption..
|
||||
|
||||
'''
|
||||
from tractor import (
|
||||
|
@ -192,7 +198,6 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
|
|||
f'Closed {task!r}\n'
|
||||
)
|
||||
|
||||
|
||||
async def main():
|
||||
async with (
|
||||
# XXX should ensure ONLY the KBI
|
||||
|
@ -213,3 +218,85 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
|
|||
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
trio.run(main)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'raise_unmasked', [
|
||||
True,
|
||||
pytest.param(
|
||||
False,
|
||||
marks=pytest.mark.xfail(
|
||||
reason="see examples/trio/send_chan_aclose_masks.py"
|
||||
)
|
||||
),
|
||||
]
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
'child_errors_mid_stream',
|
||||
[True, False],
|
||||
)
|
||||
def test_unmask_aclose_as_checkpoint_on_aexit(
|
||||
raise_unmasked: bool,
|
||||
child_errors_mid_stream: bool,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Verify that our unmasker util works over the common case where
|
||||
a mem-chan's `.aclose()` is included in an `@acm` stack
|
||||
and it being currently a checkpoint, can `trio.Cancelled`-mask an embedded
|
||||
exception from user code resulting in a silent failure which
|
||||
appears like graceful cancellation.
|
||||
|
||||
This test suite is mostly implemented as an example script so it
|
||||
could more easily be shared with `trio`-core peeps as `tractor`-less
|
||||
minimum reproducing example.
|
||||
|
||||
'''
|
||||
mod: ModuleType = pathlib.import_path(
|
||||
examples_dir()
|
||||
/ 'trio'
|
||||
/ 'send_chan_aclose_masks_beg.py',
|
||||
root=examples_dir(),
|
||||
consider_namespace_packages=False,
|
||||
)
|
||||
with pytest.raises(RuntimeError):
|
||||
trio.run(partial(
|
||||
mod.main,
|
||||
raise_unmasked=raise_unmasked,
|
||||
child_errors_mid_stream=child_errors_mid_stream,
|
||||
))
|
||||
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'ignore_special_cases', [
|
||||
True,
|
||||
pytest.param(
|
||||
False,
|
||||
marks=pytest.mark.xfail(
|
||||
reason="see examples/trio/lockacquire_not_umasked.py"
|
||||
)
|
||||
),
|
||||
]
|
||||
)
|
||||
def test_cancelled_lockacquire_in_ipctx_not_unmasked(
|
||||
ignore_special_cases: bool,
|
||||
loglevel: str,
|
||||
debug_mode: bool,
|
||||
):
|
||||
mod: ModuleType = pathlib.import_path(
|
||||
examples_dir()
|
||||
/ 'trio'
|
||||
/ 'lockacquire_not_unmasked.py',
|
||||
root=examples_dir(),
|
||||
consider_namespace_packages=False,
|
||||
)
|
||||
async def _main():
|
||||
with trio.fail_after(2):
|
||||
await mod.main(
|
||||
ignore_special_cases=ignore_special_cases,
|
||||
loglevel=loglevel,
|
||||
debug_mode=debug_mode,
|
||||
)
|
||||
|
||||
trio.run(_main)
|
||||
|
|
|
@ -442,25 +442,25 @@ class Context:
|
|||
'''
|
||||
Records whether cancellation has been requested for this context
|
||||
by a call to `.cancel()` either due to,
|
||||
- either an explicit call by some local task,
|
||||
- an explicit call by some local task,
|
||||
- or an implicit call due to an error caught inside
|
||||
the ``Portal.open_context()`` block.
|
||||
the `Portal.open_context()` block.
|
||||
|
||||
'''
|
||||
return self._cancel_called
|
||||
|
||||
@cancel_called.setter
|
||||
def cancel_called(self, val: bool) -> None:
|
||||
'''
|
||||
Set the self-cancelled request `bool` value.
|
||||
# XXX, to debug who frickin sets it..
|
||||
# @cancel_called.setter
|
||||
# def cancel_called(self, val: bool) -> None:
|
||||
# '''
|
||||
# Set the self-cancelled request `bool` value.
|
||||
|
||||
'''
|
||||
# to debug who frickin sets it..
|
||||
# if val:
|
||||
# from .devx import pause_from_sync
|
||||
# pause_from_sync()
|
||||
# '''
|
||||
# if val:
|
||||
# from .devx import pause_from_sync
|
||||
# pause_from_sync()
|
||||
|
||||
self._cancel_called = val
|
||||
# self._cancel_called = val
|
||||
|
||||
@property
|
||||
def canceller(self) -> tuple[str, str]|None:
|
||||
|
@ -635,6 +635,71 @@ class Context:
|
|||
'''
|
||||
await self.chan.send(Stop(cid=self.cid))
|
||||
|
||||
@property
|
||||
def parent_task(self) -> trio.Task:
|
||||
'''
|
||||
This IPC context's "owning task" which is a `trio.Task`
|
||||
on one of the "sides" of the IPC.
|
||||
|
||||
Note that the "parent_" prefix here refers to the local
|
||||
`trio` task tree using the same interface as
|
||||
`trio.Nursery.parent_task` whereas for IPC contexts,
|
||||
a different cross-actor task hierarchy exists:
|
||||
|
||||
- a "parent"-side which originally entered
|
||||
`Portal.open_context()`,
|
||||
|
||||
- the "child"-side which was spawned and scheduled to invoke
|
||||
a function decorated with `@tractor.context`.
|
||||
|
||||
This task is thus a handle to mem-domain-distinct/per-process
|
||||
`Nursery.parent_task` depending on in which of the above
|
||||
"sides" this context exists.
|
||||
|
||||
'''
|
||||
return self._task
|
||||
|
||||
def _is_blocked_on_rx_chan(self) -> bool:
|
||||
'''
|
||||
Predicate to indicate whether the owner `._task: trio.Task` is
|
||||
currently blocked (by `.receive()`-ing) on its underlying RPC
|
||||
feeder `._rx_chan`.
|
||||
|
||||
This knowledge is highly useful when handling so called
|
||||
"out-of-band" (OoB) cancellation conditions where a peer
|
||||
actor's task transmitted some remote error/cancel-msg and we
|
||||
must know whether to signal-via-cancel currently executing
|
||||
"user-code" (user defined code embedded in `ctx._scope`) or
|
||||
simply to forward the IPC-msg-as-error **without calling**
|
||||
`._scope.cancel()`.
|
||||
|
||||
In the latter case it is presumed that if the owner task is
|
||||
blocking for the next IPC msg, it will eventually receive,
|
||||
process and raise the equivalent local error **without**
|
||||
requiring `._scope.cancel()` to be explicitly called by the
|
||||
*delivering OoB RPC-task* (via `_deliver_msg()`).
|
||||
|
||||
'''
|
||||
# NOTE, see the mem-chan meth-impls for *why* this
|
||||
# logic works,
|
||||
# `trio._channel.MemoryReceiveChannel.receive[_nowait]()`
|
||||
#
|
||||
# XXX realize that this is NOT an
|
||||
# official/will-be-loudly-deprecated API:
|
||||
# - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data
|
||||
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled
|
||||
#
|
||||
# orig repo intro in the mem-chan change over patch:
|
||||
# - https://github.com/python-trio/trio/pull/586#issuecomment-414039117
|
||||
# |_https://github.com/python-trio/trio/pull/616
|
||||
# |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0
|
||||
#
|
||||
return (
|
||||
self._task.custom_sleep_data
|
||||
is
|
||||
self._rx_chan
|
||||
)
|
||||
|
||||
def _maybe_cancel_and_set_remote_error(
|
||||
self,
|
||||
error: BaseException,
|
||||
|
@ -787,13 +852,27 @@ class Context:
|
|||
if self._canceller is None:
|
||||
log.error('Ctx has no canceller set!?')
|
||||
|
||||
cs: trio.CancelScope = self._scope
|
||||
|
||||
# ?TODO? see comment @ .start_remote_task()`
|
||||
#
|
||||
# if not cs:
|
||||
# from .devx import mk_pdb
|
||||
# mk_pdb().set_trace()
|
||||
# raise RuntimeError(
|
||||
# f'IPC ctx was not be opened prior to remote error delivery !?\n'
|
||||
# f'{self}\n'
|
||||
# f'\n'
|
||||
# f'`Portal.open_context()` must be entered (somewhere) beforehand!\n'
|
||||
# )
|
||||
|
||||
# Cancel the local `._scope`, catch that
|
||||
# `._scope.cancelled_caught` and re-raise any remote error
|
||||
# once exiting (or manually calling `.wait_for_result()`) the
|
||||
# `.open_context()` block.
|
||||
cs: trio.CancelScope = self._scope
|
||||
if (
|
||||
cs
|
||||
and not cs.cancel_called
|
||||
|
||||
# XXX this is an expected cancel request response
|
||||
# message and we **don't need to raise it** in the
|
||||
|
@ -802,8 +881,7 @@ class Context:
|
|||
# if `._cancel_called` then `.cancel_acked and .cancel_called`
|
||||
# always should be set.
|
||||
and not self._is_self_cancelled()
|
||||
and not cs.cancel_called
|
||||
and not cs.cancelled_caught
|
||||
# and not cs.cancelled_caught
|
||||
):
|
||||
if (
|
||||
msgerr
|
||||
|
@ -814,7 +892,7 @@ class Context:
|
|||
not self._cancel_on_msgerr
|
||||
):
|
||||
message: str = (
|
||||
'NOT Cancelling `Context._scope` since,\n'
|
||||
f'NOT Cancelling `Context._scope` since,\n'
|
||||
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
|
||||
f'AND we got a msg-type-error!\n'
|
||||
f'{error}\n'
|
||||
|
@ -824,13 +902,43 @@ class Context:
|
|||
# `trio.Cancelled` subtype here ;)
|
||||
# https://github.com/goodboy/tractor/issues/368
|
||||
message: str = 'Cancelling `Context._scope` !\n\n'
|
||||
# from .devx import pause_from_sync
|
||||
# pause_from_sync()
|
||||
self._scope.cancel()
|
||||
else:
|
||||
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
||||
cs.cancel()
|
||||
|
||||
# TODO, explicit condition for OoB (self-)cancellation?
|
||||
# - we called `Portal.cancel_actor()` from this actor
|
||||
# and the peer ctx task delivered ctxc due to it.
|
||||
# - currently `self._is_self_cancelled()` will be true
|
||||
# since the ctxc.canceller check will match us even though it
|
||||
# wasn't from this ctx specifically!
|
||||
elif (
|
||||
cs
|
||||
and self._is_self_cancelled()
|
||||
and not cs.cancel_called
|
||||
):
|
||||
message: str = (
|
||||
'Cancelling `ctx._scope` due to OoB self-cancel ?!\n'
|
||||
'\n'
|
||||
)
|
||||
# from .devx import mk_pdb
|
||||
# mk_pdb().set_trace()
|
||||
# TODO XXX, required to fix timeout failure in
|
||||
# `test_cancelled_lockacquire_in_ipctx_not_unmaskeed`
|
||||
#
|
||||
|
||||
# XXX NOTE XXX, this is SUPER SUBTLE!
|
||||
# we only want to cancel our embedded `._scope`
|
||||
# if the ctx's current/using task is NOT blocked
|
||||
# on `._rx_chan.receive()` and on some other
|
||||
# `trio`-checkpoint since in the former case
|
||||
# any `._remote_error` will be relayed through
|
||||
# the rx-chan and appropriately raised by the owning
|
||||
# `._task` directly. IF the owner task is however
|
||||
# blocking elsewhere we need to interrupt it **now**.
|
||||
if not self._is_blocked_on_rx_chan():
|
||||
cs.cancel()
|
||||
else:
|
||||
# rx_stats = self._rx_chan.statistics()
|
||||
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
||||
|
||||
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
|
||||
if (
|
||||
|
@ -854,6 +962,7 @@ class Context:
|
|||
+
|
||||
cs_fmt
|
||||
)
|
||||
|
||||
log.cancel(
|
||||
message
|
||||
+
|
||||
|
@ -946,8 +1055,9 @@ class Context:
|
|||
|
||||
'''
|
||||
side: str = self.side
|
||||
# XXX for debug via the `@.setter`
|
||||
self.cancel_called = True
|
||||
self._cancel_called = True
|
||||
# ^ XXX for debug via the `@.setter`
|
||||
# self.cancel_called = True
|
||||
|
||||
header: str = (
|
||||
f'Cancelling ctx from {side!r}-side\n'
|
||||
|
@ -2011,6 +2121,9 @@ async def open_context_from_portal(
|
|||
f'|_{portal.actor}\n'
|
||||
)
|
||||
|
||||
# ?TODO? could we move this to inside the `tn` block?
|
||||
# -> would allow doing `ctx.parent_task = tn.parent_task` ?
|
||||
# -> would allow a `if not ._scope: => raise RTE` ?
|
||||
ctx: Context = await portal.actor.start_remote_task(
|
||||
portal.channel,
|
||||
nsf=nsf,
|
||||
|
@ -2037,6 +2150,7 @@ async def open_context_from_portal(
|
|||
scope_err: BaseException|None = None
|
||||
ctxc_from_child: ContextCancelled|None = None
|
||||
try:
|
||||
# from .devx import pause
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
|
@ -2059,6 +2173,10 @@ async def open_context_from_portal(
|
|||
# the dialog, the `Error` msg should be raised from the `msg`
|
||||
# handling block below.
|
||||
try:
|
||||
log.runtime(
|
||||
f'IPC ctx parent waiting on Started msg..\n'
|
||||
f'ctx.cid: {ctx.cid!r}\n'
|
||||
)
|
||||
started_msg, first = await ctx._pld_rx.recv_msg(
|
||||
ipc=ctx,
|
||||
expect_msg=Started,
|
||||
|
@ -2067,16 +2185,16 @@ async def open_context_from_portal(
|
|||
)
|
||||
except trio.Cancelled as taskc:
|
||||
ctx_cs: trio.CancelScope = ctx._scope
|
||||
log.cancel(
|
||||
f'IPC ctx was cancelled during "child" task sync due to\n\n'
|
||||
f'.cid: {ctx.cid!r}\n'
|
||||
f'.maybe_error: {ctx.maybe_error!r}\n'
|
||||
)
|
||||
# await pause(shield=True)
|
||||
|
||||
if not ctx_cs.cancel_called:
|
||||
raise
|
||||
|
||||
# from .devx import pause
|
||||
# await pause(shield=True)
|
||||
|
||||
log.cancel(
|
||||
'IPC ctx was cancelled during "child" task sync due to\n\n'
|
||||
f'{ctx.maybe_error}\n'
|
||||
)
|
||||
# OW if the ctx's scope was cancelled manually,
|
||||
# likely the `Context` was cancelled via a call to
|
||||
# `._maybe_cancel_and_set_remote_error()` so ensure
|
||||
|
@ -2199,7 +2317,7 @@ async def open_context_from_portal(
|
|||
# documenting it as a definittive example of
|
||||
# debugging the tractor-runtime itself using it's
|
||||
# own `.devx.` tooling!
|
||||
#
|
||||
#
|
||||
# await debug.pause()
|
||||
|
||||
# CASE 2: context was cancelled by local task calling
|
||||
|
@ -2272,13 +2390,16 @@ async def open_context_from_portal(
|
|||
match scope_err:
|
||||
case trio.Cancelled():
|
||||
logmeth = log.cancel
|
||||
cause: str = 'cancelled'
|
||||
|
||||
# XXX explicitly report on any non-graceful-taskc cases
|
||||
case _:
|
||||
cause: str = 'errored'
|
||||
logmeth = log.exception
|
||||
|
||||
logmeth(
|
||||
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
|
||||
f'ctx {ctx.side!r}-side {cause!r} with,\n'
|
||||
f'{ctx.repr_outcome()!r}\n'
|
||||
)
|
||||
|
||||
if debug_mode():
|
||||
|
@ -2303,6 +2424,7 @@ async def open_context_from_portal(
|
|||
# told us it's cancelled ;p
|
||||
if ctxc_from_child is None:
|
||||
try:
|
||||
# await pause(shield=True)
|
||||
await ctx.cancel()
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
|
@ -2459,8 +2581,10 @@ async def open_context_from_portal(
|
|||
log.cancel(
|
||||
f'Context cancelled by local {ctx.side!r}-side task\n'
|
||||
f'c)>\n'
|
||||
f' |_{ctx._task}\n\n'
|
||||
f'{repr(scope_err)}\n'
|
||||
f' |_{ctx.parent_task}\n'
|
||||
f' .cid={ctx.cid!r}\n'
|
||||
f'\n'
|
||||
f'{scope_err!r}\n'
|
||||
)
|
||||
|
||||
# TODO: should we add a `._cancel_req_received`
|
||||
|
|
|
@ -654,8 +654,7 @@ async def _invoke(
|
|||
# scope ensures unasking of the `await coro` below
|
||||
# *should* never be interfered with!!
|
||||
maybe_raise_from_masking_exc(
|
||||
tn=tn,
|
||||
unmask_from=Cancelled,
|
||||
unmask_from=(Cancelled,),
|
||||
) as _mbme, # maybe boxed masked exc
|
||||
):
|
||||
ctx._scope_nursery = tn
|
||||
|
|
|
@ -446,12 +446,12 @@ class ActorNursery:
|
|||
@acm
|
||||
async def _open_and_supervise_one_cancels_all_nursery(
|
||||
actor: Actor,
|
||||
tb_hide: bool = False,
|
||||
hide_tb: bool = True,
|
||||
|
||||
) -> typing.AsyncGenerator[ActorNursery, None]:
|
||||
|
||||
# normally don't need to show user by default
|
||||
__tracebackhide__: bool = tb_hide
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
outer_err: BaseException|None = None
|
||||
inner_err: BaseException|None = None
|
||||
|
|
|
@ -613,10 +613,9 @@ async def drain_to_final_msg(
|
|||
# msg: dict = await ctx._rx_chan.receive()
|
||||
# if res_cs.cancelled_caught:
|
||||
#
|
||||
# -[ ] make sure pause points work here for REPLing
|
||||
# -[x] make sure pause points work here for REPLing
|
||||
# the runtime itself; i.e. ensure there's no hangs!
|
||||
# |_from tractor.devx.debug import pause
|
||||
# await pause()
|
||||
# |_see masked code below in .cancel_called path
|
||||
|
||||
# NOTE: we get here if the far end was
|
||||
# `ContextCancelled` in 2 cases:
|
||||
|
@ -652,6 +651,10 @@ async def drain_to_final_msg(
|
|||
f'IPC ctx cancelled externally during result drain ?\n'
|
||||
f'{ctx}'
|
||||
)
|
||||
# XXX, for tracing `Cancelled`..
|
||||
# from tractor.devx.debug import pause
|
||||
# await pause(shield=True)
|
||||
|
||||
# CASE 2: mask the local cancelled-error(s)
|
||||
# only when we are sure the remote error is
|
||||
# the source cause of this local task's
|
||||
|
|
|
@ -30,7 +30,6 @@ from typing import (
|
|||
AsyncIterator,
|
||||
Callable,
|
||||
Hashable,
|
||||
Optional,
|
||||
Sequence,
|
||||
TypeVar,
|
||||
)
|
||||
|
@ -176,7 +175,7 @@ class _Cache:
|
|||
a kept-alive-while-in-use async resource.
|
||||
|
||||
'''
|
||||
service_tn: Optional[trio.Nursery] = None
|
||||
service_tn: trio.Nursery|None = None
|
||||
locks: dict[Hashable, trio.Lock] = {}
|
||||
users: int = 0
|
||||
values: dict[Any, Any] = {}
|
||||
|
@ -185,7 +184,7 @@ class _Cache:
|
|||
tuple[trio.Nursery, trio.Event]
|
||||
] = {}
|
||||
# nurseries: dict[int, trio.Nursery] = {}
|
||||
no_more_users: Optional[trio.Event] = None
|
||||
no_more_users: trio.Event|None = None
|
||||
|
||||
@classmethod
|
||||
async def run_ctx(
|
||||
|
@ -195,16 +194,18 @@ class _Cache:
|
|||
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
async with mng as value:
|
||||
_, no_more_users = cls.resources[ctx_key]
|
||||
cls.values[ctx_key] = value
|
||||
task_status.started(value)
|
||||
try:
|
||||
await no_more_users.wait()
|
||||
finally:
|
||||
# discard nursery ref so it won't be re-used (an error)?
|
||||
value = cls.values.pop(ctx_key)
|
||||
cls.resources.pop(ctx_key)
|
||||
try:
|
||||
async with mng as value:
|
||||
_, no_more_users = cls.resources[ctx_key]
|
||||
try:
|
||||
cls.values[ctx_key] = value
|
||||
task_status.started(value)
|
||||
await no_more_users.wait()
|
||||
finally:
|
||||
value = cls.values.pop(ctx_key)
|
||||
finally:
|
||||
# discard nursery ref so it won't be re-used (an error)?
|
||||
cls.resources.pop(ctx_key)
|
||||
|
||||
|
||||
@acm
|
||||
|
|
|
@ -22,7 +22,14 @@ from __future__ import annotations
|
|||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from typing import TYPE_CHECKING
|
||||
import inspect
|
||||
from types import (
|
||||
TracebackType,
|
||||
)
|
||||
from typing import (
|
||||
Type,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
import trio
|
||||
from tractor.log import get_logger
|
||||
|
@ -60,12 +67,71 @@ def find_masked_excs(
|
|||
return None
|
||||
|
||||
|
||||
_mask_cases: dict[
|
||||
Type[Exception], # masked exc type
|
||||
dict[
|
||||
int, # inner-frame index into `inspect.getinnerframes()`
|
||||
# `FrameInfo.function/filename: str`s to match
|
||||
dict[str, str],
|
||||
],
|
||||
] = {
|
||||
trio.WouldBlock: {
|
||||
# `trio.Lock.acquire()` has a checkpoint inside the
|
||||
# `WouldBlock`-no_wait path's handler..
|
||||
-5: { # "5th frame up" from checkpoint
|
||||
'filename': 'trio/_sync.py',
|
||||
'function': 'acquire',
|
||||
# 'lineno': 605, # matters?
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def is_expected_masking_case(
|
||||
cases: dict,
|
||||
exc_ctx: Exception,
|
||||
exc_match: BaseException,
|
||||
|
||||
) -> bool|inspect.FrameInfo:
|
||||
'''
|
||||
Determine whether the provided masked exception is from a known
|
||||
bug/special/unintentional-`trio`-impl case which we do not wish
|
||||
to unmask.
|
||||
|
||||
Return any guilty `inspect.FrameInfo` ow `False`.
|
||||
|
||||
'''
|
||||
exc_tb: TracebackType = exc_match.__traceback__
|
||||
if cases := _mask_cases.get(type(exc_ctx)):
|
||||
inner: list[inspect.FrameInfo] = inspect.getinnerframes(exc_tb)
|
||||
|
||||
# from tractor.devx.debug import mk_pdb
|
||||
# mk_pdb().set_trace()
|
||||
for iframe, matchon in cases.items():
|
||||
try:
|
||||
masker_frame: inspect.FrameInfo = inner[iframe]
|
||||
except IndexError:
|
||||
continue
|
||||
|
||||
for field, in_field in matchon.items():
|
||||
val = getattr(
|
||||
masker_frame,
|
||||
field,
|
||||
)
|
||||
if in_field not in val:
|
||||
break
|
||||
else:
|
||||
return masker_frame
|
||||
|
||||
return False
|
||||
|
||||
|
||||
|
||||
# XXX, relevant discussion @ `trio`-core,
|
||||
# https://github.com/python-trio/trio/issues/455
|
||||
#
|
||||
@acm
|
||||
async def maybe_raise_from_masking_exc(
|
||||
tn: trio.Nursery|None = None,
|
||||
unmask_from: (
|
||||
BaseException|
|
||||
tuple[BaseException]
|
||||
|
@ -74,18 +140,30 @@ async def maybe_raise_from_masking_exc(
|
|||
raise_unmasked: bool = True,
|
||||
extra_note: str = (
|
||||
'This can occurr when,\n'
|
||||
' - a `trio.Nursery` scope embeds a `finally:`-block '
|
||||
'which executes a checkpoint!'
|
||||
'\n'
|
||||
' - a `trio.Nursery/CancelScope` embeds a `finally/except:`-block '
|
||||
'which execs an un-shielded checkpoint!'
|
||||
#
|
||||
# ^TODO? other cases?
|
||||
),
|
||||
|
||||
always_warn_on: tuple[BaseException] = (
|
||||
always_warn_on: tuple[Type[BaseException]] = (
|
||||
trio.Cancelled,
|
||||
),
|
||||
|
||||
# don't ever unmask or warn on any masking pair,
|
||||
# {<masked-excT-key> -> <masking-excT-value>}
|
||||
never_warn_on: dict[
|
||||
Type[BaseException],
|
||||
Type[BaseException],
|
||||
] = {
|
||||
KeyboardInterrupt: trio.Cancelled,
|
||||
trio.Cancelled: trio.Cancelled,
|
||||
},
|
||||
# ^XXX, special case(s) where we warn-log bc likely
|
||||
# there will be no operational diff since the exc
|
||||
# is always expected to be consumed.
|
||||
|
||||
) -> BoxedMaybeException:
|
||||
'''
|
||||
Maybe un-mask and re-raise exception(s) suppressed by a known
|
||||
|
@ -104,81 +182,112 @@ async def maybe_raise_from_masking_exc(
|
|||
individual sub-excs but maintain the eg-parent's form right?
|
||||
|
||||
'''
|
||||
if not isinstance(unmask_from, tuple):
|
||||
raise ValueError(
|
||||
f'Invalid unmask_from = {unmask_from!r}\n'
|
||||
f'Must be a `tuple[Type[BaseException]]`.\n'
|
||||
)
|
||||
|
||||
from tractor.devx.debug import (
|
||||
BoxedMaybeException,
|
||||
pause,
|
||||
)
|
||||
boxed_maybe_exc = BoxedMaybeException(
|
||||
raise_on_exit=raise_unmasked,
|
||||
)
|
||||
matching: list[BaseException]|None = None
|
||||
maybe_eg: ExceptionGroup|None
|
||||
|
||||
if tn:
|
||||
try: # handle egs
|
||||
yield boxed_maybe_exc
|
||||
return
|
||||
except* unmask_from as _maybe_eg:
|
||||
maybe_eg = _maybe_eg
|
||||
try:
|
||||
yield boxed_maybe_exc
|
||||
return
|
||||
except BaseException as _bexc:
|
||||
bexc = _bexc
|
||||
if isinstance(bexc, BaseExceptionGroup):
|
||||
matches: ExceptionGroup
|
||||
matches, _ = maybe_eg.split(
|
||||
unmask_from
|
||||
)
|
||||
if not matches:
|
||||
raise
|
||||
matches, _ = bexc.split(unmask_from)
|
||||
if matches:
|
||||
matching = matches.exceptions
|
||||
|
||||
matching: list[BaseException] = matches.exceptions
|
||||
else:
|
||||
try: # handle non-egs
|
||||
yield boxed_maybe_exc
|
||||
return
|
||||
except unmask_from as _maybe_exc:
|
||||
maybe_exc = _maybe_exc
|
||||
matching: list[BaseException] = [
|
||||
maybe_exc
|
||||
]
|
||||
|
||||
# XXX, only unmask-ed for debuggin!
|
||||
# TODO, remove eventually..
|
||||
except BaseException as _berr:
|
||||
berr = _berr
|
||||
await pause(shield=True)
|
||||
raise berr
|
||||
elif (
|
||||
unmask_from
|
||||
and
|
||||
type(bexc) in unmask_from
|
||||
):
|
||||
matching = [bexc]
|
||||
|
||||
if matching is None:
|
||||
raise
|
||||
|
||||
masked: list[tuple[BaseException, BaseException]] = []
|
||||
for exc_match in matching:
|
||||
|
||||
if exc_ctx := find_masked_excs(
|
||||
maybe_masker=exc_match,
|
||||
unmask_from={unmask_from},
|
||||
unmask_from=set(unmask_from),
|
||||
):
|
||||
masked.append((exc_ctx, exc_match))
|
||||
masked.append((
|
||||
exc_ctx,
|
||||
exc_match,
|
||||
))
|
||||
boxed_maybe_exc.value = exc_match
|
||||
note: str = (
|
||||
f'\n'
|
||||
f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n'
|
||||
f'^^WARNING^^\n'
|
||||
f'the above {type(exc_ctx)!r} was masked by a {type(exc_match)!r}\n'
|
||||
)
|
||||
if extra_note:
|
||||
note += (
|
||||
f'\n'
|
||||
f'{extra_note}\n'
|
||||
)
|
||||
exc_ctx.add_note(note)
|
||||
|
||||
if type(exc_match) in always_warn_on:
|
||||
do_warn: bool = (
|
||||
never_warn_on.get(
|
||||
type(exc_ctx) # masking type
|
||||
)
|
||||
is not
|
||||
type(exc_match) # masked type
|
||||
)
|
||||
|
||||
if do_warn:
|
||||
exc_ctx.add_note(note)
|
||||
|
||||
if (
|
||||
do_warn
|
||||
and
|
||||
type(exc_match) in always_warn_on
|
||||
):
|
||||
log.warning(note)
|
||||
|
||||
# await tractor.pause(shield=True)
|
||||
if raise_unmasked:
|
||||
|
||||
if (
|
||||
do_warn
|
||||
and
|
||||
raise_unmasked
|
||||
):
|
||||
if len(masked) < 2:
|
||||
# don't unmask already known "special" cases..
|
||||
if (
|
||||
_mask_cases
|
||||
and
|
||||
(cases := _mask_cases.get(type(exc_ctx)))
|
||||
and
|
||||
(masker_frame := is_expected_masking_case(
|
||||
cases,
|
||||
exc_ctx,
|
||||
exc_match,
|
||||
))
|
||||
):
|
||||
log.warning(
|
||||
f'Ignoring already-known, non-ideal-but-valid '
|
||||
f'masker code @\n'
|
||||
f'{masker_frame}\n'
|
||||
f'\n'
|
||||
f'NOT raising {exc_ctx} from masker {exc_match!r}\n'
|
||||
)
|
||||
raise exc_match
|
||||
|
||||
raise exc_ctx from exc_match
|
||||
else:
|
||||
# ?TODO, see above but, possibly unmasking sub-exc
|
||||
# entries if there are > 1
|
||||
await pause(shield=True)
|
||||
|
||||
# ??TODO, see above but, possibly unmasking sub-exc
|
||||
# entries if there are > 1
|
||||
# else:
|
||||
# await pause(shield=True)
|
||||
else:
|
||||
raise
|
||||
|
|
Loading…
Reference in New Issue