forked from goodboy/tractor
1
0
Fork 0

Merge pull request #333 from goodboy/exceptiongroups

`ExceptiongGroup`s and `trio>=0.22`
dun_unset_current_actor
goodboy 2022-10-14 20:11:26 -04:00 committed by GitHub
commit a0f6668ce8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 476 additions and 271 deletions

View File

@ -27,6 +27,17 @@ async def main():
# retreive results
async with p0.open_stream_from(breakpoint_forever) as stream:
# triggers the first name error
try:
await p1.run(name_error)
except tractor.RemoteActorError as rae:
assert rae.type is NameError
async for i in stream:
# a second time try the failing subactor and this tie
# let error propagate up to the parent/nursery.
await p1.run(name_error)

View File

@ -12,18 +12,31 @@ async def breakpoint_forever():
while True:
await tractor.breakpoint()
# NOTE: if the test never sent 'q'/'quit' commands
# on the pdb repl, without this checkpoint line the
# repl would spin in this actor forever.
# await trio.sleep(0)
async def spawn_until(depth=0):
""""A nested nursery that triggers another ``NameError``.
"""
async with tractor.open_nursery() as n:
if depth < 1:
# await n.run_in_actor('breakpoint_forever', breakpoint_forever)
await n.run_in_actor(
await n.run_in_actor(breakpoint_forever)
p = await n.run_in_actor(
name_error,
name='name_error'
)
await trio.sleep(0.5)
# rx and propagate error from child
await p.result()
else:
# recusrive call to spawn another process branching layer of
# the tree
depth -= 1
await n.run_in_actor(
spawn_until,
@ -53,6 +66,7 @@ async def main():
"""
async with tractor.open_nursery(
debug_mode=True,
# loglevel='cancel',
) as n:
# spawn both actors
@ -67,8 +81,16 @@ async def main():
name='spawner1',
)
# TODO: test this case as well where the parent don't see
# the sub-actor errors by default and instead expect a user
# ctrl-c to kill the root.
with trio.move_on_after(3):
await trio.sleep_forever()
# gah still an issue here.
await portal.result()
# should never get here
await portal1.result()

View File

@ -0,0 +1,25 @@
Add support for ``trio >= 0.22`` and support for the new Python 3.11
``[Base]ExceptionGroup`` from `pep 654`_ via the backported
`exceptiongroup`_ package and some final fixes to the debug mode
subsystem.
This port ended up driving some (hopefully) final fixes to our debugger
subsystem including the solution to all lingering stdstreams locking
race-conditions and deadlock scenarios. This includes extending the
debugger tests suite as well as cancellation and ``asyncio`` mode cases.
Some of the notable details:
- always reverting to the ``trio`` SIGINT handler when leaving debug
mode.
- bypassing child attempts to acquire the debug lock when detected
to be amdist actor-runtime-cancellation.
- allowing the root actor to cancel local but IPC-stale subactor
requests-tasks for the debug lock when in a "no IPC peers" state.
Further we refined our ``ActorNursery`` semantics to be more similar to
``trio`` in the sense that parent task errors are always packed into the
actor-nursery emitted exception group and adjusted all tests and
examples accordingly.
.. _pep 654: https://peps.python.org/pep-0654/#handling-exception-groups
.. _exceptiongroup: https://github.com/python-trio/exceptiongroup

View File

@ -44,9 +44,10 @@ setup(
# trio related
# proper range spec:
# https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5
'trio >= 0.20, < 0.22',
'trio >= 0.22',
'async_generator',
'trio_typing',
'exceptiongroup',
# tooling
'tricycle',

View File

@ -8,6 +8,10 @@ import platform
import time
from itertools import repeat
from exceptiongroup import (
BaseExceptionGroup,
ExceptionGroup,
)
import pytest
import trio
import tractor
@ -56,29 +60,49 @@ def test_remote_error(arb_addr, args_err):
arbiter_addr=arb_addr,
) as nursery:
# on a remote type error caused by bad input args
# this should raise directly which means we **don't** get
# an exception group outside the nursery since the error
# here and the far end task error are one in the same?
portal = await nursery.run_in_actor(
assert_err, name='errorer', **args
)
# get result(s) from main task
try:
# this means the root actor will also raise a local
# parent task error and thus an eg will propagate out
# of this actor nursery.
await portal.result()
except tractor.RemoteActorError as err:
assert err.type == errtype
print("Look Maa that actor failed hard, hehh")
raise
# ensure boxed errors
if args:
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
# ensure boxed error is correct
assert excinfo.value.type == errtype
else:
# the root task will also error on the `.result()` call
# so we expect an error from there AND the child.
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == errtype
def test_multierror(arb_addr):
"""Verify we raise a ``trio.MultiError`` out of a nursery where
'''
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors.
"""
'''
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
@ -95,10 +119,10 @@ def test_multierror(arb_addr):
print("Look Maa that first actor failed hard, hehh")
raise
# here we should get a `trio.MultiError` containing exceptions
# here we should get a ``BaseExceptionGroup`` containing exceptions
# from both subactors
with pytest.raises(trio.MultiError):
with pytest.raises(BaseExceptionGroup):
trio.run(main)
@ -107,7 +131,7 @@ def test_multierror(arb_addr):
'num_subactors', range(25, 26),
)
def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
"""Verify we raise a ``trio.MultiError`` out of a nursery where
"""Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors and also with a delay before failure
to test failure during an ongoing spawning.
"""
@ -123,10 +147,11 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
delay=delay
)
with pytest.raises(trio.MultiError) as exc_info:
# with pytest.raises(trio.MultiError) as exc_info:
with pytest.raises(BaseExceptionGroup) as exc_info:
trio.run(main)
assert exc_info.type == tractor.MultiError
assert exc_info.type == ExceptionGroup
err = exc_info.value
exceptions = err.exceptions
@ -214,8 +239,8 @@ async def test_cancel_infinite_streamer(start_method):
[
# daemon actors sit idle while single task actors error out
(1, tractor.RemoteActorError, AssertionError, (assert_err, {}), None),
(2, tractor.MultiError, AssertionError, (assert_err, {}), None),
(3, tractor.MultiError, AssertionError, (assert_err, {}), None),
(2, BaseExceptionGroup, AssertionError, (assert_err, {}), None),
(3, BaseExceptionGroup, AssertionError, (assert_err, {}), None),
# 1 daemon actor errors out while single task actors sleep forever
(3, tractor.RemoteActorError, AssertionError, (sleep_forever, {}),
@ -226,7 +251,7 @@ async def test_cancel_infinite_streamer(start_method):
(do_nuthin, {}), (assert_err, {'delay': 1}, True)),
# daemon complete quickly delay while single task
# actors error after brief delay
(3, tractor.MultiError, AssertionError,
(3, BaseExceptionGroup, AssertionError,
(assert_err, {'delay': 1}), (do_nuthin, {}, False)),
],
ids=[
@ -293,7 +318,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
# should error here with a ``RemoteActorError`` or ``MultiError``
except first_err as err:
if isinstance(err, tractor.MultiError):
if isinstance(err, BaseExceptionGroup):
assert len(err.exceptions) == num_actors
for exc in err.exceptions:
if isinstance(exc, tractor.RemoteActorError):
@ -337,7 +362,7 @@ async def spawn_and_error(breadth, depth) -> None:
@tractor_test
async def test_nested_multierrors(loglevel, start_method):
'''
Test that failed actor sets are wrapped in `trio.MultiError`s. This
Test that failed actor sets are wrapped in `BaseExceptionGroup`s. This
test goes only 2 nurseries deep but we should eventually have tests
for arbitrary n-depth actor trees.
@ -365,7 +390,7 @@ async def test_nested_multierrors(loglevel, start_method):
breadth=subactor_breadth,
depth=depth,
)
except trio.MultiError as err:
except BaseExceptionGroup as err:
assert len(err.exceptions) == subactor_breadth
for subexc in err.exceptions:
@ -383,10 +408,10 @@ async def test_nested_multierrors(loglevel, start_method):
assert subexc.type in (
tractor.RemoteActorError,
trio.Cancelled,
trio.MultiError
BaseExceptionGroup,
)
elif isinstance(subexc, trio.MultiError):
elif isinstance(subexc, BaseExceptionGroup):
for subsub in subexc.exceptions:
if subsub in (tractor.RemoteActorError,):
@ -394,7 +419,7 @@ async def test_nested_multierrors(loglevel, start_method):
assert type(subsub) in (
trio.Cancelled,
trio.MultiError,
BaseExceptionGroup,
)
else:
assert isinstance(subexc, tractor.RemoteActorError)
@ -406,13 +431,13 @@ async def test_nested_multierrors(loglevel, start_method):
if is_win():
if isinstance(subexc, tractor.RemoteActorError):
assert subexc.type in (
trio.MultiError,
BaseExceptionGroup,
tractor.RemoteActorError
)
else:
assert isinstance(subexc, trio.MultiError)
assert isinstance(subexc, BaseExceptionGroup)
else:
assert subexc.type is trio.MultiError
assert subexc.type is ExceptionGroup
else:
assert subexc.type in (
tractor.RemoteActorError,

View File

@ -10,6 +10,7 @@ TODO:
- wonder if any of it'll work on OS X?
"""
import itertools
from os import path
from typing import Optional
import platform
@ -485,10 +486,12 @@ def test_multi_subactors(
# 2nd name_error failure
child.expect(r"\(Pdb\+\+\)")
assert_before(child, [
"Attaching to pdb in crashed actor: ('name_error_1'",
"NameError",
])
# TODO: will we ever get the race where this crash will show up?
# blocklist strat now prevents this crash
# assert_before(child, [
# "Attaching to pdb in crashed actor: ('name_error_1'",
# "NameError",
# ])
if ctlc:
do_ctlc(child)
@ -580,14 +583,14 @@ def test_multi_daemon_subactors(
child.expect(r"\(Pdb\+\+\)")
# there is a race for which subactor will acquire
# the root's tty lock first
before = str(child.before.decode())
# there can be a race for which subactor will acquire
# the root's tty lock first so anticipate either crash
# message on the first entry.
bp_forever_msg = "Attaching pdb to actor: ('bp_forever'"
name_error_msg = "NameError"
name_error_msg = "NameError: name 'doggypants' is not defined"
before = str(child.before.decode())
if bp_forever_msg in before:
next_msg = name_error_msg
@ -609,9 +612,7 @@ def test_multi_daemon_subactors(
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert next_msg in before
assert_before(child, [next_msg])
# XXX: hooray the root clobbering the child here was fixed!
# IMO, this demonstrates the true power of SC system design.
@ -630,31 +631,49 @@ def test_multi_daemon_subactors(
if ctlc:
do_ctlc(child)
# wait for final error in root
while True:
# expect another breakpoint actor entry
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
try:
# root error should be packed as remote error
assert "_exceptions.RemoteActorError: ('name_error'" in before
break
except AssertionError:
assert bp_forever_msg in before
assert_before(child, [bp_forever_msg])
if ctlc:
do_ctlc(child)
# should crash with the 2nd name error (simulates
# a retry) and then the root eventually (boxed) errors
# after 1 or more further bp actor entries.
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
assert_before(child, [name_error_msg])
# wait for final error in root
# where it crashs with boxed error
while True:
try:
child.sendline('c')
child.expect(pexpect.EOF)
child.expect(r"\(Pdb\+\+\)")
assert_before(
child,
[bp_forever_msg]
)
except AssertionError:
break
except TIMEOUT:
# Failed to exit using continue..?
child.sendline('q')
# child.sendline('c')
# assert_before(
# child.sendline('c')
assert_before(
child,
[
# boxed error raised in root task
"Attaching to pdb in crashed actor: ('root'",
"_exceptions.RemoteActorError: ('name_error'",
]
)
child.sendline('c')
child.expect(pexpect.EOF)
@ -683,7 +702,15 @@ def test_multi_subactors_root_errors(
# continue again to catch 2nd name error from
# actor 'name_error_1' (which is 2nd depth).
child.sendline('c')
# due to block list strat from #337, this will no longer
# propagate before the root errors and cancels the spawner sub-tree.
child.expect(r"\(Pdb\+\+\)")
# only if the blocking condition doesn't kick in fast enough
before = str(child.before.decode())
if "Debug lock blocked for ['name_error_1'" not in before:
assert_before(child, [
"Attaching to pdb in crashed actor: ('name_error_1'",
"NameError",
@ -694,9 +721,14 @@ def test_multi_subactors_root_errors(
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
# check if the spawner crashed or was blocked from debug
# and if this intermediary attached check the boxed error
before = str(child.before.decode())
if "Attaching to pdb in crashed actor: ('spawn_error'" in before:
assert_before(child, [
"Attaching to pdb in crashed actor: ('spawn_error'",
# boxed error from previous step
# boxed error from spawner's child
"RemoteActorError: ('name_error_1'",
"NameError",
])
@ -706,26 +738,28 @@ def test_multi_subactors_root_errors(
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
# expect a root actor crash
assert_before(child, [
"Attaching to pdb in crashed actor: ('root'",
# boxed error from previous step
"RemoteActorError: ('name_error'",
"NameError",
# error from root actor and root task that created top level nursery
"Attaching to pdb in crashed actor: ('root'",
"AssertionError",
])
# warnings assert we probably don't need
# assert "Cancelling nursery in ('spawn_error'," in before
if ctlc:
do_ctlc(child)
# continue again
child.sendline('c')
child.expect(pexpect.EOF)
before = str(child.before.decode())
# error from root actor and root task that created top level nursery
assert "AssertionError" in before
assert_before(child, [
# "Attaching to pdb in crashed actor: ('root'",
# boxed error from previous step
"RemoteActorError: ('name_error'",
"NameError",
"AssertionError",
'assert 0',
])
@has_nested_actors
@ -750,24 +784,31 @@ def test_multi_nested_subactors_error_through_nurseries(
timed_out_early: bool = False
for i in range(12):
for send_char in itertools.cycle(['c', 'q']):
try:
child.expect(r"\(Pdb\+\+\)")
child.sendline('c')
time.sleep(0.1)
child.sendline(send_char)
time.sleep(0.01)
except EOF:
# race conditions on how fast the continue is sent?
print(f"Failed early on {i}?")
timed_out_early = True
break
else:
child.expect(pexpect.EOF)
if not timed_out_early:
before = str(child.before.decode())
assert "NameError" in before
assert_before(child, [
# boxed source errors
"NameError: name 'doggypants' is not defined",
"tractor._exceptions.RemoteActorError: ('name_error'",
"bdb.BdbQuit",
# first level subtrees
"tractor._exceptions.RemoteActorError: ('spawner0'",
# "tractor._exceptions.RemoteActorError: ('spawner1'",
# propagation of errors up through nested subtrees
"tractor._exceptions.RemoteActorError: ('spawn_until_0'",
"tractor._exceptions.RemoteActorError: ('spawn_until_1'",
"tractor._exceptions.RemoteActorError: ('spawn_until_2'",
])
@pytest.mark.timeout(15)

View File

@ -8,6 +8,7 @@ import builtins
import itertools
import importlib
from exceptiongroup import BaseExceptionGroup
import pytest
import trio
import tractor
@ -409,11 +410,12 @@ def test_trio_error_cancels_intertask_chan(arb_addr):
# should trigger remote actor error
await portal.result()
with pytest.raises(RemoteActorError) as excinfo:
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed error is correct
assert excinfo.value.type == Exception
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == Exception
def test_trio_closes_early_and_channel_exits(arb_addr):
@ -442,11 +444,12 @@ def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
# should trigger remote actor error
await portal.result()
with pytest.raises(RemoteActorError) as excinfo:
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed error is correct
assert excinfo.value.type == Exception
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == Exception
@tractor.context

View File

@ -11,7 +11,7 @@ from conftest import tractor_test
@pytest.mark.trio
async def test_no_arbitter():
async def test_no_runtime():
"""An arbitter must be established before any nurseries
can be created.
@ -19,7 +19,7 @@ async def test_no_arbitter():
some point?)
"""
with pytest.raises(RuntimeError) :
with tractor.open_nursery():
async with tractor.find_actor('doggy'):
pass

View File

@ -62,7 +62,10 @@ async def test_lifetime_stack_wipes_tmpfile(
)
).result()
except tractor.RemoteActorError:
except (
tractor.RemoteActorError,
tractor.BaseExceptionGroup,
):
pass
# tmp file should have been wiped by

View File

@ -18,7 +18,7 @@
tractor: structured concurrent "actors".
"""
from trio import MultiError
from exceptiongroup import BaseExceptionGroup
from ._clustering import open_actor_cluster
from ._ipc import Channel
@ -62,7 +62,7 @@ __all__ = [
'ContextCancelled',
'ModuleNotExposed',
'MsgStream',
'MultiError',
'BaseExceptionGroup',
'Portal',
'ReceiveMsgStream',
'RemoteActorError',

View File

@ -25,6 +25,7 @@ import signal
from functools import partial
from contextlib import asynccontextmanager as acm
from typing import (
Any,
Optional,
Callable,
AsyncIterator,
@ -75,8 +76,12 @@ class Lock:
# placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Optional[Callable] = None
_trio_handler: Callable[
[int, Optional[FrameType]], Any
] | int | None = None
# actor-wide variable pointing to current task name using debugger
local_task_in_debug: Optional[str] = None
local_task_in_debug: str | None = None
# NOTE: set by the current task waiting on the root tty lock from
# the CALLER side of the `lock_tty_for_child()` context entry-call
@ -111,13 +116,10 @@ class Lock:
@classmethod
def unshield_sigint(cls):
if cls._orig_sigint_handler is not None:
# restore original sigint handler
signal.signal(
signal.SIGINT,
cls._orig_sigint_handler
)
# always restore ``trio``'s sigint handler. see notes below in
# the pdb factory about the nightmare that is that code swapping
# out the handler when the repl activates...
signal.signal(signal.SIGINT, cls._trio_handler)
cls._orig_sigint_handler = None
@classmethod
@ -363,7 +365,7 @@ async def wait_for_parent_stdin_hijack(
) as (ctx, val):
log.pdb('locked context')
log.debug('locked context')
assert val == 'Locked'
async with ctx.open_stream() as stream:
@ -382,15 +384,14 @@ async def wait_for_parent_stdin_hijack(
# sync with callee termination
assert await ctx.result() == "pdb_unlock_complete"
log.pdb('unlocked context')
log.debug('exitting child side locking task context')
except ContextCancelled:
log.warning('Root actor cancelled debug lock')
finally:
log.pdb(f"Exiting debugger for actor {actor_uid}")
Lock.local_task_in_debug = None
log.pdb(f"Child {actor_uid} released parent stdio lock")
log.debug('Exiting debugger from child')
def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
@ -423,9 +424,8 @@ async def _breakpoint(
'''
__tracebackhide__ = True
pdb, undo_sigint = mk_mpdb()
actor = tractor.current_actor()
pdb, undo_sigint = mk_mpdb()
task_name = trio.lowlevel.current_task().name
# TODO: is it possible to debug a trio.Cancelled except block?
@ -449,7 +449,10 @@ async def _breakpoint(
# Recurrence entry case: this task already has the lock and
# is likely recurrently entering a breakpoint
if Lock.local_task_in_debug == task_name:
# noop on recurrent entry case
# noop on recurrent entry case but we want to trigger
# a checkpoint to allow other actors error-propagate and
# potetially avoid infinite re-entries in some subactor.
await trio.lowlevel.checkpoint()
return
# if **this** actor is already in debug mode block here
@ -468,10 +471,13 @@ async def _breakpoint(
# root nursery so that the debugger can continue to run without
# being restricted by the scope of a new task nursery.
# NOTE: if we want to debug a trio.Cancelled triggered exception
# TODO: if we want to debug a trio.Cancelled triggered exception
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below?
# cancel on this task start? I *think* this works below:
# ```python
# actor._service_n.cancel_scope.shield = shield
# ```
# but not entirely sure if that's a sane way to implement it?
try:
with trio.CancelScope(shield=True):
await actor._service_n.start(
@ -480,6 +486,13 @@ async def _breakpoint(
)
except RuntimeError:
Lock.release()
if actor._cancel_called:
# service nursery won't be usable and we
# don't want to lock up the root either way since
# we're in (the midst of) cancellation.
return
raise
elif is_root_process():
@ -530,10 +543,6 @@ async def _breakpoint(
# # last_f = frame.f_back
# # last_f.f_globals['__tracebackhide__'] = True
# # signal.signal = pdbpp.hideframe(signal.signal)
# signal.signal(
# signal.SIGINT,
# orig_handler
# )
def shield_sigint(
@ -544,7 +553,7 @@ def shield_sigint(
) -> None:
'''
Specialized debugger compatible SIGINT handler.
Specialized, debugger-aware SIGINT handler.
In childred we always ignore to avoid deadlocks since cancellation
should always be managed by the parent supervising actor. The root
@ -601,6 +610,8 @@ def shield_sigint(
# which has already terminated to unlock.
and any_connected
):
# we are root and some actor is in debug mode
# if uid_in_debug is not None:
name = uid_in_debug[0]
if name != 'root':
log.pdb(
@ -611,6 +622,22 @@ def shield_sigint(
log.pdb(
"Ignoring SIGINT while in debug mode"
)
elif (
is_root_process()
):
log.pdb(
"Ignoring SIGINT since debug mode is enabled"
)
# revert back to ``trio`` handler asap!
Lock.unshield_sigint()
if (
Lock._root_local_task_cs_in_debug
and not Lock._root_local_task_cs_in_debug.cancel_called
):
Lock._root_local_task_cs_in_debug.cancel()
# raise KeyboardInterrupt
# child actor that has locked the debugger
elif not is_root_process():
@ -636,10 +663,9 @@ def shield_sigint(
# https://github.com/goodboy/tractor/issues/320
# elif debug_mode():
else:
log.pdb(
"Ignoring SIGINT since debug mode is enabled"
)
else: # XXX: shouldn't ever get here?
print("WTFWTFWTF")
raise KeyboardInterrupt
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
# it lookks to be that the last command that was run (eg. ll)

View File

@ -27,6 +27,7 @@ import importlib
import builtins
import traceback
import exceptiongroup as eg
import trio
@ -52,9 +53,6 @@ class RemoteActorError(Exception):
self.type = suberror_type
self.msgdata = msgdata
# TODO: a trio.MultiError.catch like context manager
# for catching underlying remote errors of a particular type
class InternalActorError(RemoteActorError):
"""Remote internal ``tractor`` error indicating
@ -123,10 +121,12 @@ def unpack_error(
err_type=RemoteActorError
) -> Exception:
"""Unpack an 'error' message from the wire
'''
Unpack an 'error' message from the wire
into a local ``RemoteActorError``.
"""
'''
__tracebackhide__ = True
error = msg['error']
tb_str = error.get('tb_str', '')
@ -139,7 +139,12 @@ def unpack_error(
suberror_type = trio.Cancelled
else: # try to lookup a suitable local error type
for ns in [builtins, _this_mod, trio]:
for ns in [
builtins,
_this_mod,
eg,
trio,
]:
try:
suberror_type = getattr(ns, type_name)
break
@ -158,12 +163,15 @@ def unpack_error(
def is_multi_cancelled(exc: BaseException) -> bool:
"""Predicate to determine if a ``trio.MultiError`` contains only
``trio.Cancelled`` sub-exceptions (and is likely the result of
'''
Predicate to determine if a possible ``eg.BaseExceptionGroup`` contains
only ``trio.Cancelled`` sub-exceptions (and is likely the result of
cancelling a collection of subtasks.
"""
return not trio.MultiError.filter(
lambda exc: exc if not isinstance(exc, trio.Cancelled) else None,
exc,
)
'''
if isinstance(exc, eg.BaseExceptionGroup):
return exc.subgroup(
lambda exc: isinstance(exc, trio.Cancelled)
) is not None
return False

View File

@ -52,17 +52,17 @@ log = get_logger(__name__)
def _unwrap_msg(
msg: dict[str, Any],
channel: Channel
) -> Any:
__tracebackhide__ = True
try:
return msg['return']
except KeyError:
# internal error should never get here
assert msg.get('cid'), "Received internal error at portal?"
raise unpack_error(msg, channel)
raise unpack_error(msg, channel) from None
class MessagingError(Exception):
@ -136,6 +136,7 @@ class Portal:
Return the result(s) from the remote actor's "main" task.
'''
# __tracebackhide__ = True
# Check for non-rpc errors slapped on the
# channel for which we always raise
exc = self.channel._exc
@ -460,7 +461,6 @@ class Portal:
# sure it's worth being pedantic:
# Exception,
# trio.Cancelled,
# trio.MultiError,
# KeyboardInterrupt,
) as err:

View File

@ -23,15 +23,22 @@ from functools import partial
import importlib
import logging
import os
import signal
from typing import (
Optional,
)
import typing
import warnings
from exceptiongroup import BaseExceptionGroup
import trio
from ._runtime import Actor, Arbiter, async_main
from ._runtime import (
Actor,
Arbiter,
async_main,
)
from . import _debug
from . import _spawn
from . import _state
@ -74,14 +81,19 @@ async def open_root_actor(
rpc_module_paths: Optional[list] = None,
) -> typing.Any:
"""Async entry point for ``tractor``.
'''
Runtime init entry point for ``tractor``.
"""
'''
# Override the global debugger hook to make it play nice with
# ``trio``, see:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
# mark top most level process as root actor
_state._runtime_vars['_is_root'] = True
@ -205,7 +217,10 @@ async def open_root_actor(
try:
yield actor
except (Exception, trio.MultiError) as err:
except (
Exception,
BaseExceptionGroup,
) as err:
entered = await _debug._maybe_enter_pm(err)

View File

@ -25,21 +25,23 @@ from itertools import chain
import importlib
import importlib.util
import inspect
import uuid
import signal
import sys
from typing import (
Any, Optional,
Union, TYPE_CHECKING,
Callable,
)
import uuid
from types import ModuleType
import sys
import os
from contextlib import ExitStack
import warnings
from async_generator import aclosing
from exceptiongroup import BaseExceptionGroup
import trio # type: ignore
from trio_typing import TaskStatus
from async_generator import aclosing
from ._ipc import Channel
from ._streaming import Context
@ -194,7 +196,7 @@ async def _invoke(
res = await coro
await chan.send({'return': res, 'cid': cid})
except trio.MultiError:
except BaseExceptionGroup:
# if a context error was set then likely
# thei multierror was raised due to that
if ctx._error is not None:
@ -266,7 +268,7 @@ async def _invoke(
except (
Exception,
trio.MultiError
BaseExceptionGroup,
) as err:
if not is_multi_cancelled(err):
@ -349,7 +351,7 @@ def _get_mod_abspath(module):
async def try_ship_error_to_parent(
channel: Channel,
err: Union[Exception, trio.MultiError],
err: Union[Exception, BaseExceptionGroup],
) -> None:
with trio.CancelScope(shield=True):
@ -708,6 +710,14 @@ class Actor:
log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(uid, None)
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
# NOTE: block this actor from acquiring the
# debugger-TTY-lock since we have no way to know if we
# cancelled it and further there is no way to ensure the
@ -721,23 +731,16 @@ class Actor:
# if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for
# the lock.
db_cs = pdb_lock._root_local_task_cs_in_debug
if (
pdb_lock._root_local_task_cs_in_debug
and not pdb_lock._root_local_task_cs_in_debug.cancel_called
db_cs
and not db_cs.cancel_called
):
log.warning(
f'STALE DEBUG LOCK DETECTED FOR {uid}'
)
# TODO: figure out why this breaks tests..
# pdb_lock._root_local_task_cs_in_debug.cancel()
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
db_cs.cancel()
# XXX: is this necessary (GC should do it)?
if chan.connected():
@ -1228,6 +1231,10 @@ async def async_main(
and when cancelled effectively cancels the actor.
'''
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
registered_with_arbiter = False
try:
@ -1549,7 +1556,10 @@ async def process_messages(
partial(_invoke, actor, cid, chan, func, kwargs),
name=funcname,
)
except (RuntimeError, trio.MultiError):
except (
RuntimeError,
BaseExceptionGroup,
):
# avoid reporting a benign race condition
# during actor runtime teardown.
nursery_cancelled_before_task = True
@ -1594,7 +1604,10 @@ async def process_messages(
# transport **was** disconnected
return True
except (Exception, trio.MultiError) as err:
except (
Exception,
BaseExceptionGroup,
) as err:
if nursery_cancelled_before_task:
sn = actor._service_n
assert sn and sn.cancel_scope.cancel_called

View File

@ -31,6 +31,7 @@ from typing import (
)
from collections.abc import Awaitable
from exceptiongroup import BaseExceptionGroup
import trio
from trio_typing import TaskStatus
@ -139,6 +140,7 @@ async def exhaust_portal(
If the main task is an async generator do our best to consume
what's left of it.
'''
__tracebackhide__ = True
try:
log.debug(f"Waiting on final result from {actor.uid}")
@ -146,8 +148,11 @@ async def exhaust_portal(
# always be established and shutdown using a context manager api
final = await portal.result()
except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError``
except (
Exception,
BaseExceptionGroup,
) as err:
# we reraise in the parent task via a ``BaseExceptionGroup``
return err
except trio.Cancelled as err:
# lol, of course we need this too ;P
@ -175,7 +180,7 @@ async def cancel_on_completion(
'''
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# a MultiError and we still send out a cancel request
# an exception group and we still send out a cancel request
result = await exhaust_portal(portal, actor)
if isinstance(result, Exception):
errors[actor.uid] = result

View File

@ -22,7 +22,6 @@ from typing import (
Optional,
Any,
)
from collections.abc import Mapping
import trio
@ -46,30 +45,6 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore #
return _current_actor
_conc_name_getters = {
'task': trio.lowlevel.current_task,
'actor': current_actor
}
class ActorContextInfo(Mapping):
"Dyanmic lookup for local actor and task names"
_context_keys = ('task', 'actor')
def __len__(self):
return len(self._context_keys)
def __iter__(self):
return iter(self._context_keys)
def __getitem__(self, key: str) -> str:
try:
return _conc_name_getters[key]().name # type: ignore
except RuntimeError:
# no local actor/task context initialized yet
return f'no {key} context'
def is_main_process() -> bool:
"""Bool determining if this actor is running in the top-most process.
"""

View File

@ -18,6 +18,7 @@
``trio`` inspired apis and helpers
"""
from contextlib import asynccontextmanager as acm
from functools import partial
import inspect
from typing import (
@ -27,8 +28,8 @@ from typing import (
import typing
import warnings
from exceptiongroup import BaseExceptionGroup
import trio
from async_generator import asynccontextmanager
from ._debug import maybe_wait_for_debugger
from ._state import current_actor, is_main_process
@ -82,7 +83,7 @@ class ActorNursery:
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: dict[tuple[str, str], Exception],
errors: dict[tuple[str, str], BaseException],
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
@ -294,13 +295,17 @@ class ActorNursery:
self._join_procs.set()
@asynccontextmanager
@acm
async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor,
) -> typing.AsyncGenerator[ActorNursery, None]:
# TODO: yay or nay?
# __tracebackhide__ = True
# the collection of errors retreived from spawned sub-actors
errors: dict[tuple[str, str], Exception] = {}
errors: dict[tuple[str, str], BaseException] = {}
# This is the outermost level "deamon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for
@ -333,19 +338,17 @@ async def _open_and_supervise_one_cancels_all_nursery(
# after we yield upwards
yield anursery
# When we didn't error in the caller's scope,
# signal all process-monitor-tasks to conduct
# the "hard join phase".
log.runtime(
f"Waiting on subactors {anursery._children} "
"to complete"
)
# Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
# signal all process monitor tasks to conduct
# hard join phase.
anursery._join_procs.set()
except BaseException as err:
except BaseException as inner_err:
errors[actor.uid] = inner_err
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
@ -362,19 +365,18 @@ async def _open_and_supervise_one_cancels_all_nursery(
# worry more are coming).
anursery._join_procs.set()
try:
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype = type(err)
etype = type(inner_err)
if etype in (
trio.Cancelled,
KeyboardInterrupt
) or (
is_multi_cancelled(err)
is_multi_cancelled(inner_err)
):
log.cancel(
f"Nursery for {current_actor().uid} "
@ -382,29 +384,23 @@ async def _open_and_supervise_one_cancels_all_nursery(
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {err}, ")
f"errored with")
# cancel all subactors
await anursery.cancel()
except trio.MultiError as merr:
# If we receive additional errors while waiting on
# remaining subactors that were cancelled,
# aggregate those errors with the original error
# that triggered this teardown.
if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err])
else:
raise
# ria_nursery scope end
# XXX: do we need a `trio.Cancelled` catch here as well?
# this is the catch around the ``.run_in_actor()`` nursery
# TODO: this is the handler around the ``.run_in_actor()``
# nursery. Ideally we can drop this entirely in the future as
# the whole ``.run_in_actor()`` API should be built "on top of"
# this lower level spawn-request-cancel "daemon actor" API where
# a local in-actor task nursery is used with one-to-one task
# + `await Portal.run()` calls and the results/errors are
# handled directly (inline) and errors by the local nursery.
except (
Exception,
trio.MultiError,
BaseExceptionGroup,
trio.Cancelled
) as err:
@ -436,18 +432,20 @@ async def _open_and_supervise_one_cancels_all_nursery(
with trio.CancelScope(shield=True):
await anursery.cancel()
# use `MultiError` as needed
# use `BaseExceptionGroup` as needed
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))
raise BaseExceptionGroup(
'tractor.ActorNursery errored with',
tuple(errors.values()),
)
else:
raise list(errors.values())[0]
# ria_nursery scope end - nursery checkpoint
# after nursery exit
# da_nursery scope end - nursery checkpoint
# final exit
@asynccontextmanager
@acm
async def open_nursery(
**kwargs,

View File

@ -18,12 +18,14 @@
Log like a forester!
"""
from collections.abc import Mapping
import sys
import logging
import colorlog # type: ignore
from typing import Optional
from ._state import ActorContextInfo
import trio
from ._state import current_actor
_proj_name: str = 'tractor'
@ -36,7 +38,8 @@ LOG_FORMAT = (
# "{bold_white}{log_color}{asctime}{reset}"
"{log_color}{asctime}{reset}"
" {bold_white}{thin_white}({reset}"
"{thin_white}{actor}, {process}, {task}){reset}{bold_white}{thin_white})"
"{thin_white}{actor_name}[{actor_uid}], "
"{process}, {task}){reset}{bold_white}{thin_white})"
" {reset}{log_color}[{reset}{bold_log_color}{levelname}{reset}{log_color}]"
" {log_color}{name}"
" {thin_white}{filename}{log_color}:{reset}{thin_white}{lineno}{log_color}"
@ -136,6 +139,37 @@ class StackLevelAdapter(logging.LoggerAdapter):
)
_conc_name_getters = {
'task': lambda: trio.lowlevel.current_task().name,
'actor': lambda: current_actor(),
'actor_name': lambda: current_actor().name,
'actor_uid': lambda: current_actor().uid[1][:6],
}
class ActorContextInfo(Mapping):
"Dyanmic lookup for local actor and task names"
_context_keys = (
'task',
'actor',
'actor_name',
'actor_uid',
)
def __len__(self):
return len(self._context_keys)
def __iter__(self):
return iter(self._context_keys)
def __getitem__(self, key: str) -> str:
try:
return _conc_name_getters[key]()
except RuntimeError:
# no local actor/task context initialized yet
return f'no {key} context'
def get_logger(
name: str = None,