Compare commits
27 Commits
b8019f90ec
...
80f822f494
Author | SHA1 | Date |
---|---|---|
|
80f822f494 | |
|
4fbe54991e | |
|
4b4e5df2b7 | |
|
f0adb0fb54 | |
|
769b061a67 | |
|
06b5e19cc4 | |
|
28d6f77e22 | |
|
2dc13a3304 | |
|
83ce2275b9 | |
|
9f757ffa63 | |
|
0c6d512ba4 | |
|
fc130d06b8 | |
|
73423ef2b7 | |
|
b1f2a6b394 | |
|
9489a2f84d | |
|
92eaed6fec | |
|
217d54b9d1 | |
|
34ca02ed11 | |
|
62a364a1d3 | |
|
07781e38cd | |
|
9c6b90ef04 | |
|
542d4c7840 | |
|
9aebe7d8f9 | |
|
04c3d5e239 | |
|
759174729c | |
|
e9f3689191 | |
|
93aa39db07 |
|
@ -21,12 +21,12 @@ async def breakpoint_forever():
|
||||||
async def spawn_until(depth=0):
|
async def spawn_until(depth=0):
|
||||||
""""A nested nursery that triggers another ``NameError``.
|
""""A nested nursery that triggers another ``NameError``.
|
||||||
"""
|
"""
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as an:
|
||||||
if depth < 1:
|
if depth < 1:
|
||||||
|
|
||||||
await n.run_in_actor(breakpoint_forever)
|
await an.run_in_actor(breakpoint_forever)
|
||||||
|
|
||||||
p = await n.run_in_actor(
|
p = await an.run_in_actor(
|
||||||
name_error,
|
name_error,
|
||||||
name='name_error'
|
name='name_error'
|
||||||
)
|
)
|
||||||
|
@ -38,7 +38,7 @@ async def spawn_until(depth=0):
|
||||||
# recusrive call to spawn another process branching layer of
|
# recusrive call to spawn another process branching layer of
|
||||||
# the tree
|
# the tree
|
||||||
depth -= 1
|
depth -= 1
|
||||||
await n.run_in_actor(
|
await an.run_in_actor(
|
||||||
spawn_until,
|
spawn_until,
|
||||||
depth=depth,
|
depth=depth,
|
||||||
name=f'spawn_until_{depth}',
|
name=f'spawn_until_{depth}',
|
||||||
|
|
|
@ -0,0 +1,85 @@
|
||||||
|
from contextlib import (
|
||||||
|
asynccontextmanager as acm,
|
||||||
|
)
|
||||||
|
from functools import partial
|
||||||
|
|
||||||
|
import tractor
|
||||||
|
import trio
|
||||||
|
|
||||||
|
|
||||||
|
log = tractor.log.get_logger(
|
||||||
|
name=__name__
|
||||||
|
)
|
||||||
|
|
||||||
|
_lock: trio.Lock|None = None
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def acquire_singleton_lock(
|
||||||
|
) -> None:
|
||||||
|
global _lock
|
||||||
|
if _lock is None:
|
||||||
|
log.info('Allocating LOCK')
|
||||||
|
_lock = trio.Lock()
|
||||||
|
|
||||||
|
log.info('TRYING TO LOCK ACQUIRE')
|
||||||
|
async with _lock:
|
||||||
|
log.info('ACQUIRED')
|
||||||
|
yield _lock
|
||||||
|
|
||||||
|
log.info('RELEASED')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def hold_lock_forever(
|
||||||
|
task_status=trio.TASK_STATUS_IGNORED
|
||||||
|
):
|
||||||
|
async with (
|
||||||
|
tractor.trionics.maybe_raise_from_masking_exc(),
|
||||||
|
acquire_singleton_lock() as lock,
|
||||||
|
):
|
||||||
|
task_status.started(lock)
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
async def main(
|
||||||
|
ignore_special_cases: bool,
|
||||||
|
loglevel: str = 'info',
|
||||||
|
debug_mode: bool = True,
|
||||||
|
):
|
||||||
|
async with (
|
||||||
|
trio.open_nursery() as tn,
|
||||||
|
|
||||||
|
# tractor.trionics.maybe_raise_from_masking_exc()
|
||||||
|
# ^^^ XXX NOTE, interestingly putting the unmasker
|
||||||
|
# here does not exhibit the same behaviour ??
|
||||||
|
):
|
||||||
|
if not ignore_special_cases:
|
||||||
|
from tractor.trionics import _taskc
|
||||||
|
_taskc._mask_cases.clear()
|
||||||
|
|
||||||
|
_lock = await tn.start(
|
||||||
|
hold_lock_forever,
|
||||||
|
)
|
||||||
|
with trio.move_on_after(0.2):
|
||||||
|
await tn.start(
|
||||||
|
hold_lock_forever,
|
||||||
|
)
|
||||||
|
|
||||||
|
tn.cancel_scope.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
# XXX, manual test as script
|
||||||
|
if __name__ == '__main__':
|
||||||
|
tractor.log.get_console_log(level='info')
|
||||||
|
for case in [True, False]:
|
||||||
|
log.info(
|
||||||
|
f'\n'
|
||||||
|
f'------ RUNNING SCRIPT TRIAL ------\n'
|
||||||
|
f'ignore_special_cases: {case!r}\n'
|
||||||
|
)
|
||||||
|
trio.run(partial(
|
||||||
|
main,
|
||||||
|
ignore_special_cases=case,
|
||||||
|
loglevel='info',
|
||||||
|
))
|
|
@ -9,8 +9,10 @@ import tractor
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
|
||||||
log = tractor.log.get_logger(__name__)
|
log = tractor.log.get_logger(
|
||||||
tractor.log.get_console_log('info')
|
name=__name__
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@cm
|
@cm
|
||||||
def teardown_on_exc(
|
def teardown_on_exc(
|
||||||
|
@ -54,6 +56,7 @@ def teardown_on_exc(
|
||||||
async def finite_stream_to_rent(
|
async def finite_stream_to_rent(
|
||||||
tx: trio.abc.SendChannel,
|
tx: trio.abc.SendChannel,
|
||||||
child_errors_mid_stream: bool,
|
child_errors_mid_stream: bool,
|
||||||
|
raise_unmasked: bool,
|
||||||
|
|
||||||
task_status: trio.TaskStatus[
|
task_status: trio.TaskStatus[
|
||||||
trio.CancelScope,
|
trio.CancelScope,
|
||||||
|
@ -68,20 +71,41 @@ async def finite_stream_to_rent(
|
||||||
# inside the child task!
|
# inside the child task!
|
||||||
#
|
#
|
||||||
# TODO, uncomment next LoC to see the supprsessed beg[RTE]!
|
# TODO, uncomment next LoC to see the supprsessed beg[RTE]!
|
||||||
# tractor.trionics.maybe_raise_from_masking_exc(),
|
tractor.trionics.maybe_raise_from_masking_exc(
|
||||||
|
raise_unmasked=raise_unmasked,
|
||||||
|
),
|
||||||
|
|
||||||
tx as tx, # .aclose() is the guilty masker chkpt!
|
tx as tx, # .aclose() is the guilty masker chkpt!
|
||||||
trio.open_nursery() as _tn,
|
|
||||||
|
# XXX, this ONLY matters in the
|
||||||
|
# `child_errors_mid_stream=False` case oddly!?
|
||||||
|
# THAT IS, if no tn is opened in that case then the
|
||||||
|
# test will not fail; it raises the RTE correctly?
|
||||||
|
#
|
||||||
|
# -> so it seems this new scope somehow affects the form of
|
||||||
|
# eventual in the parent EG?
|
||||||
|
tractor.trionics.maybe_open_nursery(
|
||||||
|
nursery=(
|
||||||
|
None
|
||||||
|
if not child_errors_mid_stream
|
||||||
|
else True
|
||||||
|
),
|
||||||
|
) as _tn,
|
||||||
):
|
):
|
||||||
# pass our scope back to parent for supervision\
|
# pass our scope back to parent for supervision\
|
||||||
# control.
|
# control.
|
||||||
task_status.started(_tn.cancel_scope)
|
cs: trio.CancelScope|None = (
|
||||||
|
None
|
||||||
|
if _tn is True
|
||||||
|
else _tn.cancel_scope
|
||||||
|
)
|
||||||
|
task_status.started(cs)
|
||||||
|
|
||||||
with teardown_on_exc(
|
with teardown_on_exc(
|
||||||
raise_from_handler=not child_errors_mid_stream,
|
raise_from_handler=not child_errors_mid_stream,
|
||||||
):
|
):
|
||||||
for i in range(100):
|
for i in range(100):
|
||||||
log.info(
|
log.debug(
|
||||||
f'Child tx {i!r}\n'
|
f'Child tx {i!r}\n'
|
||||||
)
|
)
|
||||||
if (
|
if (
|
||||||
|
@ -107,23 +131,31 @@ async def main(
|
||||||
# bug and raises.
|
# bug and raises.
|
||||||
#
|
#
|
||||||
child_errors_mid_stream: bool,
|
child_errors_mid_stream: bool,
|
||||||
|
|
||||||
|
raise_unmasked: bool = False,
|
||||||
|
loglevel: str = 'info',
|
||||||
):
|
):
|
||||||
|
tractor.log.get_console_log(level=loglevel)
|
||||||
|
|
||||||
|
# the `.aclose()` being checkpoints on these
|
||||||
|
# is the source of the problem..
|
||||||
tx, rx = trio.open_memory_channel(1)
|
tx, rx = trio.open_memory_channel(1)
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
|
tractor.trionics.collapse_eg(),
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
rx as rx,
|
rx as rx,
|
||||||
):
|
):
|
||||||
|
|
||||||
_child_cs = await tn.start(
|
_child_cs = await tn.start(
|
||||||
partial(
|
partial(
|
||||||
finite_stream_to_rent,
|
finite_stream_to_rent,
|
||||||
child_errors_mid_stream=child_errors_mid_stream,
|
child_errors_mid_stream=child_errors_mid_stream,
|
||||||
|
raise_unmasked=raise_unmasked,
|
||||||
tx=tx,
|
tx=tx,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
async for msg in rx:
|
async for msg in rx:
|
||||||
log.info(
|
log.debug(
|
||||||
f'Rent rx {msg!r}\n'
|
f'Rent rx {msg!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -139,7 +171,25 @@ async def main(
|
||||||
tn.cancel_scope.cancel()
|
tn.cancel_scope.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
# XXX, manual test as script
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
tractor.log.get_console_log(level='info')
|
||||||
for case in [True, False]:
|
for case in [True, False]:
|
||||||
trio.run(main, case)
|
log.info(
|
||||||
|
f'\n'
|
||||||
|
f'------ RUNNING SCRIPT TRIAL ------\n'
|
||||||
|
f'child_errors_midstream: {case!r}\n'
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
trio.run(partial(
|
||||||
|
main,
|
||||||
|
child_errors_mid_stream=case,
|
||||||
|
# raise_unmasked=True,
|
||||||
|
loglevel='info',
|
||||||
|
))
|
||||||
|
except Exception as _exc:
|
||||||
|
exc = _exc
|
||||||
|
log.exception(
|
||||||
|
'Should have raised an RTE or Cancelled?\n'
|
||||||
|
)
|
||||||
|
breakpoint()
|
||||||
|
|
|
@ -709,10 +709,41 @@ def test_multi_nested_subactors_error_through_nurseries(
|
||||||
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
||||||
|
|
||||||
# timed_out_early: bool = False
|
# timed_out_early: bool = False
|
||||||
|
at_least_one: list[str] = [
|
||||||
|
"bdb.BdbQuit",
|
||||||
|
|
||||||
for send_char in itertools.cycle(['c', 'q']):
|
# leaf subs, which actually raise in "user code"
|
||||||
|
"src_uid=('breakpoint_forever'",
|
||||||
|
"src_uid=('name_error'",
|
||||||
|
|
||||||
|
# 2nd layer subs
|
||||||
|
"src_uid=('spawn_until_1'",
|
||||||
|
"src_uid=('spawn_until_2'",
|
||||||
|
"src_uid=('spawn_until_3'",
|
||||||
|
"relay_uid=('spawn_until_0'",
|
||||||
|
|
||||||
|
# 1st layer subs
|
||||||
|
"src_uid=('spawner0'",
|
||||||
|
"src_uid=('spawner1'",
|
||||||
|
]
|
||||||
|
|
||||||
|
for i, send_char in enumerate(
|
||||||
|
itertools.cycle(['c', 'q'])
|
||||||
|
):
|
||||||
try:
|
try:
|
||||||
child.expect(PROMPT)
|
child.expect(PROMPT)
|
||||||
|
|
||||||
|
for patt in at_least_one.copy():
|
||||||
|
if in_prompt_msg(
|
||||||
|
child,
|
||||||
|
[patt],
|
||||||
|
):
|
||||||
|
print(
|
||||||
|
f'Found patt in prompt {i}\n'
|
||||||
|
f'patt: {patt!r}\n'
|
||||||
|
)
|
||||||
|
at_least_one.remove(patt)
|
||||||
|
|
||||||
child.sendline(send_char)
|
child.sendline(send_char)
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
@ -721,27 +752,15 @@ def test_multi_nested_subactors_error_through_nurseries(
|
||||||
|
|
||||||
assert_before(
|
assert_before(
|
||||||
child,
|
child,
|
||||||
[ # boxed source errors
|
[
|
||||||
"NameError: name 'doggypants' is not defined",
|
# boxed source errors should show in final
|
||||||
|
# post-prompt tb to console.
|
||||||
"tractor._exceptions.RemoteActorError:",
|
"tractor._exceptions.RemoteActorError:",
|
||||||
"('name_error'",
|
"NameError: name 'doggypants' is not defined",
|
||||||
"bdb.BdbQuit",
|
|
||||||
|
|
||||||
# first level subtrees
|
# TODO? once we get more pedantic with `relay_uid` should
|
||||||
# "tractor._exceptions.RemoteActorError: ('spawner0'",
|
# prolly include all actor-IDs we expect to see in final
|
||||||
"src_uid=('spawner0'",
|
# tb?
|
||||||
|
|
||||||
# "tractor._exceptions.RemoteActorError: ('spawner1'",
|
|
||||||
|
|
||||||
# propagation of errors up through nested subtrees
|
|
||||||
# "tractor._exceptions.RemoteActorError: ('spawn_until_0'",
|
|
||||||
# "tractor._exceptions.RemoteActorError: ('spawn_until_1'",
|
|
||||||
# "tractor._exceptions.RemoteActorError: ('spawn_until_2'",
|
|
||||||
# ^-NOTE-^ old RAE repr, new one is below with a field
|
|
||||||
# showing the src actor's uid.
|
|
||||||
"src_uid=('spawn_until_0'",
|
|
||||||
"relay_uid=('spawn_until_1'",
|
|
||||||
"src_uid=('spawn_until_2'",
|
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,98 @@
|
||||||
|
'''
|
||||||
|
Basic `ActorNursery` operations and closure semantics,
|
||||||
|
- basic remote error collection,
|
||||||
|
- basic multi-subactor cancellation.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# import os
|
||||||
|
# import signal
|
||||||
|
# import platform
|
||||||
|
# import time
|
||||||
|
# from itertools import repeat
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
from tractor._exceptions import ActorCancelled
|
||||||
|
# from tractor._testing import (
|
||||||
|
# tractor_test,
|
||||||
|
# )
|
||||||
|
# from .conftest import no_windows
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'num_subs',
|
||||||
|
[
|
||||||
|
1,
|
||||||
|
3,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
def test_one_cancels_all(
|
||||||
|
start_method: str,
|
||||||
|
loglevel: str,
|
||||||
|
debug_mode: bool,
|
||||||
|
num_subs: int,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify that ifa a single error bubbles to the an-scope the
|
||||||
|
nursery will be cancelled (just like in `trio`); this is a
|
||||||
|
one-cancels-all style strategy and are only supervision policy
|
||||||
|
at the moment.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async def main():
|
||||||
|
try:
|
||||||
|
rte = RuntimeError('Uh oh something bad in parent')
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
start_method=start_method,
|
||||||
|
loglevel=loglevel,
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
) as an:
|
||||||
|
|
||||||
|
# spawn the same number of deamon actors which should be cancelled
|
||||||
|
dactor_portals = []
|
||||||
|
for i in range(num_subs):
|
||||||
|
name: str= f'sub_{i}'
|
||||||
|
ptl: tractor.Portal = await an.start_actor(
|
||||||
|
name=name,
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
dactor_portals.append(ptl)
|
||||||
|
|
||||||
|
# wait for booted
|
||||||
|
async with tractor.wait_for_actor(name):
|
||||||
|
print(f'{name!r} is up.')
|
||||||
|
|
||||||
|
# simulate uncaught exc
|
||||||
|
raise rte
|
||||||
|
|
||||||
|
# should error here with a ``RemoteActorError`` or ``MultiError``
|
||||||
|
|
||||||
|
except BaseExceptionGroup as _beg:
|
||||||
|
beg = _beg
|
||||||
|
|
||||||
|
# ?TODO? why can't we do `is` on beg?
|
||||||
|
assert (
|
||||||
|
beg.exceptions
|
||||||
|
==
|
||||||
|
an.maybe_error.exceptions
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(beg.exceptions) == (
|
||||||
|
num_subs
|
||||||
|
+
|
||||||
|
1 # rte from root
|
||||||
|
)
|
||||||
|
|
||||||
|
# all subactors should have been implicitly
|
||||||
|
# `Portal.cancel_actor()`ed.
|
||||||
|
excs = list(beg.exceptions)
|
||||||
|
excs.remove(rte)
|
||||||
|
for exc in excs:
|
||||||
|
assert isinstance(exc, ActorCancelled)
|
||||||
|
|
||||||
|
assert an._scope_error is rte
|
||||||
|
assert not an._children
|
||||||
|
assert an.cancelled is True
|
||||||
|
|
||||||
|
trio.run(main)
|
|
@ -11,6 +11,9 @@ from itertools import repeat
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
from tractor._exceptions import (
|
||||||
|
ActorCancelled,
|
||||||
|
)
|
||||||
from tractor._testing import (
|
from tractor._testing import (
|
||||||
tractor_test,
|
tractor_test,
|
||||||
)
|
)
|
||||||
|
@ -124,7 +127,10 @@ def test_multierror(
|
||||||
) as nursery:
|
) as nursery:
|
||||||
|
|
||||||
await nursery.run_in_actor(assert_err, name='errorer1')
|
await nursery.run_in_actor(assert_err, name='errorer1')
|
||||||
portal2 = await nursery.run_in_actor(assert_err, name='errorer2')
|
portal2 = await nursery.run_in_actor(
|
||||||
|
assert_err,
|
||||||
|
name='errorer2',
|
||||||
|
)
|
||||||
|
|
||||||
# get result(s) from main task
|
# get result(s) from main task
|
||||||
try:
|
try:
|
||||||
|
@ -137,7 +143,15 @@ def test_multierror(
|
||||||
# here we should get a ``BaseExceptionGroup`` containing exceptions
|
# here we should get a ``BaseExceptionGroup`` containing exceptions
|
||||||
# from both subactors
|
# from both subactors
|
||||||
|
|
||||||
with pytest.raises(BaseExceptionGroup):
|
with pytest.raises(
|
||||||
|
expected_exception=(
|
||||||
|
tractor.RemoteActorError,
|
||||||
|
|
||||||
|
# ?TODO, should it be this??
|
||||||
|
# like `trio`'s strict egs?
|
||||||
|
BaseExceptionGroup,
|
||||||
|
),
|
||||||
|
):
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@ -233,8 +247,9 @@ async def stream_forever():
|
||||||
|
|
||||||
|
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_cancel_infinite_streamer(start_method):
|
async def test_cancel_infinite_streamer(
|
||||||
|
start_method: str,
|
||||||
|
):
|
||||||
# stream for at most 1 seconds
|
# stream for at most 1 seconds
|
||||||
with (
|
with (
|
||||||
trio.fail_after(4),
|
trio.fail_after(4),
|
||||||
|
@ -291,6 +306,7 @@ async def test_some_cancels_all(
|
||||||
num_actors_and_errs: tuple,
|
num_actors_and_errs: tuple,
|
||||||
start_method: str,
|
start_method: str,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Verify a subset of failed subactors causes all others in
|
Verify a subset of failed subactors causes all others in
|
||||||
|
@ -306,68 +322,81 @@ async def test_some_cancels_all(
|
||||||
ria_func,
|
ria_func,
|
||||||
da_func,
|
da_func,
|
||||||
) = num_actors_and_errs
|
) = num_actors_and_errs
|
||||||
try:
|
with trio.fail_after(
|
||||||
async with tractor.open_nursery() as an:
|
3
|
||||||
|
if not debug_mode
|
||||||
|
else 999
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
async with tractor.open_nursery() as an:
|
||||||
|
|
||||||
# spawn the same number of deamon actors which should be cancelled
|
# spawn the same number of deamon actors which should be cancelled
|
||||||
dactor_portals = []
|
dactor_portals = []
|
||||||
for i in range(num_actors):
|
for i in range(num_actors):
|
||||||
dactor_portals.append(await an.start_actor(
|
dactor_portals.append(await an.start_actor(
|
||||||
f'deamon_{i}',
|
f'deamon_{i}',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
))
|
))
|
||||||
|
|
||||||
func, kwargs = ria_func
|
func, kwargs = ria_func
|
||||||
riactor_portals = []
|
riactor_portals = []
|
||||||
for i in range(num_actors):
|
for i in range(num_actors):
|
||||||
# start actor(s) that will fail immediately
|
# start actor(s) that will fail immediately
|
||||||
riactor_portals.append(
|
riactor_portals.append(
|
||||||
await an.run_in_actor(
|
await an.run_in_actor(
|
||||||
func,
|
func,
|
||||||
name=f'actor_{i}',
|
name=f'actor_{i}',
|
||||||
**kwargs
|
**kwargs
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
if da_func:
|
if da_func:
|
||||||
func, kwargs, expect_error = da_func
|
func, kwargs, expect_error = da_func
|
||||||
for portal in dactor_portals:
|
for portal in dactor_portals:
|
||||||
# if this function fails then we should error here
|
# if this function fails then we should error here
|
||||||
# and the nursery should teardown all other actors
|
# and the nursery should teardown all other actors
|
||||||
try:
|
try:
|
||||||
await portal.run(func, **kwargs)
|
await portal.run(func, **kwargs)
|
||||||
|
|
||||||
except tractor.RemoteActorError as err:
|
except tractor.RemoteActorError as err:
|
||||||
assert err.boxed_type == err_type
|
assert err.boxed_type == err_type
|
||||||
# we only expect this first error to propogate
|
# we only expect this first error to propogate
|
||||||
# (all other daemons are cancelled before they
|
# (all other daemons are cancelled before they
|
||||||
# can be scheduled)
|
# can be scheduled)
|
||||||
num_actors = 1
|
num_actors = 1
|
||||||
# reraise so nursery teardown is triggered
|
# reraise so nursery teardown is triggered
|
||||||
raise
|
raise
|
||||||
|
else:
|
||||||
|
if expect_error:
|
||||||
|
pytest.fail(
|
||||||
|
"Deamon call should fail at checkpoint?")
|
||||||
|
|
||||||
|
# should error here with a ``RemoteActorError`` or ``MultiError``
|
||||||
|
|
||||||
|
except first_err as _err:
|
||||||
|
err = _err
|
||||||
|
|
||||||
|
if isinstance(err, BaseExceptionGroup):
|
||||||
|
|
||||||
|
assert len(err.exceptions) == num_actors
|
||||||
|
for exc in err.exceptions:
|
||||||
|
|
||||||
|
# TODO, figure out why these aren't being set?
|
||||||
|
if isinstance(exc, ActorCancelled):
|
||||||
|
breakpoint()
|
||||||
|
|
||||||
|
if isinstance(exc, tractor.RemoteActorError):
|
||||||
|
assert exc.boxed_type == err_type
|
||||||
else:
|
else:
|
||||||
if expect_error:
|
assert isinstance(exc, trio.Cancelled)
|
||||||
pytest.fail(
|
|
||||||
"Deamon call should fail at checkpoint?")
|
|
||||||
|
|
||||||
# should error here with a ``RemoteActorError`` or ``MultiError``
|
elif isinstance(err, tractor.RemoteActorError):
|
||||||
|
assert err.boxed_type == err_type
|
||||||
|
|
||||||
except first_err as _err:
|
assert an.cancelled is True
|
||||||
err = _err
|
assert not an._children
|
||||||
if isinstance(err, BaseExceptionGroup):
|
else:
|
||||||
assert len(err.exceptions) == num_actors
|
pytest.fail("Should have gotten a remote assertion error?")
|
||||||
for exc in err.exceptions:
|
|
||||||
if isinstance(exc, tractor.RemoteActorError):
|
|
||||||
assert exc.boxed_type == err_type
|
|
||||||
else:
|
|
||||||
assert isinstance(exc, trio.Cancelled)
|
|
||||||
elif isinstance(err, tractor.RemoteActorError):
|
|
||||||
assert err.boxed_type == err_type
|
|
||||||
|
|
||||||
assert an.cancelled is True
|
|
||||||
assert not an._children
|
|
||||||
else:
|
|
||||||
pytest.fail("Should have gotten a remote assertion error?")
|
|
||||||
|
|
||||||
|
|
||||||
async def spawn_and_error(breadth, depth) -> None:
|
async def spawn_and_error(breadth, depth) -> None:
|
||||||
|
|
|
@ -95,6 +95,7 @@ def run_example_in_subproc(
|
||||||
and 'integration' not in p[0]
|
and 'integration' not in p[0]
|
||||||
and 'advanced_faults' not in p[0]
|
and 'advanced_faults' not in p[0]
|
||||||
and 'multihost' not in p[0]
|
and 'multihost' not in p[0]
|
||||||
|
and 'trio' not in p[0]
|
||||||
)
|
)
|
||||||
],
|
],
|
||||||
ids=lambda t: t[1],
|
ids=lambda t: t[1],
|
||||||
|
|
|
@ -24,14 +24,10 @@ from tractor._testing import (
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX TODO cases:
|
# XXX TODO cases:
|
||||||
# - [ ] peer cancelled itself - so other peers should
|
|
||||||
# get errors reflecting that the peer was itself the .canceller?
|
|
||||||
|
|
||||||
# - [x] WE cancelled the peer and thus should not see any raised
|
# - [x] WE cancelled the peer and thus should not see any raised
|
||||||
# `ContextCancelled` as it should be reaped silently?
|
# `ContextCancelled` as it should be reaped silently?
|
||||||
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
|
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
|
||||||
# already covers this case?
|
# already covers this case?
|
||||||
|
|
||||||
# - [x] INTER-PEER: some arbitrary remote peer cancels via
|
# - [x] INTER-PEER: some arbitrary remote peer cancels via
|
||||||
# Portal.cancel_actor().
|
# Portal.cancel_actor().
|
||||||
# => all other connected peers should get that cancel requesting peer's
|
# => all other connected peers should get that cancel requesting peer's
|
||||||
|
@ -44,16 +40,6 @@ from tractor._testing import (
|
||||||
# that also spawned a remote task task in that same peer-parent.
|
# that also spawned a remote task task in that same peer-parent.
|
||||||
|
|
||||||
|
|
||||||
# def test_self_cancel():
|
|
||||||
# '''
|
|
||||||
# 2 cases:
|
|
||||||
# - calls `Actor.cancel()` locally in some task
|
|
||||||
# - calls LocalPortal.cancel_actor()` ?
|
|
||||||
|
|
||||||
# '''
|
|
||||||
# ...
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def open_stream_then_sleep_forever(
|
async def open_stream_then_sleep_forever(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
|
@ -806,7 +792,7 @@ async def basic_echo_server(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
peer_name: str = 'wittle_bruv',
|
peer_name: str = 'wittle_bruv',
|
||||||
|
|
||||||
err_after: int|None = None,
|
err_after_imsg: int|None = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -835,8 +821,9 @@ async def basic_echo_server(
|
||||||
await ipc.send(resp)
|
await ipc.send(resp)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
err_after
|
err_after_imsg
|
||||||
and i > err_after
|
and
|
||||||
|
i > err_after_imsg
|
||||||
):
|
):
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f'Simulated error in `{peer_name}`'
|
f'Simulated error in `{peer_name}`'
|
||||||
|
@ -978,7 +965,8 @@ async def tell_little_bro(
|
||||||
actor_name: str,
|
actor_name: str,
|
||||||
|
|
||||||
caller: str = '',
|
caller: str = '',
|
||||||
err_after: int|None = None,
|
err_after: float|None = None,
|
||||||
|
rng_seed: int = 50,
|
||||||
):
|
):
|
||||||
# contact target actor, do a stream dialog.
|
# contact target actor, do a stream dialog.
|
||||||
async with (
|
async with (
|
||||||
|
@ -989,14 +977,18 @@ async def tell_little_bro(
|
||||||
basic_echo_server,
|
basic_echo_server,
|
||||||
|
|
||||||
# XXX proxy any delayed err condition
|
# XXX proxy any delayed err condition
|
||||||
err_after=err_after,
|
err_after_imsg=(
|
||||||
|
err_after * rng_seed
|
||||||
|
if err_after is not None
|
||||||
|
else None
|
||||||
|
),
|
||||||
) as (sub_ctx, first),
|
) as (sub_ctx, first),
|
||||||
|
|
||||||
sub_ctx.open_stream() as echo_ipc,
|
sub_ctx.open_stream() as echo_ipc,
|
||||||
):
|
):
|
||||||
actor: Actor = current_actor()
|
actor: Actor = current_actor()
|
||||||
uid: tuple = actor.uid
|
uid: tuple = actor.uid
|
||||||
for i in range(100):
|
for i in range(rng_seed):
|
||||||
msg: tuple = (
|
msg: tuple = (
|
||||||
uid,
|
uid,
|
||||||
i,
|
i,
|
||||||
|
@ -1021,13 +1013,13 @@ async def tell_little_bro(
|
||||||
)
|
)
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'raise_sub_spawn_error_after',
|
'raise_sub_spawn_error_after',
|
||||||
[None, 50],
|
[None, 0.5],
|
||||||
)
|
)
|
||||||
def test_peer_spawns_and_cancels_service_subactor(
|
def test_peer_spawns_and_cancels_service_subactor(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
raise_client_error: str,
|
raise_client_error: str,
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
raise_sub_spawn_error_after: int|None,
|
raise_sub_spawn_error_after: float|None,
|
||||||
):
|
):
|
||||||
# NOTE: this tests for the modden `mod wks open piker` bug
|
# NOTE: this tests for the modden `mod wks open piker` bug
|
||||||
# discovered as part of implementing workspace ctx
|
# discovered as part of implementing workspace ctx
|
||||||
|
@ -1041,6 +1033,7 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
# and the server's spawned child should cancel and terminate!
|
# and the server's spawned child should cancel and terminate!
|
||||||
peer_name: str = 'little_bro'
|
peer_name: str = 'little_bro'
|
||||||
|
|
||||||
|
|
||||||
def check_inner_rte(rae: RemoteActorError):
|
def check_inner_rte(rae: RemoteActorError):
|
||||||
'''
|
'''
|
||||||
Validate the little_bro's relayed inception!
|
Validate the little_bro's relayed inception!
|
||||||
|
@ -1134,8 +1127,7 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
res = await client_ctx.result(hide_tb=False)
|
res = await client_ctx.wait_for_result(hide_tb=False)
|
||||||
|
|
||||||
# in remote (relayed inception) error
|
# in remote (relayed inception) error
|
||||||
# case, we should error on the line above!
|
# case, we should error on the line above!
|
||||||
if raise_sub_spawn_error_after:
|
if raise_sub_spawn_error_after:
|
||||||
|
@ -1146,6 +1138,23 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
assert isinstance(res, ContextCancelled)
|
assert isinstance(res, ContextCancelled)
|
||||||
assert client_ctx.cancel_acked
|
assert client_ctx.cancel_acked
|
||||||
assert res.canceller == root.uid
|
assert res.canceller == root.uid
|
||||||
|
assert not raise_sub_spawn_error_after
|
||||||
|
|
||||||
|
# cancelling the spawner sub should
|
||||||
|
# transitively cancel it's sub, the little
|
||||||
|
# bruv.
|
||||||
|
print('root cancelling server/client sub-actors')
|
||||||
|
await spawn_ctx.cancel()
|
||||||
|
async with tractor.find_actor(
|
||||||
|
name=peer_name,
|
||||||
|
) as sub:
|
||||||
|
assert not sub
|
||||||
|
|
||||||
|
# XXX, only for tracing
|
||||||
|
# except BaseException as _berr:
|
||||||
|
# berr = _berr
|
||||||
|
# await tractor.pause(shield=True)
|
||||||
|
# raise berr
|
||||||
|
|
||||||
except RemoteActorError as rae:
|
except RemoteActorError as rae:
|
||||||
_err = rae
|
_err = rae
|
||||||
|
@ -1174,19 +1183,8 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
raise
|
raise
|
||||||
# await tractor.pause()
|
# await tractor.pause()
|
||||||
|
|
||||||
else:
|
|
||||||
assert not raise_sub_spawn_error_after
|
|
||||||
|
|
||||||
# cancelling the spawner sub should
|
|
||||||
# transitively cancel it's sub, the little
|
|
||||||
# bruv.
|
|
||||||
print('root cancelling server/client sub-actors')
|
|
||||||
await spawn_ctx.cancel()
|
|
||||||
async with tractor.find_actor(
|
|
||||||
name=peer_name,
|
|
||||||
) as sub:
|
|
||||||
assert not sub
|
|
||||||
|
|
||||||
|
# await tractor.pause()
|
||||||
# await server.cancel_actor()
|
# await server.cancel_actor()
|
||||||
|
|
||||||
except RemoteActorError as rae:
|
except RemoteActorError as rae:
|
||||||
|
@ -1199,7 +1197,7 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
|
|
||||||
# since we called `.cancel_actor()`, `.cancel_ack`
|
# since we called `.cancel_actor()`, `.cancel_ack`
|
||||||
# will not be set on the ctx bc `ctx.cancel()` was not
|
# will not be set on the ctx bc `ctx.cancel()` was not
|
||||||
# called directly fot this confext.
|
# called directly for this confext.
|
||||||
except ContextCancelled as ctxc:
|
except ContextCancelled as ctxc:
|
||||||
_ctxc = ctxc
|
_ctxc = ctxc
|
||||||
print(
|
print(
|
||||||
|
@ -1239,12 +1237,19 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
|
|
||||||
# assert spawn_ctx.cancelled_caught
|
# assert spawn_ctx.cancelled_caught
|
||||||
|
|
||||||
|
async def _main():
|
||||||
|
with trio.fail_after(
|
||||||
|
3 if not debug_mode
|
||||||
|
else 999
|
||||||
|
):
|
||||||
|
await main()
|
||||||
|
|
||||||
if raise_sub_spawn_error_after:
|
if raise_sub_spawn_error_after:
|
||||||
with pytest.raises(RemoteActorError) as excinfo:
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
trio.run(main)
|
trio.run(_main)
|
||||||
|
|
||||||
rae: RemoteActorError = excinfo.value
|
rae: RemoteActorError = excinfo.value
|
||||||
check_inner_rte(rae)
|
check_inner_rte(rae)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
trio.run(main)
|
trio.run(_main)
|
||||||
|
|
|
@ -0,0 +1,239 @@
|
||||||
|
'''
|
||||||
|
Define the details of inter-actor "out-of-band" (OoB) cancel
|
||||||
|
semantics, that is how cancellation works when a cancel request comes
|
||||||
|
from the different concurrency (primitive's) "layer" then where the
|
||||||
|
eventual `trio.Task` actually raises a signal.
|
||||||
|
|
||||||
|
'''
|
||||||
|
from functools import partial
|
||||||
|
# from contextlib import asynccontextmanager as acm
|
||||||
|
# import itertools
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
from tractor import ( # typing
|
||||||
|
ActorNursery,
|
||||||
|
Portal,
|
||||||
|
Context,
|
||||||
|
# ContextCancelled,
|
||||||
|
# RemoteActorError,
|
||||||
|
)
|
||||||
|
# from tractor._testing import (
|
||||||
|
# tractor_test,
|
||||||
|
# expect_ctxc,
|
||||||
|
# )
|
||||||
|
|
||||||
|
# XXX TODO cases:
|
||||||
|
# - [ ] peer cancelled itself - so other peers should
|
||||||
|
# get errors reflecting that the peer was itself the .canceller?
|
||||||
|
|
||||||
|
# def test_self_cancel():
|
||||||
|
# '''
|
||||||
|
# 2 cases:
|
||||||
|
# - calls `Actor.cancel()` locally in some task
|
||||||
|
# - calls LocalPortal.cancel_actor()` ?
|
||||||
|
#
|
||||||
|
# things to ensure!
|
||||||
|
# -[ ] the ctxc raised in a child should ideally show the tb of the
|
||||||
|
# underlying `Cancelled` checkpoint, i.e.
|
||||||
|
# `raise scope_error from ctxc`?
|
||||||
|
#
|
||||||
|
# -[ ] a self-cancelled context, if not allowed to block on
|
||||||
|
# `ctx.result()` at some point will hang since the `ctx._scope`
|
||||||
|
# is never `.cancel_called`; cases for this include,
|
||||||
|
# - an `open_ctx()` which never starteds before being OoB actor
|
||||||
|
# cancelled.
|
||||||
|
# |_ parent task will be blocked in `.open_context()` for the
|
||||||
|
# `Started` msg, and when the OoB ctxc arrives `ctx._scope`
|
||||||
|
# will never have been signalled..
|
||||||
|
|
||||||
|
# '''
|
||||||
|
# ...
|
||||||
|
|
||||||
|
# TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py`
|
||||||
|
# but with the `Lock.acquire()` from a `@context` to ensure the
|
||||||
|
# implicit ignore-case-non-unmasking.
|
||||||
|
#
|
||||||
|
# @tractor.context
|
||||||
|
# async def acquire_actor_global_lock(
|
||||||
|
# ctx: tractor.Context,
|
||||||
|
# ignore_special_cases: bool,
|
||||||
|
# ):
|
||||||
|
|
||||||
|
# async with maybe_unmask_excs(
|
||||||
|
# ignore_special_cases=ignore_special_cases,
|
||||||
|
# ):
|
||||||
|
# await ctx.started('locked')
|
||||||
|
|
||||||
|
# # block til cancelled
|
||||||
|
# await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def sleep_forever(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
# ignore_special_cases: bool,
|
||||||
|
do_started: bool,
|
||||||
|
):
|
||||||
|
|
||||||
|
# async with maybe_unmask_excs(
|
||||||
|
# ignore_special_cases=ignore_special_cases,
|
||||||
|
# ):
|
||||||
|
# await ctx.started('locked')
|
||||||
|
if do_started:
|
||||||
|
await ctx.started()
|
||||||
|
|
||||||
|
# block til cancelled
|
||||||
|
print('sleepin on child-side..')
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'cancel_ctx',
|
||||||
|
[True, False],
|
||||||
|
)
|
||||||
|
def test_cancel_ctx_with_parent_side_entered_in_bg_task(
|
||||||
|
debug_mode: bool,
|
||||||
|
loglevel: str,
|
||||||
|
cancel_ctx: bool,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
The most "basic" out-of-band-task self-cancellation case where
|
||||||
|
`Portal.open_context()` is entered in a bg task and the
|
||||||
|
parent-task (of the containing nursery) calls `Context.cancel()`
|
||||||
|
without the child knowing; the `Context._scope` should be
|
||||||
|
`.cancel_called` when the IPC ctx's child-side relays
|
||||||
|
a `ContextCancelled` with a `.canceller` set to the parent
|
||||||
|
actor('s task).
|
||||||
|
|
||||||
|
'''
|
||||||
|
async def main():
|
||||||
|
with trio.fail_after(
|
||||||
|
2 if not debug_mode else 999,
|
||||||
|
):
|
||||||
|
an: ActorNursery
|
||||||
|
async with (
|
||||||
|
tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
loglevel='devx',
|
||||||
|
enable_stack_on_sig=True,
|
||||||
|
) as an,
|
||||||
|
trio.open_nursery() as tn,
|
||||||
|
):
|
||||||
|
ptl: Portal = await an.start_actor(
|
||||||
|
'sub',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _open_ctx_async(
|
||||||
|
do_started: bool = True,
|
||||||
|
task_status=trio.TASK_STATUS_IGNORED,
|
||||||
|
):
|
||||||
|
# do we expect to never enter the
|
||||||
|
# `.open_context()` below.
|
||||||
|
if not do_started:
|
||||||
|
task_status.started()
|
||||||
|
|
||||||
|
async with ptl.open_context(
|
||||||
|
sleep_forever,
|
||||||
|
do_started=do_started,
|
||||||
|
) as (ctx, first):
|
||||||
|
task_status.started(ctx)
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
# XXX, this is the key OoB part!
|
||||||
|
#
|
||||||
|
# - start the `.open_context()` in a bg task which
|
||||||
|
# blocks inside the embedded scope-body,
|
||||||
|
#
|
||||||
|
# - when we call `Context.cancel()` it **is
|
||||||
|
# not** from the same task which eventually runs
|
||||||
|
# `.__aexit__()`,
|
||||||
|
#
|
||||||
|
# - since the bg "opener" task will be in
|
||||||
|
# a `trio.sleep_forever()`, it must be interrupted
|
||||||
|
# by the `ContextCancelled` delivered from the
|
||||||
|
# child-side; `Context._scope: CancelScope` MUST
|
||||||
|
# be `.cancel_called`!
|
||||||
|
#
|
||||||
|
print('ASYNC opening IPC context in subtask..')
|
||||||
|
maybe_ctx: Context|None = await tn.start(partial(
|
||||||
|
_open_ctx_async,
|
||||||
|
))
|
||||||
|
|
||||||
|
if (
|
||||||
|
maybe_ctx
|
||||||
|
and
|
||||||
|
cancel_ctx
|
||||||
|
):
|
||||||
|
print('cancelling first IPC ctx!')
|
||||||
|
await maybe_ctx.cancel()
|
||||||
|
|
||||||
|
# XXX, note that despite `maybe_context.cancel()`
|
||||||
|
# being called above, it's the parent (bg) task
|
||||||
|
# which was originally never interrupted in
|
||||||
|
# the `ctx._scope` body due to missing case logic in
|
||||||
|
# `ctx._maybe_cancel_and_set_remote_error()`.
|
||||||
|
#
|
||||||
|
# It didn't matter that the subactor process was
|
||||||
|
# already terminated and reaped, nothing was
|
||||||
|
# cancelling the ctx-parent task's scope!
|
||||||
|
#
|
||||||
|
print('cancelling subactor!')
|
||||||
|
await ptl.cancel_actor()
|
||||||
|
|
||||||
|
if maybe_ctx:
|
||||||
|
try:
|
||||||
|
await maybe_ctx.wait_for_result()
|
||||||
|
except tractor.ContextCancelled as ctxc:
|
||||||
|
assert not cancel_ctx
|
||||||
|
assert (
|
||||||
|
ctxc.canceller
|
||||||
|
==
|
||||||
|
tractor.current_actor().aid.uid
|
||||||
|
)
|
||||||
|
# don't re-raise since it'll trigger
|
||||||
|
# an EG from the above tn.
|
||||||
|
|
||||||
|
if cancel_ctx:
|
||||||
|
# graceful self-cancel
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# ctx parent task should see OoB ctxc due to
|
||||||
|
# `ptl.cancel_actor()`.
|
||||||
|
with pytest.raises(tractor.ContextCancelled) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
assert 'root' in excinfo.value.canceller[0]
|
||||||
|
|
||||||
|
|
||||||
|
# def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it(
|
||||||
|
# debug_mode: bool,
|
||||||
|
# loglevel: str,
|
||||||
|
# ):
|
||||||
|
# '''
|
||||||
|
# Demos OoB cancellation from the perspective of a ctx opened with
|
||||||
|
# a child subactor where the parent cancels the child at the "actor
|
||||||
|
# layer" using `Portal.cancel_actor()` and thus the
|
||||||
|
# `ContextCancelled.canceller` received by the ctx's parent-side
|
||||||
|
# task will appear to be a "self cancellation" even though that
|
||||||
|
# specific task itself was not cancelled and thus
|
||||||
|
# `Context.cancel_called ==False`.
|
||||||
|
# '''
|
||||||
|
# TODO, do we have an existing implied ctx
|
||||||
|
# cancel test like this?
|
||||||
|
# with trio.move_on_after(0.5):# as cs:
|
||||||
|
# await _open_ctx_async(
|
||||||
|
# do_started=False,
|
||||||
|
# )
|
||||||
|
|
||||||
|
|
||||||
|
# in-line ctx scope should definitely raise
|
||||||
|
# a ctxc with `.canceller = 'root'`
|
||||||
|
# async with ptl.open_context(
|
||||||
|
# sleep_forever,
|
||||||
|
# do_started=True,
|
||||||
|
# ) as pair:
|
||||||
|
|
|
@ -6,11 +6,18 @@ want to see changed.
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
|
from types import ModuleType
|
||||||
|
|
||||||
|
from functools import partial
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
from _pytest import pathlib
|
||||||
from tractor.trionics import collapse_eg
|
from tractor.trionics import collapse_eg
|
||||||
import trio
|
import trio
|
||||||
from trio import TaskStatus
|
from trio import TaskStatus
|
||||||
|
from tractor._testing import (
|
||||||
|
examples_dir,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
|
@ -106,8 +113,9 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Demo how a masking `trio.Cancelled` could be handled by unmasking from the
|
Demo how a masking `trio.Cancelled` could be handled by unmasking
|
||||||
`.__context__` field when a user (by accident) re-raises from a `finally:`.
|
from the `.__context__` field when a user (by accident) re-raises
|
||||||
|
from a `finally:`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
import tractor
|
import tractor
|
||||||
|
@ -158,13 +166,13 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
||||||
assert len(assert_eg.exceptions) == 1
|
assert len(assert_eg.exceptions) == 1
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def test_gatherctxs_with_memchan_breaks_multicancelled(
|
def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task
|
Demo how a using an `async with sndchan` inside
|
||||||
will break a strict-eg-tn's multi-cancelled absorption..
|
a `.trionics.gather_contexts()` task will break a strict-eg-tn's
|
||||||
|
multi-cancelled absorption..
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from tractor import (
|
from tractor import (
|
||||||
|
@ -190,7 +198,6 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||||
f'Closed {task!r}\n'
|
f'Closed {task!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with (
|
async with (
|
||||||
# XXX should ensure ONLY the KBI
|
# XXX should ensure ONLY the KBI
|
||||||
|
@ -211,3 +218,85 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||||
|
|
||||||
with pytest.raises(KeyboardInterrupt):
|
with pytest.raises(KeyboardInterrupt):
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'raise_unmasked', [
|
||||||
|
True,
|
||||||
|
pytest.param(
|
||||||
|
False,
|
||||||
|
marks=pytest.mark.xfail(
|
||||||
|
reason="see examples/trio/send_chan_aclose_masks.py"
|
||||||
|
)
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'child_errors_mid_stream',
|
||||||
|
[True, False],
|
||||||
|
)
|
||||||
|
def test_unmask_aclose_as_checkpoint_on_aexit(
|
||||||
|
raise_unmasked: bool,
|
||||||
|
child_errors_mid_stream: bool,
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify that our unmasker util works over the common case where
|
||||||
|
a mem-chan's `.aclose()` is included in an `@acm` stack
|
||||||
|
and it being currently a checkpoint, can `trio.Cancelled`-mask an embedded
|
||||||
|
exception from user code resulting in a silent failure which
|
||||||
|
appears like graceful cancellation.
|
||||||
|
|
||||||
|
This test suite is mostly implemented as an example script so it
|
||||||
|
could more easily be shared with `trio`-core peeps as `tractor`-less
|
||||||
|
minimum reproducing example.
|
||||||
|
|
||||||
|
'''
|
||||||
|
mod: ModuleType = pathlib.import_path(
|
||||||
|
examples_dir()
|
||||||
|
/ 'trio'
|
||||||
|
/ 'send_chan_aclose_masks_beg.py',
|
||||||
|
root=examples_dir(),
|
||||||
|
consider_namespace_packages=False,
|
||||||
|
)
|
||||||
|
with pytest.raises(RuntimeError):
|
||||||
|
trio.run(partial(
|
||||||
|
mod.main,
|
||||||
|
raise_unmasked=raise_unmasked,
|
||||||
|
child_errors_mid_stream=child_errors_mid_stream,
|
||||||
|
))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'ignore_special_cases', [
|
||||||
|
True,
|
||||||
|
pytest.param(
|
||||||
|
False,
|
||||||
|
marks=pytest.mark.xfail(
|
||||||
|
reason="see examples/trio/lockacquire_not_umasked.py"
|
||||||
|
)
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
def test_cancelled_lockacquire_in_ipctx_not_unmasked(
|
||||||
|
ignore_special_cases: bool,
|
||||||
|
loglevel: str,
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
|
mod: ModuleType = pathlib.import_path(
|
||||||
|
examples_dir()
|
||||||
|
/ 'trio'
|
||||||
|
/ 'lockacquire_not_unmasked.py',
|
||||||
|
root=examples_dir(),
|
||||||
|
consider_namespace_packages=False,
|
||||||
|
)
|
||||||
|
async def _main():
|
||||||
|
with trio.fail_after(2):
|
||||||
|
await mod.main(
|
||||||
|
ignore_special_cases=ignore_special_cases,
|
||||||
|
loglevel=loglevel,
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
)
|
||||||
|
|
||||||
|
trio.run(_main)
|
||||||
|
|
|
@ -442,25 +442,25 @@ class Context:
|
||||||
'''
|
'''
|
||||||
Records whether cancellation has been requested for this context
|
Records whether cancellation has been requested for this context
|
||||||
by a call to `.cancel()` either due to,
|
by a call to `.cancel()` either due to,
|
||||||
- either an explicit call by some local task,
|
- an explicit call by some local task,
|
||||||
- or an implicit call due to an error caught inside
|
- or an implicit call due to an error caught inside
|
||||||
the ``Portal.open_context()`` block.
|
the `Portal.open_context()` block.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self._cancel_called
|
return self._cancel_called
|
||||||
|
|
||||||
@cancel_called.setter
|
# XXX, to debug who frickin sets it..
|
||||||
def cancel_called(self, val: bool) -> None:
|
# @cancel_called.setter
|
||||||
'''
|
# def cancel_called(self, val: bool) -> None:
|
||||||
Set the self-cancelled request `bool` value.
|
# '''
|
||||||
|
# Set the self-cancelled request `bool` value.
|
||||||
|
|
||||||
'''
|
# '''
|
||||||
# to debug who frickin sets it..
|
# if val:
|
||||||
# if val:
|
# from .devx import pause_from_sync
|
||||||
# from .devx import pause_from_sync
|
# pause_from_sync()
|
||||||
# pause_from_sync()
|
|
||||||
|
|
||||||
self._cancel_called = val
|
# self._cancel_called = val
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def canceller(self) -> tuple[str, str]|None:
|
def canceller(self) -> tuple[str, str]|None:
|
||||||
|
@ -635,6 +635,71 @@ class Context:
|
||||||
'''
|
'''
|
||||||
await self.chan.send(Stop(cid=self.cid))
|
await self.chan.send(Stop(cid=self.cid))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def parent_task(self) -> trio.Task:
|
||||||
|
'''
|
||||||
|
This IPC context's "owning task" which is a `trio.Task`
|
||||||
|
on one of the "sides" of the IPC.
|
||||||
|
|
||||||
|
Note that the "parent_" prefix here refers to the local
|
||||||
|
`trio` task tree using the same interface as
|
||||||
|
`trio.Nursery.parent_task` whereas for IPC contexts,
|
||||||
|
a different cross-actor task hierarchy exists:
|
||||||
|
|
||||||
|
- a "parent"-side which originally entered
|
||||||
|
`Portal.open_context()`,
|
||||||
|
|
||||||
|
- the "child"-side which was spawned and scheduled to invoke
|
||||||
|
a function decorated with `@tractor.context`.
|
||||||
|
|
||||||
|
This task is thus a handle to mem-domain-distinct/per-process
|
||||||
|
`Nursery.parent_task` depending on in which of the above
|
||||||
|
"sides" this context exists.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return self._task
|
||||||
|
|
||||||
|
def _is_blocked_on_rx_chan(self) -> bool:
|
||||||
|
'''
|
||||||
|
Predicate to indicate whether the owner `._task: trio.Task` is
|
||||||
|
currently blocked (by `.receive()`-ing) on its underlying RPC
|
||||||
|
feeder `._rx_chan`.
|
||||||
|
|
||||||
|
This knowledge is highly useful when handling so called
|
||||||
|
"out-of-band" (OoB) cancellation conditions where a peer
|
||||||
|
actor's task transmitted some remote error/cancel-msg and we
|
||||||
|
must know whether to signal-via-cancel currently executing
|
||||||
|
"user-code" (user defined code embedded in `ctx._scope`) or
|
||||||
|
simply to forward the IPC-msg-as-error **without calling**
|
||||||
|
`._scope.cancel()`.
|
||||||
|
|
||||||
|
In the latter case it is presumed that if the owner task is
|
||||||
|
blocking for the next IPC msg, it will eventually receive,
|
||||||
|
process and raise the equivalent local error **without**
|
||||||
|
requiring `._scope.cancel()` to be explicitly called by the
|
||||||
|
*delivering OoB RPC-task* (via `_deliver_msg()`).
|
||||||
|
|
||||||
|
'''
|
||||||
|
# NOTE, see the mem-chan meth-impls for *why* this
|
||||||
|
# logic works,
|
||||||
|
# `trio._channel.MemoryReceiveChannel.receive[_nowait]()`
|
||||||
|
#
|
||||||
|
# XXX realize that this is NOT an
|
||||||
|
# official/will-be-loudly-deprecated API:
|
||||||
|
# - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data
|
||||||
|
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled
|
||||||
|
#
|
||||||
|
# orig repo intro in the mem-chan change over patch:
|
||||||
|
# - https://github.com/python-trio/trio/pull/586#issuecomment-414039117
|
||||||
|
# |_https://github.com/python-trio/trio/pull/616
|
||||||
|
# |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0
|
||||||
|
#
|
||||||
|
return (
|
||||||
|
self._task.custom_sleep_data
|
||||||
|
is
|
||||||
|
self._rx_chan
|
||||||
|
)
|
||||||
|
|
||||||
def _maybe_cancel_and_set_remote_error(
|
def _maybe_cancel_and_set_remote_error(
|
||||||
self,
|
self,
|
||||||
error: BaseException,
|
error: BaseException,
|
||||||
|
@ -787,13 +852,27 @@ class Context:
|
||||||
if self._canceller is None:
|
if self._canceller is None:
|
||||||
log.error('Ctx has no canceller set!?')
|
log.error('Ctx has no canceller set!?')
|
||||||
|
|
||||||
|
cs: trio.CancelScope = self._scope
|
||||||
|
|
||||||
|
# ?TODO? see comment @ .start_remote_task()`
|
||||||
|
#
|
||||||
|
# if not cs:
|
||||||
|
# from .devx import mk_pdb
|
||||||
|
# mk_pdb().set_trace()
|
||||||
|
# raise RuntimeError(
|
||||||
|
# f'IPC ctx was not be opened prior to remote error delivery !?\n'
|
||||||
|
# f'{self}\n'
|
||||||
|
# f'\n'
|
||||||
|
# f'`Portal.open_context()` must be entered (somewhere) beforehand!\n'
|
||||||
|
# )
|
||||||
|
|
||||||
# Cancel the local `._scope`, catch that
|
# Cancel the local `._scope`, catch that
|
||||||
# `._scope.cancelled_caught` and re-raise any remote error
|
# `._scope.cancelled_caught` and re-raise any remote error
|
||||||
# once exiting (or manually calling `.wait_for_result()`) the
|
# once exiting (or manually calling `.wait_for_result()`) the
|
||||||
# `.open_context()` block.
|
# `.open_context()` block.
|
||||||
cs: trio.CancelScope = self._scope
|
|
||||||
if (
|
if (
|
||||||
cs
|
cs
|
||||||
|
and not cs.cancel_called
|
||||||
|
|
||||||
# XXX this is an expected cancel request response
|
# XXX this is an expected cancel request response
|
||||||
# message and we **don't need to raise it** in the
|
# message and we **don't need to raise it** in the
|
||||||
|
@ -802,8 +881,7 @@ class Context:
|
||||||
# if `._cancel_called` then `.cancel_acked and .cancel_called`
|
# if `._cancel_called` then `.cancel_acked and .cancel_called`
|
||||||
# always should be set.
|
# always should be set.
|
||||||
and not self._is_self_cancelled()
|
and not self._is_self_cancelled()
|
||||||
and not cs.cancel_called
|
# and not cs.cancelled_caught
|
||||||
and not cs.cancelled_caught
|
|
||||||
):
|
):
|
||||||
if (
|
if (
|
||||||
msgerr
|
msgerr
|
||||||
|
@ -814,7 +892,7 @@ class Context:
|
||||||
not self._cancel_on_msgerr
|
not self._cancel_on_msgerr
|
||||||
):
|
):
|
||||||
message: str = (
|
message: str = (
|
||||||
'NOT Cancelling `Context._scope` since,\n'
|
f'NOT Cancelling `Context._scope` since,\n'
|
||||||
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
|
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
|
||||||
f'AND we got a msg-type-error!\n'
|
f'AND we got a msg-type-error!\n'
|
||||||
f'{error}\n'
|
f'{error}\n'
|
||||||
|
@ -824,13 +902,43 @@ class Context:
|
||||||
# `trio.Cancelled` subtype here ;)
|
# `trio.Cancelled` subtype here ;)
|
||||||
# https://github.com/goodboy/tractor/issues/368
|
# https://github.com/goodboy/tractor/issues/368
|
||||||
message: str = 'Cancelling `Context._scope` !\n\n'
|
message: str = 'Cancelling `Context._scope` !\n\n'
|
||||||
# from .devx import pause_from_sync
|
cs.cancel()
|
||||||
# pause_from_sync()
|
|
||||||
self._scope.cancel()
|
# TODO, explicit condition for OoB (self-)cancellation?
|
||||||
else:
|
# - we called `Portal.cancel_actor()` from this actor
|
||||||
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
# and the peer ctx task delivered ctxc due to it.
|
||||||
|
# - currently `self._is_self_cancelled()` will be true
|
||||||
|
# since the ctxc.canceller check will match us even though it
|
||||||
|
# wasn't from this ctx specifically!
|
||||||
|
elif (
|
||||||
|
cs
|
||||||
|
and self._is_self_cancelled()
|
||||||
|
and not cs.cancel_called
|
||||||
|
):
|
||||||
|
message: str = (
|
||||||
|
'Cancelling `ctx._scope` due to OoB self-cancel ?!\n'
|
||||||
|
'\n'
|
||||||
|
)
|
||||||
# from .devx import mk_pdb
|
# from .devx import mk_pdb
|
||||||
# mk_pdb().set_trace()
|
# mk_pdb().set_trace()
|
||||||
|
# TODO XXX, required to fix timeout failure in
|
||||||
|
# `test_cancelled_lockacquire_in_ipctx_not_unmaskeed`
|
||||||
|
#
|
||||||
|
|
||||||
|
# XXX NOTE XXX, this is SUPER SUBTLE!
|
||||||
|
# we only want to cancel our embedded `._scope`
|
||||||
|
# if the ctx's current/using task is NOT blocked
|
||||||
|
# on `._rx_chan.receive()` and on some other
|
||||||
|
# `trio`-checkpoint since in the former case
|
||||||
|
# any `._remote_error` will be relayed through
|
||||||
|
# the rx-chan and appropriately raised by the owning
|
||||||
|
# `._task` directly. IF the owner task is however
|
||||||
|
# blocking elsewhere we need to interrupt it **now**.
|
||||||
|
if not self._is_blocked_on_rx_chan():
|
||||||
|
cs.cancel()
|
||||||
|
else:
|
||||||
|
# rx_stats = self._rx_chan.statistics()
|
||||||
|
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
||||||
|
|
||||||
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
|
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
|
||||||
if (
|
if (
|
||||||
|
@ -854,6 +962,7 @@ class Context:
|
||||||
+
|
+
|
||||||
cs_fmt
|
cs_fmt
|
||||||
)
|
)
|
||||||
|
|
||||||
log.cancel(
|
log.cancel(
|
||||||
message
|
message
|
||||||
+
|
+
|
||||||
|
@ -946,8 +1055,9 @@ class Context:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
side: str = self.side
|
side: str = self.side
|
||||||
# XXX for debug via the `@.setter`
|
self._cancel_called = True
|
||||||
self.cancel_called = True
|
# ^ XXX for debug via the `@.setter`
|
||||||
|
# self.cancel_called = True
|
||||||
|
|
||||||
header: str = (
|
header: str = (
|
||||||
f'Cancelling ctx from {side!r}-side\n'
|
f'Cancelling ctx from {side!r}-side\n'
|
||||||
|
@ -2011,6 +2121,9 @@ async def open_context_from_portal(
|
||||||
f'|_{portal.actor}\n'
|
f'|_{portal.actor}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# ?TODO? could we move this to inside the `tn` block?
|
||||||
|
# -> would allow doing `ctx.parent_task = tn.parent_task` ?
|
||||||
|
# -> would allow a `if not ._scope: => raise RTE` ?
|
||||||
ctx: Context = await portal.actor.start_remote_task(
|
ctx: Context = await portal.actor.start_remote_task(
|
||||||
portal.channel,
|
portal.channel,
|
||||||
nsf=nsf,
|
nsf=nsf,
|
||||||
|
@ -2037,6 +2150,7 @@ async def open_context_from_portal(
|
||||||
scope_err: BaseException|None = None
|
scope_err: BaseException|None = None
|
||||||
ctxc_from_child: ContextCancelled|None = None
|
ctxc_from_child: ContextCancelled|None = None
|
||||||
try:
|
try:
|
||||||
|
# from .devx import pause
|
||||||
async with (
|
async with (
|
||||||
collapse_eg(),
|
collapse_eg(),
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
|
@ -2059,6 +2173,10 @@ async def open_context_from_portal(
|
||||||
# the dialog, the `Error` msg should be raised from the `msg`
|
# the dialog, the `Error` msg should be raised from the `msg`
|
||||||
# handling block below.
|
# handling block below.
|
||||||
try:
|
try:
|
||||||
|
log.runtime(
|
||||||
|
f'IPC ctx parent waiting on Started msg..\n'
|
||||||
|
f'ctx.cid: {ctx.cid!r}\n'
|
||||||
|
)
|
||||||
started_msg, first = await ctx._pld_rx.recv_msg(
|
started_msg, first = await ctx._pld_rx.recv_msg(
|
||||||
ipc=ctx,
|
ipc=ctx,
|
||||||
expect_msg=Started,
|
expect_msg=Started,
|
||||||
|
@ -2067,16 +2185,16 @@ async def open_context_from_portal(
|
||||||
)
|
)
|
||||||
except trio.Cancelled as taskc:
|
except trio.Cancelled as taskc:
|
||||||
ctx_cs: trio.CancelScope = ctx._scope
|
ctx_cs: trio.CancelScope = ctx._scope
|
||||||
|
log.cancel(
|
||||||
|
f'IPC ctx was cancelled during "child" task sync due to\n\n'
|
||||||
|
f'.cid: {ctx.cid!r}\n'
|
||||||
|
f'.maybe_error: {ctx.maybe_error!r}\n'
|
||||||
|
)
|
||||||
|
# await pause(shield=True)
|
||||||
|
|
||||||
if not ctx_cs.cancel_called:
|
if not ctx_cs.cancel_called:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# from .devx import pause
|
|
||||||
# await pause(shield=True)
|
|
||||||
|
|
||||||
log.cancel(
|
|
||||||
'IPC ctx was cancelled during "child" task sync due to\n\n'
|
|
||||||
f'{ctx.maybe_error}\n'
|
|
||||||
)
|
|
||||||
# OW if the ctx's scope was cancelled manually,
|
# OW if the ctx's scope was cancelled manually,
|
||||||
# likely the `Context` was cancelled via a call to
|
# likely the `Context` was cancelled via a call to
|
||||||
# `._maybe_cancel_and_set_remote_error()` so ensure
|
# `._maybe_cancel_and_set_remote_error()` so ensure
|
||||||
|
@ -2199,7 +2317,7 @@ async def open_context_from_portal(
|
||||||
# documenting it as a definittive example of
|
# documenting it as a definittive example of
|
||||||
# debugging the tractor-runtime itself using it's
|
# debugging the tractor-runtime itself using it's
|
||||||
# own `.devx.` tooling!
|
# own `.devx.` tooling!
|
||||||
#
|
#
|
||||||
# await debug.pause()
|
# await debug.pause()
|
||||||
|
|
||||||
# CASE 2: context was cancelled by local task calling
|
# CASE 2: context was cancelled by local task calling
|
||||||
|
@ -2272,13 +2390,16 @@ async def open_context_from_portal(
|
||||||
match scope_err:
|
match scope_err:
|
||||||
case trio.Cancelled():
|
case trio.Cancelled():
|
||||||
logmeth = log.cancel
|
logmeth = log.cancel
|
||||||
|
cause: str = 'cancelled'
|
||||||
|
|
||||||
# XXX explicitly report on any non-graceful-taskc cases
|
# XXX explicitly report on any non-graceful-taskc cases
|
||||||
case _:
|
case _:
|
||||||
|
cause: str = 'errored'
|
||||||
logmeth = log.exception
|
logmeth = log.exception
|
||||||
|
|
||||||
logmeth(
|
logmeth(
|
||||||
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
|
f'ctx {ctx.side!r}-side {cause!r} with,\n'
|
||||||
|
f'{ctx.repr_outcome()!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
if debug_mode():
|
if debug_mode():
|
||||||
|
@ -2303,6 +2424,7 @@ async def open_context_from_portal(
|
||||||
# told us it's cancelled ;p
|
# told us it's cancelled ;p
|
||||||
if ctxc_from_child is None:
|
if ctxc_from_child is None:
|
||||||
try:
|
try:
|
||||||
|
# await pause(shield=True)
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
|
@ -2459,8 +2581,10 @@ async def open_context_from_portal(
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Context cancelled by local {ctx.side!r}-side task\n'
|
f'Context cancelled by local {ctx.side!r}-side task\n'
|
||||||
f'c)>\n'
|
f'c)>\n'
|
||||||
f' |_{ctx._task}\n\n'
|
f' |_{ctx.parent_task}\n'
|
||||||
f'{repr(scope_err)}\n'
|
f' .cid={ctx.cid!r}\n'
|
||||||
|
f'\n'
|
||||||
|
f'{scope_err!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: should we add a `._cancel_req_received`
|
# TODO: should we add a `._cancel_req_received`
|
||||||
|
|
|
@ -27,7 +27,7 @@ from typing import (
|
||||||
)
|
)
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
|
|
||||||
from tractor.log import get_logger
|
from .log import get_logger
|
||||||
from .trionics import (
|
from .trionics import (
|
||||||
gather_contexts,
|
gather_contexts,
|
||||||
collapse_eg,
|
collapse_eg,
|
||||||
|
@ -217,7 +217,7 @@ async def find_actor(
|
||||||
raise_on_none: bool = False,
|
raise_on_none: bool = False,
|
||||||
|
|
||||||
) -> AsyncGenerator[
|
) -> AsyncGenerator[
|
||||||
Portal | list[Portal] | None,
|
Portal|list[Portal]|None,
|
||||||
None,
|
None,
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
|
@ -259,6 +259,7 @@ async def find_actor(
|
||||||
collapse_eg(),
|
collapse_eg(),
|
||||||
gather_contexts(
|
gather_contexts(
|
||||||
mngrs=maybe_portals,
|
mngrs=maybe_portals,
|
||||||
|
# tn=tn, # ?TODO, helps to pass rent tn here?
|
||||||
) as portals,
|
) as portals,
|
||||||
):
|
):
|
||||||
# log.runtime(
|
# log.runtime(
|
||||||
|
|
|
@ -46,6 +46,7 @@ from msgspec import (
|
||||||
from tractor._state import current_actor
|
from tractor._state import current_actor
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
|
Aid,
|
||||||
Error,
|
Error,
|
||||||
PayloadMsg,
|
PayloadMsg,
|
||||||
MsgType,
|
MsgType,
|
||||||
|
@ -479,9 +480,10 @@ class RemoteActorError(Exception):
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def relay_uid(self) -> tuple[str, str]|None:
|
def relay_uid(self) -> tuple[str, str]|None:
|
||||||
return tuple(
|
if msg := self._ipc_msg:
|
||||||
self._ipc_msg.relay_path[-1]
|
return tuple(
|
||||||
)
|
msg.relay_path[-1]
|
||||||
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def src_uid(self) -> tuple[str, str]|None:
|
def src_uid(self) -> tuple[str, str]|None:
|
||||||
|
@ -521,7 +523,8 @@ class RemoteActorError(Exception):
|
||||||
for key in fields:
|
for key in fields:
|
||||||
if (
|
if (
|
||||||
key == 'relay_uid'
|
key == 'relay_uid'
|
||||||
and not self.is_inception()
|
and
|
||||||
|
not self.is_inception()
|
||||||
):
|
):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -534,6 +537,13 @@ class RemoteActorError(Exception):
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
if (
|
||||||
|
key == 'canceller'
|
||||||
|
and
|
||||||
|
isinstance(val, Aid)
|
||||||
|
):
|
||||||
|
val: str = val.reprol(sin_uuid=False)
|
||||||
|
|
||||||
# TODO: for `.relay_path` on multiline?
|
# TODO: for `.relay_path` on multiline?
|
||||||
# if not isinstance(val, str):
|
# if not isinstance(val, str):
|
||||||
# val_str = pformat(val)
|
# val_str = pformat(val)
|
||||||
|
@ -623,12 +633,22 @@ class RemoteActorError(Exception):
|
||||||
# IFF there is an embedded traceback-str we always
|
# IFF there is an embedded traceback-str we always
|
||||||
# draw the ascii-box around it.
|
# draw the ascii-box around it.
|
||||||
body: str = ''
|
body: str = ''
|
||||||
if tb_str := self.tb_str:
|
fields: str = self._mk_fields_str(
|
||||||
fields: str = self._mk_fields_str(
|
_body_fields
|
||||||
_body_fields
|
+
|
||||||
+
|
self.extra_body_fields,
|
||||||
self.extra_body_fields,
|
)
|
||||||
)
|
|
||||||
|
tb_str: str = (
|
||||||
|
self.tb_str
|
||||||
|
#
|
||||||
|
# ^TODO? what to use instead? if anything?
|
||||||
|
# -[ ] ensure the `.message` doesn't show up 2x in output ya?
|
||||||
|
# -[ ] ._message isn't really right?
|
||||||
|
# or
|
||||||
|
# self._message
|
||||||
|
)
|
||||||
|
if tb_str:
|
||||||
from tractor.devx import (
|
from tractor.devx import (
|
||||||
pformat_boxed_tb,
|
pformat_boxed_tb,
|
||||||
)
|
)
|
||||||
|
@ -640,7 +660,7 @@ class RemoteActorError(Exception):
|
||||||
# just after <Type(
|
# just after <Type(
|
||||||
# |___ ..
|
# |___ ..
|
||||||
tb_body_indent=1,
|
tb_body_indent=1,
|
||||||
boxer_header=self.relay_uid,
|
boxer_header=self.relay_uid or '-',
|
||||||
)
|
)
|
||||||
|
|
||||||
# !TODO, it'd be nice to import these top level without
|
# !TODO, it'd be nice to import these top level without
|
||||||
|
@ -713,6 +733,10 @@ class RemoteActorError(Exception):
|
||||||
|
|
||||||
class ContextCancelled(RemoteActorError):
|
class ContextCancelled(RemoteActorError):
|
||||||
'''
|
'''
|
||||||
|
IPC context cancellation signal/msg.
|
||||||
|
|
||||||
|
Often reffed with the short-hand: "ctxc".
|
||||||
|
|
||||||
Inter-actor task context was cancelled by either a call to
|
Inter-actor task context was cancelled by either a call to
|
||||||
``Portal.cancel_actor()`` or ``Context.cancel()``.
|
``Portal.cancel_actor()`` or ``Context.cancel()``.
|
||||||
|
|
||||||
|
@ -737,8 +761,8 @@ class ContextCancelled(RemoteActorError):
|
||||||
|
|
||||||
- (simulating) an IPC transport network outage
|
- (simulating) an IPC transport network outage
|
||||||
- a (malicious) pkt sent specifically to cancel an actor's
|
- a (malicious) pkt sent specifically to cancel an actor's
|
||||||
runtime non-gracefully without ensuring ongoing RPC tasks are
|
runtime non-gracefully without ensuring ongoing RPC tasks
|
||||||
incrementally cancelled as is done with:
|
are incrementally cancelled as is done with:
|
||||||
`Actor`
|
`Actor`
|
||||||
|_`.cancel()`
|
|_`.cancel()`
|
||||||
|_`.cancel_soon()`
|
|_`.cancel_soon()`
|
||||||
|
@ -759,6 +783,59 @@ class ContextCancelled(RemoteActorError):
|
||||||
# src_actor_uid = canceller
|
# src_actor_uid = canceller
|
||||||
|
|
||||||
|
|
||||||
|
class ActorCancelled(ContextCancelled):
|
||||||
|
'''
|
||||||
|
Runtime-layer cancellation signal/msg.
|
||||||
|
|
||||||
|
Indicates a "graceful interrupt" of the machinery scheduled by
|
||||||
|
the py-proc's `trio.run()`.
|
||||||
|
|
||||||
|
Often reffed with the short-hand: "actorc".
|
||||||
|
|
||||||
|
Raised from within `an: ActorNursery` (via an `ExceptionGroup`)
|
||||||
|
when an actor has been "process wide" cancel-called using any of,
|
||||||
|
|
||||||
|
- `ActorNursery.cancel()`
|
||||||
|
- `Portal.cancel_actor()`
|
||||||
|
|
||||||
|
**and** that cancel request was part of a "non graceful" cancel
|
||||||
|
condition.
|
||||||
|
|
||||||
|
That is, whenever an exception is to be raised outside an `an`
|
||||||
|
scope-block due to some error raised-in/relayed-to that scope. In
|
||||||
|
such cases for every subactor which was cancelledand subsequently
|
||||||
|
( and according to the `an`'s supervision strat ) this is
|
||||||
|
normally raised per subactor portal.
|
||||||
|
|
||||||
|
'''
|
||||||
|
@property
|
||||||
|
def canceller(self) -> Aid:
|
||||||
|
'''
|
||||||
|
Return the (maybe) `Actor.aid: Aid` for the requesting-author
|
||||||
|
of this actorc.
|
||||||
|
|
||||||
|
Emit a warning msg when `.canceller` has not been set.
|
||||||
|
|
||||||
|
See additional relevant notes in
|
||||||
|
`ContextCancelled.canceller`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
value: tuple[str, str]|None
|
||||||
|
if msg := self._ipc_msg:
|
||||||
|
value = msg.canceller
|
||||||
|
else:
|
||||||
|
value = self._extra_msgdata['canceller']
|
||||||
|
|
||||||
|
if value:
|
||||||
|
return value
|
||||||
|
|
||||||
|
log.warning(
|
||||||
|
'IPC Context cancelled without a requesting actor?\n'
|
||||||
|
'Maybe the IPC transport ended abruptly?\n\n'
|
||||||
|
f'{self}'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class MsgTypeError(
|
class MsgTypeError(
|
||||||
RemoteActorError,
|
RemoteActorError,
|
||||||
):
|
):
|
||||||
|
|
|
@ -50,7 +50,11 @@ from tractor._addr import UnwrappedAddress
|
||||||
from tractor._portal import Portal
|
from tractor._portal import Portal
|
||||||
from tractor._runtime import Actor
|
from tractor._runtime import Actor
|
||||||
from tractor._entry import _mp_main
|
from tractor._entry import _mp_main
|
||||||
from tractor._exceptions import ActorFailure
|
from tractor._exceptions import (
|
||||||
|
ActorCancelled,
|
||||||
|
ActorFailure,
|
||||||
|
# NoResult,
|
||||||
|
)
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
types as msgtypes,
|
types as msgtypes,
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
|
@ -137,7 +141,6 @@ def try_set_start_method(
|
||||||
|
|
||||||
|
|
||||||
async def exhaust_portal(
|
async def exhaust_portal(
|
||||||
|
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
actor: Actor
|
actor: Actor
|
||||||
|
|
||||||
|
@ -185,10 +188,12 @@ async def exhaust_portal(
|
||||||
|
|
||||||
|
|
||||||
async def cancel_on_completion(
|
async def cancel_on_completion(
|
||||||
|
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: dict[
|
||||||
|
msgtypes.Aid,
|
||||||
|
Exception,
|
||||||
|
],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -209,24 +214,57 @@ async def cancel_on_completion(
|
||||||
portal,
|
portal,
|
||||||
actor,
|
actor,
|
||||||
)
|
)
|
||||||
|
aid: msgtypes.Aid = actor.aid
|
||||||
|
repr_aid: str = aid.reprol(sin_uuid=False)
|
||||||
|
|
||||||
if isinstance(result, Exception):
|
if isinstance(result, Exception):
|
||||||
errors[actor.uid]: Exception = result
|
errors[aid]: Exception = result
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Cancelling subactor runtime due to error:\n\n'
|
'Cancelling subactor {repr_aid!r} runtime due to error\n'
|
||||||
f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
|
f'\n'
|
||||||
f'error: {result}\n'
|
f'Portal.cancel_actor() => {portal.channel.uid}\n'
|
||||||
|
f'\n'
|
||||||
|
f'{result!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.runtime(
|
report: str = (
|
||||||
'Cancelling subactor gracefully:\n\n'
|
f'Cancelling subactor {repr_aid!r} gracefully..\n'
|
||||||
f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
|
f'\n'
|
||||||
f'result: {result}\n'
|
)
|
||||||
|
canc_info: str = (
|
||||||
|
f'Portal.cancel_actor() => {portal.chan.uid}\n'
|
||||||
|
f'\n'
|
||||||
|
f'final-result => {result!r}\n'
|
||||||
|
)
|
||||||
|
log.cancel(
|
||||||
|
report
|
||||||
|
+
|
||||||
|
canc_info
|
||||||
)
|
)
|
||||||
|
|
||||||
# cancel the process now that we have a final result
|
# cancel the process now that we have a final result
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
if (
|
||||||
|
not errors.get(aid)
|
||||||
|
# and
|
||||||
|
# result is NoResult
|
||||||
|
):
|
||||||
|
pass
|
||||||
|
# await debug.pause(shield=True)
|
||||||
|
|
||||||
|
# errors[aid] = ActorCancelled(
|
||||||
|
# message=(
|
||||||
|
# f'Cancelled subactor {repr_aid!r}\n'
|
||||||
|
# f'{canc_info}\n'
|
||||||
|
# ),
|
||||||
|
# canceller=current_actor().aid,
|
||||||
|
# # TODO? should we have a ack-msg?
|
||||||
|
# # ipc_msg=??
|
||||||
|
# # boxed_type=trio.Cancelled,
|
||||||
|
# )
|
||||||
|
|
||||||
|
|
||||||
async def hard_kill(
|
async def hard_kill(
|
||||||
proc: trio.Process,
|
proc: trio.Process,
|
||||||
|
@ -331,6 +369,10 @@ async def soft_kill(
|
||||||
Awaitable,
|
Awaitable,
|
||||||
],
|
],
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
|
errors: dict[
|
||||||
|
msgtypes.Aid,
|
||||||
|
Exception,
|
||||||
|
],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -374,8 +416,8 @@ async def soft_kill(
|
||||||
# below. This means we try to do a graceful teardown
|
# below. This means we try to do a graceful teardown
|
||||||
# via sending a cancel message before getting out
|
# via sending a cancel message before getting out
|
||||||
# zombie killing tools.
|
# zombie killing tools.
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as tn:
|
||||||
n.cancel_scope.shield = True
|
tn.cancel_scope.shield = True
|
||||||
|
|
||||||
async def cancel_on_proc_deth():
|
async def cancel_on_proc_deth():
|
||||||
'''
|
'''
|
||||||
|
@ -385,24 +427,35 @@ async def soft_kill(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
await wait_func(proc)
|
await wait_func(proc)
|
||||||
n.cancel_scope.cancel()
|
tn.cancel_scope.cancel()
|
||||||
|
|
||||||
# start a task to wait on the termination of the
|
# start a task to wait on the termination of the
|
||||||
# process by itself waiting on a (caller provided) wait
|
# process by itself waiting on a (caller provided) wait
|
||||||
# function which should unblock when the target process
|
# function which should unblock when the target process
|
||||||
# has terminated.
|
# has terminated.
|
||||||
n.start_soon(cancel_on_proc_deth)
|
tn.start_soon(cancel_on_proc_deth)
|
||||||
|
|
||||||
# send the actor-runtime a cancel request.
|
# send the actor-runtime a cancel request.
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
# if not errors.get(peer_aid):
|
||||||
|
# errors[peer_aid] = ActorCancelled(
|
||||||
|
# message=(
|
||||||
|
# 'Sub-actor cancelled gracefully by parent\n'
|
||||||
|
# ),
|
||||||
|
# canceller=current_actor().aid,
|
||||||
|
# # TODO? should we have a ack-msg?
|
||||||
|
# # ipc_msg=??
|
||||||
|
# # boxed_type=trio.Cancelled,
|
||||||
|
# )
|
||||||
|
|
||||||
if proc.poll() is None: # type: ignore
|
if proc.poll() is None: # type: ignore
|
||||||
log.warning(
|
log.warning(
|
||||||
'Subactor still alive after cancel request?\n\n'
|
'Subactor still alive after cancel request?\n\n'
|
||||||
f'uid: {peer_aid}\n'
|
f'uid: {peer_aid}\n'
|
||||||
f'|_{proc}\n'
|
f'|_{proc}\n'
|
||||||
)
|
)
|
||||||
n.cancel_scope.cancel()
|
tn.cancel_scope.cancel()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
@ -410,7 +463,10 @@ async def new_proc(
|
||||||
name: str,
|
name: str,
|
||||||
actor_nursery: ActorNursery,
|
actor_nursery: ActorNursery,
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: dict[
|
||||||
|
msgtypes.Aid,
|
||||||
|
Exception,
|
||||||
|
],
|
||||||
|
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addrs: list[UnwrappedAddress],
|
bind_addrs: list[UnwrappedAddress],
|
||||||
|
@ -449,7 +505,10 @@ async def trio_proc(
|
||||||
name: str,
|
name: str,
|
||||||
actor_nursery: ActorNursery,
|
actor_nursery: ActorNursery,
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: dict[
|
||||||
|
msgtypes.Aid,
|
||||||
|
Exception,
|
||||||
|
],
|
||||||
|
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addrs: list[UnwrappedAddress],
|
bind_addrs: list[UnwrappedAddress],
|
||||||
|
@ -572,9 +631,9 @@ async def trio_proc(
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await actor_nursery._join_procs.wait()
|
await actor_nursery._join_procs.wait()
|
||||||
|
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as ptl_reaper_tn:
|
||||||
if portal in actor_nursery._cancel_after_result_on_exit:
|
if portal in actor_nursery._cancel_after_result_on_exit:
|
||||||
nursery.start_soon(
|
ptl_reaper_tn.start_soon(
|
||||||
cancel_on_completion,
|
cancel_on_completion,
|
||||||
portal,
|
portal,
|
||||||
subactor,
|
subactor,
|
||||||
|
@ -587,7 +646,8 @@ async def trio_proc(
|
||||||
await soft_kill(
|
await soft_kill(
|
||||||
proc,
|
proc,
|
||||||
trio.Process.wait, # XXX, uses `pidfd_open()` below.
|
trio.Process.wait, # XXX, uses `pidfd_open()` below.
|
||||||
portal
|
portal,
|
||||||
|
errors,
|
||||||
)
|
)
|
||||||
|
|
||||||
# cancel result waiter that may have been spawned in
|
# cancel result waiter that may have been spawned in
|
||||||
|
@ -596,7 +656,7 @@ async def trio_proc(
|
||||||
'Cancelling portal result reaper task\n'
|
'Cancelling portal result reaper task\n'
|
||||||
f'c)> {subactor.aid.reprol()!r}\n'
|
f'c)> {subactor.aid.reprol()!r}\n'
|
||||||
)
|
)
|
||||||
nursery.cancel_scope.cancel()
|
ptl_reaper_tn.cancel_scope.cancel()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# XXX NOTE XXX: The "hard" reap since no actor zombies are
|
# XXX NOTE XXX: The "hard" reap since no actor zombies are
|
||||||
|
@ -669,7 +729,10 @@ async def mp_proc(
|
||||||
name: str,
|
name: str,
|
||||||
actor_nursery: ActorNursery, # type: ignore # noqa
|
actor_nursery: ActorNursery, # type: ignore # noqa
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: dict[
|
||||||
|
msgtypes.Aid,
|
||||||
|
Exception,
|
||||||
|
],
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addrs: list[UnwrappedAddress],
|
bind_addrs: list[UnwrappedAddress],
|
||||||
parent_addr: UnwrappedAddress,
|
parent_addr: UnwrappedAddress,
|
||||||
|
@ -794,7 +857,7 @@ async def mp_proc(
|
||||||
cancel_on_completion,
|
cancel_on_completion,
|
||||||
portal,
|
portal,
|
||||||
subactor,
|
subactor,
|
||||||
errors
|
errors,
|
||||||
)
|
)
|
||||||
|
|
||||||
# This is a "soft" (cancellable) join/reap which
|
# This is a "soft" (cancellable) join/reap which
|
||||||
|
@ -803,7 +866,8 @@ async def mp_proc(
|
||||||
await soft_kill(
|
await soft_kill(
|
||||||
proc,
|
proc,
|
||||||
proc_waiter,
|
proc_waiter,
|
||||||
portal
|
portal,
|
||||||
|
errors,
|
||||||
)
|
)
|
||||||
|
|
||||||
# cancel result waiter that may have been spawned in
|
# cancel result waiter that may have been spawned in
|
||||||
|
|
|
@ -30,6 +30,9 @@ import warnings
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
|
||||||
|
from .msg import (
|
||||||
|
types as msgtypes,
|
||||||
|
)
|
||||||
from .devx import (
|
from .devx import (
|
||||||
debug,
|
debug,
|
||||||
pformat as _pformat,
|
pformat as _pformat,
|
||||||
|
@ -48,6 +51,7 @@ from .trionics import (
|
||||||
)
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
|
ActorCancelled,
|
||||||
)
|
)
|
||||||
from ._root import (
|
from ._root import (
|
||||||
open_root_actor,
|
open_root_actor,
|
||||||
|
@ -99,7 +103,10 @@ class ActorNursery:
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
ria_nursery: trio.Nursery,
|
ria_nursery: trio.Nursery,
|
||||||
da_nursery: trio.Nursery,
|
da_nursery: trio.Nursery,
|
||||||
errors: dict[tuple[str, str], BaseException],
|
errors: dict[
|
||||||
|
msgtypes.Aid,
|
||||||
|
BaseException,
|
||||||
|
],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# self.supervisor = supervisor # TODO
|
# self.supervisor = supervisor # TODO
|
||||||
|
@ -117,9 +124,11 @@ class ActorNursery:
|
||||||
]
|
]
|
||||||
] = {}
|
] = {}
|
||||||
|
|
||||||
|
# signals when it is ok to start waiting o subactor procs
|
||||||
|
# for termination.
|
||||||
self._join_procs = trio.Event()
|
self._join_procs = trio.Event()
|
||||||
self._at_least_one_child_in_debug: bool = False
|
self._at_least_one_child_in_debug: bool = False
|
||||||
self.errors = errors
|
self._errors = errors
|
||||||
self._scope_error: BaseException|None = None
|
self._scope_error: BaseException|None = None
|
||||||
self.exited = trio.Event()
|
self.exited = trio.Event()
|
||||||
|
|
||||||
|
@ -260,7 +269,7 @@ class ActorNursery:
|
||||||
name,
|
name,
|
||||||
self,
|
self,
|
||||||
subactor,
|
subactor,
|
||||||
self.errors,
|
self._errors,
|
||||||
bind_addrs,
|
bind_addrs,
|
||||||
parent_addr,
|
parent_addr,
|
||||||
_rtv, # run time vars
|
_rtv, # run time vars
|
||||||
|
@ -364,7 +373,9 @@ class ActorNursery:
|
||||||
# then `._children`..
|
# then `._children`..
|
||||||
children: dict = self._children
|
children: dict = self._children
|
||||||
child_count: int = len(children)
|
child_count: int = len(children)
|
||||||
msg: str = f'Cancelling actor nursery with {child_count} children\n'
|
msg: str = (
|
||||||
|
f'Cancelling actor-nursery with {child_count} children\n'
|
||||||
|
)
|
||||||
|
|
||||||
server: IPCServer = self._actor.ipc_server
|
server: IPCServer = self._actor.ipc_server
|
||||||
|
|
||||||
|
@ -391,7 +402,9 @@ class ActorNursery:
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if portal is None: # actor hasn't fully spawned yet
|
if portal is None: # actor hasn't fully spawned yet
|
||||||
event: trio.Event = server._peer_connected[subactor.uid]
|
event: trio.Event = server._peer_connected[
|
||||||
|
subactor.uid
|
||||||
|
]
|
||||||
log.warning(
|
log.warning(
|
||||||
f"{subactor.uid} never 't finished spawning?"
|
f"{subactor.uid} never 't finished spawning?"
|
||||||
)
|
)
|
||||||
|
@ -416,7 +429,20 @@ class ActorNursery:
|
||||||
# spawn cancel tasks for each sub-actor
|
# spawn cancel tasks for each sub-actor
|
||||||
assert portal
|
assert portal
|
||||||
if portal.channel.connected():
|
if portal.channel.connected():
|
||||||
tn.start_soon(portal.cancel_actor)
|
|
||||||
|
async def canc_subactor():
|
||||||
|
await portal.cancel_actor()
|
||||||
|
# aid: msgtypes.Aid = subactor.aid
|
||||||
|
# reprol: str = aid.reprol(sin_uuid=False)
|
||||||
|
# if not self._errors.get(aid):
|
||||||
|
# self._errors[aid] = ActorCancelled(
|
||||||
|
# message=(
|
||||||
|
# f'Sub-actor {reprol!r} cancelled gracefully by parent nursery\n'
|
||||||
|
# ),
|
||||||
|
# canceller=self._actor.aid,
|
||||||
|
# )
|
||||||
|
|
||||||
|
tn.start_soon(canc_subactor)
|
||||||
|
|
||||||
log.cancel(msg)
|
log.cancel(msg)
|
||||||
# if we cancelled the cancel (we hung cancelling remote actors)
|
# if we cancelled the cancel (we hung cancelling remote actors)
|
||||||
|
@ -442,22 +468,66 @@ class ActorNursery:
|
||||||
# mark ourselves as having (tried to have) cancelled all subactors
|
# mark ourselves as having (tried to have) cancelled all subactors
|
||||||
self._join_procs.set()
|
self._join_procs.set()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def maybe_error(self) -> (
|
||||||
|
BaseException|
|
||||||
|
BaseExceptionGroup|
|
||||||
|
None
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Deliver any captured scope errors including those relayed
|
||||||
|
from subactors such as `ActorCancelled` during a non-graceful
|
||||||
|
cancellation scenario.
|
||||||
|
|
||||||
|
When more then a "graceful cancel" occurrs wrap all collected
|
||||||
|
sub-exceptions in a raised `ExceptionGroup`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
scope_exc: BaseException|None = self._scope_error
|
||||||
|
|
||||||
|
# XXX NOTE, only pack an eg if there i at least one
|
||||||
|
# non-actorc exception received from a subactor, OR
|
||||||
|
# return `._scope_error` verbatim.
|
||||||
|
if (errors := self._errors):
|
||||||
|
# use `BaseExceptionGroup` as needed
|
||||||
|
excs: list[BaseException] = list(errors.values())
|
||||||
|
if (
|
||||||
|
len(excs) > 1
|
||||||
|
and
|
||||||
|
any(
|
||||||
|
type(exc) not in {ActorCancelled,}
|
||||||
|
for exc in excs
|
||||||
|
)
|
||||||
|
):
|
||||||
|
return ExceptionGroup(
|
||||||
|
'ActorNursery multi-errored with',
|
||||||
|
tuple(excs),
|
||||||
|
)
|
||||||
|
|
||||||
|
# raise the lone subactor exc
|
||||||
|
return list(excs)[0]
|
||||||
|
|
||||||
|
return scope_exc
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def _open_and_supervise_one_cancels_all_nursery(
|
async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
tb_hide: bool = False,
|
hide_tb: bool = True,
|
||||||
|
|
||||||
) -> typing.AsyncGenerator[ActorNursery, None]:
|
) -> typing.AsyncGenerator[ActorNursery, None]:
|
||||||
|
|
||||||
# normally don't need to show user by default
|
# normally don't need to show user by default
|
||||||
__tracebackhide__: bool = tb_hide
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
outer_err: BaseException|None = None
|
outer_err: BaseException|None = None
|
||||||
inner_err: BaseException|None = None
|
inner_err: BaseException|None = None
|
||||||
|
|
||||||
# the collection of errors retreived from spawned sub-actors
|
# the collection of errors retreived from spawned sub-actors
|
||||||
errors: dict[tuple[str, str], BaseException] = {}
|
errors: dict[
|
||||||
|
msgtypes.Aid,
|
||||||
|
BaseException,
|
||||||
|
] = {}
|
||||||
|
|
||||||
# This is the outermost level "deamon actor" nursery. It is awaited
|
# This is the outermost level "deamon actor" nursery. It is awaited
|
||||||
# **after** the below inner "run in actor nursery". This allows for
|
# **after** the below inner "run in actor nursery". This allows for
|
||||||
|
@ -467,176 +537,212 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# `ActorNursery.start_actor()`).
|
# `ActorNursery.start_actor()`).
|
||||||
|
|
||||||
# errors from this daemon actor nursery bubble up to caller
|
# errors from this daemon actor nursery bubble up to caller
|
||||||
async with (
|
try:
|
||||||
collapse_eg(),
|
async with (
|
||||||
trio.open_nursery() as da_nursery,
|
collapse_eg(),
|
||||||
):
|
trio.open_nursery() as da_nursery,
|
||||||
try:
|
):
|
||||||
# This is the inner level "run in actor" nursery. It is
|
try:
|
||||||
# awaited first since actors spawned in this way (using
|
# This is the inner level "run in actor" nursery. It is
|
||||||
# `ActorNusery.run_in_actor()`) are expected to only
|
# awaited first since actors spawned in this way (using
|
||||||
# return a single result and then complete (i.e. be canclled
|
# `ActorNusery.run_in_actor()`) are expected to only
|
||||||
# gracefully). Errors collected from these actors are
|
# return a single result and then complete (i.e. be canclled
|
||||||
# immediately raised for handling by a supervisor strategy.
|
# gracefully). Errors collected from these actors are
|
||||||
# As such if the strategy propagates any error(s) upwards
|
# immediately raised for handling by a supervisor strategy.
|
||||||
# the above "daemon actor" nursery will be notified.
|
# As such if the strategy propagates any error(s) upwards
|
||||||
async with (
|
# the above "daemon actor" nursery will be notified.
|
||||||
collapse_eg(),
|
async with (
|
||||||
trio.open_nursery() as ria_nursery,
|
collapse_eg(),
|
||||||
):
|
trio.open_nursery() as ria_nursery,
|
||||||
an = ActorNursery(
|
):
|
||||||
actor,
|
an = ActorNursery(
|
||||||
ria_nursery,
|
actor,
|
||||||
da_nursery,
|
ria_nursery,
|
||||||
errors
|
da_nursery,
|
||||||
)
|
errors
|
||||||
try:
|
|
||||||
# spawning of actors happens in the caller's scope
|
|
||||||
# after we yield upwards
|
|
||||||
yield an
|
|
||||||
|
|
||||||
# When we didn't error in the caller's scope,
|
|
||||||
# signal all process-monitor-tasks to conduct
|
|
||||||
# the "hard join phase".
|
|
||||||
log.runtime(
|
|
||||||
'Waiting on subactors to complete:\n'
|
|
||||||
f'>}} {len(an._children)}\n'
|
|
||||||
)
|
)
|
||||||
an._join_procs.set()
|
try:
|
||||||
|
# spawning of actors happens in the caller's scope
|
||||||
|
# after we yield upwards
|
||||||
|
yield an
|
||||||
|
|
||||||
except BaseException as _inner_err:
|
# When we didn't error in the caller's scope,
|
||||||
inner_err = _inner_err
|
# signal all process-monitor-tasks to conduct
|
||||||
errors[actor.uid] = inner_err
|
# the "hard join phase".
|
||||||
|
log.runtime(
|
||||||
|
'Waiting on subactors to complete:\n'
|
||||||
|
f'>}} {len(an._children)}\n'
|
||||||
|
)
|
||||||
|
an._join_procs.set()
|
||||||
|
|
||||||
# If we error in the root but the debugger is
|
except BaseException as _inner_err:
|
||||||
# engaged we don't want to prematurely kill (and
|
inner_err = _inner_err
|
||||||
# thus clobber access to) the local tty since it
|
# errors[actor.aid] = inner_err
|
||||||
# will make the pdb repl unusable.
|
|
||||||
# Instead try to wait for pdb to be released before
|
|
||||||
# tearing down.
|
|
||||||
await debug.maybe_wait_for_debugger(
|
|
||||||
child_in_debug=an._at_least_one_child_in_debug
|
|
||||||
)
|
|
||||||
|
|
||||||
# if the caller's scope errored then we activate our
|
# If we error in the root but the debugger is
|
||||||
# one-cancels-all supervisor strategy (don't
|
# engaged we don't want to prematurely kill (and
|
||||||
# worry more are coming).
|
# thus clobber access to) the local tty since it
|
||||||
an._join_procs.set()
|
# will make the pdb repl unusable.
|
||||||
|
# Instead try to wait for pdb to be released before
|
||||||
|
# tearing down.
|
||||||
|
await debug.maybe_wait_for_debugger(
|
||||||
|
child_in_debug=an._at_least_one_child_in_debug
|
||||||
|
)
|
||||||
|
|
||||||
# XXX NOTE XXX: hypothetically an error could
|
# if the caller's scope errored then we activate our
|
||||||
# be raised and then a cancel signal shows up
|
# one-cancels-all supervisor strategy (don't
|
||||||
# slightly after in which case the `else:`
|
# worry more are coming).
|
||||||
# block here might not complete? For now,
|
an._join_procs.set()
|
||||||
# shield both.
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
etype: type = type(inner_err)
|
|
||||||
if etype in (
|
|
||||||
trio.Cancelled,
|
|
||||||
KeyboardInterrupt,
|
|
||||||
) or (
|
|
||||||
is_multi_cancelled(inner_err)
|
|
||||||
):
|
|
||||||
log.cancel(
|
|
||||||
f'Actor-nursery cancelled by {etype}\n\n'
|
|
||||||
|
|
||||||
f'{current_actor().uid}\n'
|
# XXX NOTE XXX: hypothetically an error could
|
||||||
f' |_{an}\n\n'
|
# be raised and then a cancel signal shows up
|
||||||
|
# slightly after in which case the `else:`
|
||||||
|
# block here might not complete? For now,
|
||||||
|
# shield both.
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
etype: type = type(inner_err)
|
||||||
|
if etype in (
|
||||||
|
trio.Cancelled,
|
||||||
|
KeyboardInterrupt,
|
||||||
|
) or (
|
||||||
|
is_multi_cancelled(inner_err)
|
||||||
|
):
|
||||||
|
log.cancel(
|
||||||
|
f'Actor-nursery cancelled by {etype}\n\n'
|
||||||
|
|
||||||
# TODO: show tb str?
|
f'{current_actor().uid}\n'
|
||||||
# f'{tb_str}'
|
f' |_{an}\n\n'
|
||||||
)
|
|
||||||
elif etype in {
|
|
||||||
ContextCancelled,
|
|
||||||
}:
|
|
||||||
log.cancel(
|
|
||||||
'Actor-nursery caught remote cancellation\n'
|
|
||||||
'\n'
|
|
||||||
f'{inner_err.tb_str}'
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
log.exception(
|
|
||||||
'Nursery errored with:\n'
|
|
||||||
|
|
||||||
# TODO: same thing as in
|
# TODO: show tb str?
|
||||||
# `._invoke()` to compute how to
|
# f'{tb_str}'
|
||||||
# place this div-line in the
|
)
|
||||||
# middle of the above msg
|
elif etype in {
|
||||||
# content..
|
ContextCancelled,
|
||||||
# -[ ] prolly helper-func it too
|
}:
|
||||||
# in our `.log` module..
|
log.cancel(
|
||||||
# '------ - ------'
|
'Actor-nursery caught remote cancellation\n'
|
||||||
)
|
'\n'
|
||||||
|
f'{inner_err.tb_str}'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
log.exception(
|
||||||
|
'Nursery errored with:\n'
|
||||||
|
|
||||||
# cancel all subactors
|
# TODO: same thing as in
|
||||||
await an.cancel()
|
# `._invoke()` to compute how to
|
||||||
|
# place this div-line in the
|
||||||
|
# middle of the above msg
|
||||||
|
# content..
|
||||||
|
# -[ ] prolly helper-func it too
|
||||||
|
# in our `.log` module..
|
||||||
|
# '------ - ------'
|
||||||
|
)
|
||||||
|
|
||||||
# ria_nursery scope end
|
# cancel all subactors
|
||||||
|
await an.cancel()
|
||||||
|
|
||||||
# TODO: this is the handler around the ``.run_in_actor()``
|
# ria_nursery scope end
|
||||||
# nursery. Ideally we can drop this entirely in the future as
|
|
||||||
# the whole ``.run_in_actor()`` API should be built "on top of"
|
|
||||||
# this lower level spawn-request-cancel "daemon actor" API where
|
|
||||||
# a local in-actor task nursery is used with one-to-one task
|
|
||||||
# + `await Portal.run()` calls and the results/errors are
|
|
||||||
# handled directly (inline) and errors by the local nursery.
|
|
||||||
except (
|
|
||||||
Exception,
|
|
||||||
BaseExceptionGroup,
|
|
||||||
trio.Cancelled
|
|
||||||
) as _outer_err:
|
|
||||||
outer_err = _outer_err
|
|
||||||
|
|
||||||
an._scope_error = outer_err or inner_err
|
# TODO: this is the handler around the ``.run_in_actor()``
|
||||||
|
# nursery. Ideally we can drop this entirely in the future as
|
||||||
|
# the whole ``.run_in_actor()`` API should be built "on top of"
|
||||||
|
# this lower level spawn-request-cancel "daemon actor" API where
|
||||||
|
# a local in-actor task nursery is used with one-to-one task
|
||||||
|
# + `await Portal.run()` calls and the results/errors are
|
||||||
|
# handled directly (inline) and errors by the local nursery.
|
||||||
|
except (
|
||||||
|
Exception,
|
||||||
|
BaseExceptionGroup,
|
||||||
|
trio.Cancelled
|
||||||
|
) as _outer_err:
|
||||||
|
outer_err = _outer_err
|
||||||
|
|
||||||
# XXX: yet another guard before allowing the cancel
|
# XXX: yet another guard before allowing the cancel
|
||||||
# sequence in case a (single) child is in debug.
|
# sequence in case a (single) child is in debug.
|
||||||
await debug.maybe_wait_for_debugger(
|
await debug.maybe_wait_for_debugger(
|
||||||
child_in_debug=an._at_least_one_child_in_debug
|
child_in_debug=an._at_least_one_child_in_debug
|
||||||
)
|
|
||||||
|
|
||||||
# If actor-local error was raised while waiting on
|
|
||||||
# ".run_in_actor()" actors then we also want to cancel all
|
|
||||||
# remaining sub-actors (due to our lone strategy:
|
|
||||||
# one-cancels-all).
|
|
||||||
if an._children:
|
|
||||||
log.cancel(
|
|
||||||
'Actor-nursery cancelling due error type:\n'
|
|
||||||
f'{outer_err}\n'
|
|
||||||
)
|
)
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
await an.cancel()
|
|
||||||
raise
|
|
||||||
|
|
||||||
finally:
|
# If actor-local error was raised while waiting on
|
||||||
# No errors were raised while awaiting ".run_in_actor()"
|
# ".run_in_actor()" actors then we also want to cancel all
|
||||||
# actors but those actors may have returned remote errors as
|
# remaining sub-actors (due to our lone strategy:
|
||||||
# results (meaning they errored remotely and have relayed
|
# one-cancels-all).
|
||||||
# those errors back to this parent actor). The errors are
|
|
||||||
# collected in ``errors`` so cancel all actors, summarize
|
|
||||||
# all errors and re-raise.
|
|
||||||
if errors:
|
|
||||||
if an._children:
|
if an._children:
|
||||||
|
log.cancel(
|
||||||
|
'Actor-nursery cancelling due error type:\n'
|
||||||
|
f'{outer_err}\n'
|
||||||
|
)
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await an.cancel()
|
await an.cancel()
|
||||||
|
|
||||||
# use `BaseExceptionGroup` as needed
|
raise
|
||||||
if len(errors) > 1:
|
|
||||||
raise BaseExceptionGroup(
|
|
||||||
'tractor.ActorNursery errored with',
|
|
||||||
tuple(errors.values()),
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
raise list(errors.values())[0]
|
|
||||||
|
|
||||||
# show frame on any (likely) internal error
|
finally:
|
||||||
if (
|
scope_exc = an._scope_error = outer_err or inner_err
|
||||||
not an.cancelled
|
# await debug.pause(shield=True)
|
||||||
and an._scope_error
|
# if scope_exc:
|
||||||
):
|
# errors[actor.aid] = scope_exc
|
||||||
__tracebackhide__: bool = False
|
|
||||||
|
|
||||||
# da_nursery scope end - nursery checkpoint
|
# show this frame on any internal error
|
||||||
# final exit
|
if (
|
||||||
|
not an.cancelled
|
||||||
|
and
|
||||||
|
scope_exc
|
||||||
|
):
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
|
||||||
|
# NOTE, it's possible no errors were raised while
|
||||||
|
# awaiting ".run_in_actor()" actors but those
|
||||||
|
# sub-actors may have delivered remote errors as
|
||||||
|
# results, normally captured via machinery in
|
||||||
|
# `._spawn.cancel_on_completion()`.
|
||||||
|
#
|
||||||
|
# Any such remote errors are collected in `an._errors`
|
||||||
|
# which is summarized via `ActorNursery.maybe_error`
|
||||||
|
# which is maybe re-raised in an outer block (below).
|
||||||
|
#
|
||||||
|
# So here we first cancel all subactors the summarize
|
||||||
|
# all errors and then later (in that outer block)
|
||||||
|
# maybe-raise on a "non-graceful" cancellation
|
||||||
|
# outcome, normally as a summary EG.
|
||||||
|
if (
|
||||||
|
scope_exc
|
||||||
|
or
|
||||||
|
errors
|
||||||
|
):
|
||||||
|
|
||||||
|
if an._children:
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await an.cancel()
|
||||||
|
|
||||||
|
# cancel outer tn so we unblock outside this
|
||||||
|
# finally!
|
||||||
|
da_nursery.cance_scope.cancel()
|
||||||
|
#
|
||||||
|
# ^TODO? still don't get why needed?
|
||||||
|
# - an.cancel() should cause all spawn-subtasks
|
||||||
|
# to eventually exit?
|
||||||
|
# - also, could (instead) we sync to an event here before
|
||||||
|
# (ever) calling `an.cancel()`??
|
||||||
|
|
||||||
|
# `da_nursery` scope end, thus a checkpoint.
|
||||||
|
finally:
|
||||||
|
|
||||||
|
# raise any eg compiled from all subs
|
||||||
|
# ??TODO should we also adopt strict-egs here like
|
||||||
|
# `trio.Nursery`??
|
||||||
|
#
|
||||||
|
# XXX justification notes,
|
||||||
|
# docs: https://trio.readthedocs.io/en/stable/reference-core.html#historical-note-non-strict-exceptiongroups
|
||||||
|
# anthropic: https://discuss.python.org/t/using-exceptiongroup-at-anthropic-experience-report/20888
|
||||||
|
# gh: https://github.com/python-trio/trio/issues/611
|
||||||
|
if an_exc := an.maybe_error:
|
||||||
|
raise an_exc
|
||||||
|
|
||||||
|
if scope_exc := an._scope_error:
|
||||||
|
raise scope_exc
|
||||||
|
|
||||||
|
# @acm-fn scope exit
|
||||||
|
|
||||||
|
|
||||||
_shutdown_msg: str = (
|
_shutdown_msg: str = (
|
||||||
|
@ -648,7 +754,7 @@ _shutdown_msg: str = (
|
||||||
# @api_frame
|
# @api_frame
|
||||||
async def open_nursery(
|
async def open_nursery(
|
||||||
*, # named params only!
|
*, # named params only!
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = False,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
# ^TODO, paramspec for `open_root_actor()`
|
# ^TODO, paramspec for `open_root_actor()`
|
||||||
|
|
||||||
|
@ -684,16 +790,21 @@ async def open_nursery(
|
||||||
# mark us for teardown on exit
|
# mark us for teardown on exit
|
||||||
implicit_runtime: bool = True
|
implicit_runtime: bool = True
|
||||||
|
|
||||||
async with open_root_actor(
|
async with (
|
||||||
hide_tb=hide_tb,
|
# collapse_eg(hide_tb=hide_tb),
|
||||||
**kwargs,
|
open_root_actor(
|
||||||
) as actor:
|
hide_tb=hide_tb,
|
||||||
|
**kwargs,
|
||||||
|
) as actor,
|
||||||
|
):
|
||||||
assert actor is current_actor()
|
assert actor is current_actor()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with _open_and_supervise_one_cancels_all_nursery(
|
async with (
|
||||||
actor
|
_open_and_supervise_one_cancels_all_nursery(
|
||||||
) as an:
|
actor
|
||||||
|
) as an
|
||||||
|
):
|
||||||
|
|
||||||
# NOTE: mark this nursery as having
|
# NOTE: mark this nursery as having
|
||||||
# implicitly started the root actor so
|
# implicitly started the root actor so
|
||||||
|
|
|
@ -613,10 +613,9 @@ async def drain_to_final_msg(
|
||||||
# msg: dict = await ctx._rx_chan.receive()
|
# msg: dict = await ctx._rx_chan.receive()
|
||||||
# if res_cs.cancelled_caught:
|
# if res_cs.cancelled_caught:
|
||||||
#
|
#
|
||||||
# -[ ] make sure pause points work here for REPLing
|
# -[x] make sure pause points work here for REPLing
|
||||||
# the runtime itself; i.e. ensure there's no hangs!
|
# the runtime itself; i.e. ensure there's no hangs!
|
||||||
# |_from tractor.devx.debug import pause
|
# |_see masked code below in .cancel_called path
|
||||||
# await pause()
|
|
||||||
|
|
||||||
# NOTE: we get here if the far end was
|
# NOTE: we get here if the far end was
|
||||||
# `ContextCancelled` in 2 cases:
|
# `ContextCancelled` in 2 cases:
|
||||||
|
@ -652,6 +651,10 @@ async def drain_to_final_msg(
|
||||||
f'IPC ctx cancelled externally during result drain ?\n'
|
f'IPC ctx cancelled externally during result drain ?\n'
|
||||||
f'{ctx}'
|
f'{ctx}'
|
||||||
)
|
)
|
||||||
|
# XXX, for tracing `Cancelled`..
|
||||||
|
# from tractor.devx.debug import pause
|
||||||
|
# await pause(shield=True)
|
||||||
|
|
||||||
# CASE 2: mask the local cancelled-error(s)
|
# CASE 2: mask the local cancelled-error(s)
|
||||||
# only when we are sure the remote error is
|
# only when we are sure the remote error is
|
||||||
# the source cause of this local task's
|
# the source cause of this local task's
|
||||||
|
|
|
@ -222,16 +222,18 @@ class _Cache:
|
||||||
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
|
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
async with mng as value:
|
try:
|
||||||
_, no_more_users = cls.resources[ctx_key]
|
async with mng as value:
|
||||||
cls.values[ctx_key] = value
|
_, no_more_users = cls.resources[ctx_key]
|
||||||
task_status.started(value)
|
try:
|
||||||
try:
|
cls.values[ctx_key] = value
|
||||||
await no_more_users.wait()
|
task_status.started(value)
|
||||||
finally:
|
await no_more_users.wait()
|
||||||
# discard nursery ref so it won't be re-used (an error)?
|
finally:
|
||||||
value = cls.values.pop(ctx_key)
|
value = cls.values.pop(ctx_key)
|
||||||
cls.resources.pop(ctx_key)
|
finally:
|
||||||
|
# discard nursery ref so it won't be re-used (an error)?
|
||||||
|
cls.resources.pop(ctx_key)
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
|
|
@ -22,6 +22,10 @@ from __future__ import annotations
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
|
import inspect
|
||||||
|
from types import (
|
||||||
|
TracebackType,
|
||||||
|
)
|
||||||
from typing import (
|
from typing import (
|
||||||
Type,
|
Type,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
|
@ -63,6 +67,66 @@ def find_masked_excs(
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
_mask_cases: dict[
|
||||||
|
Type[Exception], # masked exc type
|
||||||
|
dict[
|
||||||
|
int, # inner-frame index into `inspect.getinnerframes()`
|
||||||
|
# `FrameInfo.function/filename: str`s to match
|
||||||
|
dict[str, str],
|
||||||
|
],
|
||||||
|
] = {
|
||||||
|
trio.WouldBlock: {
|
||||||
|
# `trio.Lock.acquire()` has a checkpoint inside the
|
||||||
|
# `WouldBlock`-no_wait path's handler..
|
||||||
|
-5: { # "5th frame up" from checkpoint
|
||||||
|
'filename': 'trio/_sync.py',
|
||||||
|
'function': 'acquire',
|
||||||
|
# 'lineno': 605, # matters?
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def is_expected_masking_case(
|
||||||
|
cases: dict,
|
||||||
|
exc_ctx: Exception,
|
||||||
|
exc_match: BaseException,
|
||||||
|
|
||||||
|
) -> bool|inspect.FrameInfo:
|
||||||
|
'''
|
||||||
|
Determine whether the provided masked exception is from a known
|
||||||
|
bug/special/unintentional-`trio`-impl case which we do not wish
|
||||||
|
to unmask.
|
||||||
|
|
||||||
|
Return any guilty `inspect.FrameInfo` ow `False`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
exc_tb: TracebackType = exc_match.__traceback__
|
||||||
|
if cases := _mask_cases.get(type(exc_ctx)):
|
||||||
|
inner: list[inspect.FrameInfo] = inspect.getinnerframes(exc_tb)
|
||||||
|
|
||||||
|
# from tractor.devx.debug import mk_pdb
|
||||||
|
# mk_pdb().set_trace()
|
||||||
|
for iframe, matchon in cases.items():
|
||||||
|
try:
|
||||||
|
masker_frame: inspect.FrameInfo = inner[iframe]
|
||||||
|
except IndexError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
for field, in_field in matchon.items():
|
||||||
|
val = getattr(
|
||||||
|
masker_frame,
|
||||||
|
field,
|
||||||
|
)
|
||||||
|
if in_field not in val:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
return masker_frame
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# XXX, relevant discussion @ `trio`-core,
|
# XXX, relevant discussion @ `trio`-core,
|
||||||
# https://github.com/python-trio/trio/issues/455
|
# https://github.com/python-trio/trio/issues/455
|
||||||
#
|
#
|
||||||
|
@ -99,6 +163,7 @@ async def maybe_raise_from_masking_exc(
|
||||||
# ^XXX, special case(s) where we warn-log bc likely
|
# ^XXX, special case(s) where we warn-log bc likely
|
||||||
# there will be no operational diff since the exc
|
# there will be no operational diff since the exc
|
||||||
# is always expected to be consumed.
|
# is always expected to be consumed.
|
||||||
|
|
||||||
) -> BoxedMaybeException:
|
) -> BoxedMaybeException:
|
||||||
'''
|
'''
|
||||||
Maybe un-mask and re-raise exception(s) suppressed by a known
|
Maybe un-mask and re-raise exception(s) suppressed by a known
|
||||||
|
@ -197,6 +262,27 @@ async def maybe_raise_from_masking_exc(
|
||||||
raise_unmasked
|
raise_unmasked
|
||||||
):
|
):
|
||||||
if len(masked) < 2:
|
if len(masked) < 2:
|
||||||
|
# don't unmask already known "special" cases..
|
||||||
|
if (
|
||||||
|
_mask_cases
|
||||||
|
and
|
||||||
|
(cases := _mask_cases.get(type(exc_ctx)))
|
||||||
|
and
|
||||||
|
(masker_frame := is_expected_masking_case(
|
||||||
|
cases,
|
||||||
|
exc_ctx,
|
||||||
|
exc_match,
|
||||||
|
))
|
||||||
|
):
|
||||||
|
log.warning(
|
||||||
|
f'Ignoring already-known, non-ideal-but-valid '
|
||||||
|
f'masker code @\n'
|
||||||
|
f'{masker_frame}\n'
|
||||||
|
f'\n'
|
||||||
|
f'NOT raising {exc_ctx} from masker {exc_match!r}\n'
|
||||||
|
)
|
||||||
|
raise exc_match
|
||||||
|
|
||||||
raise exc_ctx from exc_match
|
raise exc_ctx from exc_match
|
||||||
|
|
||||||
# ??TODO, see above but, possibly unmasking sub-exc
|
# ??TODO, see above but, possibly unmasking sub-exc
|
||||||
|
|
Loading…
Reference in New Issue