diff --git a/examples/debugging/multi_daemon_subactors.py b/examples/debugging/multi_daemon_subactors.py
index c506de4..6c2d575 100644
--- a/examples/debugging/multi_daemon_subactors.py
+++ b/examples/debugging/multi_daemon_subactors.py
@@ -27,7 +27,18 @@ async def main():

         # retreive results
         async with p0.open_stream_from(breakpoint_forever) as stream:
-            await p1.run(name_error)
+
+            # triggers the first name error
+            try:
+                await p1.run(name_error)
+            except tractor.RemoteActorError as rae:
+                assert rae.type is NameError
+
+            async for i in stream:
+
+                # a second time try the failing subactor and this time
+                # let error propagate up to the parent/nursery.
+                await p1.run(name_error)


 if __name__ == '__main__':
diff --git a/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py b/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py
index f9adb25..348a5ee 100644
--- a/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py
+++ b/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py
@@ -12,18 +12,31 @@ async def breakpoint_forever():
     while True:
         await tractor.breakpoint()

+    # NOTE: if the test never sent 'q'/'quit' commands
+    # on the pdb repl, without this checkpoint line the
+    # repl would spin in this actor forever.
+    # await trio.sleep(0)
+

 async def spawn_until(depth=0):
     """"A nested nursery that triggers another ``NameError``.
     """
     async with tractor.open_nursery() as n:
         if depth < 1:
-            # await n.run_in_actor('breakpoint_forever', breakpoint_forever)
-            await n.run_in_actor(
+
+            await n.run_in_actor(breakpoint_forever)
+
+            p = await n.run_in_actor(
                 name_error,
                 name='name_error'
             )
+            await trio.sleep(0.5)
+            # rx and propagate error from child
+            await p.result()
+
+        else:
+            # recursive call to spawn another process branching layer of
+            # the tree
             depth -= 1
             await n.run_in_actor(
                 spawn_until,
@@ -53,6 +66,7 @@ async def main():
     """
     async with tractor.open_nursery(
         debug_mode=True,
+        # loglevel='cancel',
     ) as n:

         # spawn both actors
@@ -67,8 +81,16 @@ async def main():
             name='spawner1',
         )

+        # TODO: test this case as well where the parent doesn't see
+        # the sub-actor errors by default and instead expect a user
+        # ctrl-c to kill the root.
+        with trio.move_on_after(3):
+            await trio.sleep_forever()
+
+        # gah still an issue here.
         await portal.result()
+
+        # should never get here
         await portal1.result()
diff --git a/nooz/333.feature.rst b/nooz/333.feature.rst
new file mode 100644
index 0000000..60934db
--- /dev/null
+++ b/nooz/333.feature.rst
@@ -0,0 +1,25 @@
+Add support for ``trio >= 0.22`` and support for the new Python 3.11
+``[Base]ExceptionGroup`` from `pep 654`_ via the backported
+`exceptiongroup`_ package and some final fixes to the debug mode
+subsystem.
+
+This port ended up driving some (hopefully) final fixes to our debugger
+subsystem including the solution to all lingering stdstreams locking
+race-conditions and deadlock scenarios. This includes extending the
+debugger test suite as well as cancellation and ``asyncio`` mode cases.
+Some of the notable details:
+
+- always reverting to the ``trio`` SIGINT handler when leaving debug
+  mode.
+- bypassing child attempts to acquire the debug lock when detected
+  to be amidst actor-runtime-cancellation.
+- allowing the root actor to cancel local but IPC-stale subactor
+  requests-tasks for the debug lock when in a "no IPC peers" state.
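+
+As a quick illustration of the group-propagation semantics (a minimal
+sketch only; ``boom()`` and the error type below are made up for the
+example and mirror the updated test suite, not a public API), a failed
+subactor now surfaces in the parent as a ``BaseExceptionGroup`` wrapping
+the boxed ``RemoteActorError``::
+
+    from exceptiongroup import BaseExceptionGroup
+    import trio
+    import tractor
+
+    async def boom():
+        # error raised inside the child actor
+        raise ValueError('kaboom')
+
+    async def main():
+        async with tractor.open_nursery() as n:
+            portal = await n.run_in_actor(boom, name='errorer')
+            # requesting the result re-raises the boxed error in the
+            # parent task, so the nursery emits an exception group.
+            await portal.result()
+
+    try:
+        trio.run(main)
+    except BaseExceptionGroup as beg:
+        for exc in beg.exceptions:
+            assert isinstance(exc, tractor.RemoteActorError)
+            assert exc.type == ValueError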
+ +Further we refined our ``ActorNursery`` semantics to be more similar to +``trio`` in the sense that parent task errors are always packed into the +actor-nursery emitted exception group and adjusted all tests and +examples accordingly. + +.. _pep 654: https://peps.python.org/pep-0654/#handling-exception-groups +.. _exceptiongroup: https://github.com/python-trio/exceptiongroup diff --git a/setup.py b/setup.py index 7e966ec..0eabc5b 100755 --- a/setup.py +++ b/setup.py @@ -44,9 +44,10 @@ setup( # trio related # proper range spec: # https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5 - 'trio >= 0.20, < 0.22', + 'trio >= 0.22', 'async_generator', 'trio_typing', + 'exceptiongroup', # tooling 'tricycle', diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 21681f0..657ab8e 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -8,6 +8,10 @@ import platform import time from itertools import repeat +from exceptiongroup import ( + BaseExceptionGroup, + ExceptionGroup, +) import pytest import trio import tractor @@ -56,29 +60,49 @@ def test_remote_error(arb_addr, args_err): arbiter_addr=arb_addr, ) as nursery: + # on a remote type error caused by bad input args + # this should raise directly which means we **don't** get + # an exception group outside the nursery since the error + # here and the far end task error are one in the same? portal = await nursery.run_in_actor( assert_err, name='errorer', **args ) # get result(s) from main task try: + # this means the root actor will also raise a local + # parent task error and thus an eg will propagate out + # of this actor nursery. await portal.result() except tractor.RemoteActorError as err: assert err.type == errtype print("Look Maa that actor failed hard, hehh") raise - with pytest.raises(tractor.RemoteActorError) as excinfo: - trio.run(main) + # ensure boxed errors + if args: + with pytest.raises(tractor.RemoteActorError) as excinfo: + trio.run(main) - # ensure boxed error is correct - assert excinfo.value.type == errtype + assert excinfo.value.type == errtype + + else: + # the root task will also error on the `.result()` call + # so we expect an error from there AND the child. + with pytest.raises(BaseExceptionGroup) as excinfo: + trio.run(main) + + # ensure boxed errors + for exc in excinfo.value.exceptions: + assert exc.type == errtype def test_multierror(arb_addr): - """Verify we raise a ``trio.MultiError`` out of a nursery where + ''' + Verify we raise a ``BaseExceptionGroup`` out of a nursery where more then one actor errors. - """ + + ''' async def main(): async with tractor.open_nursery( arbiter_addr=arb_addr, @@ -95,10 +119,10 @@ def test_multierror(arb_addr): print("Look Maa that first actor failed hard, hehh") raise - # here we should get a `trio.MultiError` containing exceptions + # here we should get a ``BaseExceptionGroup`` containing exceptions # from both subactors - with pytest.raises(trio.MultiError): + with pytest.raises(BaseExceptionGroup): trio.run(main) @@ -107,7 +131,7 @@ def test_multierror(arb_addr): 'num_subactors', range(25, 26), ) def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay): - """Verify we raise a ``trio.MultiError`` out of a nursery where + """Verify we raise a ``BaseExceptionGroup`` out of a nursery where more then one actor errors and also with a delay before failure to test failure during an ongoing spawning. 
""" @@ -123,10 +147,11 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay): delay=delay ) - with pytest.raises(trio.MultiError) as exc_info: + # with pytest.raises(trio.MultiError) as exc_info: + with pytest.raises(BaseExceptionGroup) as exc_info: trio.run(main) - assert exc_info.type == tractor.MultiError + assert exc_info.type == ExceptionGroup err = exc_info.value exceptions = err.exceptions @@ -214,8 +239,8 @@ async def test_cancel_infinite_streamer(start_method): [ # daemon actors sit idle while single task actors error out (1, tractor.RemoteActorError, AssertionError, (assert_err, {}), None), - (2, tractor.MultiError, AssertionError, (assert_err, {}), None), - (3, tractor.MultiError, AssertionError, (assert_err, {}), None), + (2, BaseExceptionGroup, AssertionError, (assert_err, {}), None), + (3, BaseExceptionGroup, AssertionError, (assert_err, {}), None), # 1 daemon actor errors out while single task actors sleep forever (3, tractor.RemoteActorError, AssertionError, (sleep_forever, {}), @@ -226,7 +251,7 @@ async def test_cancel_infinite_streamer(start_method): (do_nuthin, {}), (assert_err, {'delay': 1}, True)), # daemon complete quickly delay while single task # actors error after brief delay - (3, tractor.MultiError, AssertionError, + (3, BaseExceptionGroup, AssertionError, (assert_err, {'delay': 1}), (do_nuthin, {}, False)), ], ids=[ @@ -293,7 +318,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel): # should error here with a ``RemoteActorError`` or ``MultiError`` except first_err as err: - if isinstance(err, tractor.MultiError): + if isinstance(err, BaseExceptionGroup): assert len(err.exceptions) == num_actors for exc in err.exceptions: if isinstance(exc, tractor.RemoteActorError): @@ -337,7 +362,7 @@ async def spawn_and_error(breadth, depth) -> None: @tractor_test async def test_nested_multierrors(loglevel, start_method): ''' - Test that failed actor sets are wrapped in `trio.MultiError`s. This + Test that failed actor sets are wrapped in `BaseExceptionGroup`s. This test goes only 2 nurseries deep but we should eventually have tests for arbitrary n-depth actor trees. 
@@ -365,7 +390,7 @@ async def test_nested_multierrors(loglevel, start_method): breadth=subactor_breadth, depth=depth, ) - except trio.MultiError as err: + except BaseExceptionGroup as err: assert len(err.exceptions) == subactor_breadth for subexc in err.exceptions: @@ -383,10 +408,10 @@ async def test_nested_multierrors(loglevel, start_method): assert subexc.type in ( tractor.RemoteActorError, trio.Cancelled, - trio.MultiError + BaseExceptionGroup, ) - elif isinstance(subexc, trio.MultiError): + elif isinstance(subexc, BaseExceptionGroup): for subsub in subexc.exceptions: if subsub in (tractor.RemoteActorError,): @@ -394,7 +419,7 @@ async def test_nested_multierrors(loglevel, start_method): assert type(subsub) in ( trio.Cancelled, - trio.MultiError, + BaseExceptionGroup, ) else: assert isinstance(subexc, tractor.RemoteActorError) @@ -406,13 +431,13 @@ async def test_nested_multierrors(loglevel, start_method): if is_win(): if isinstance(subexc, tractor.RemoteActorError): assert subexc.type in ( - trio.MultiError, + BaseExceptionGroup, tractor.RemoteActorError ) else: - assert isinstance(subexc, trio.MultiError) + assert isinstance(subexc, BaseExceptionGroup) else: - assert subexc.type is trio.MultiError + assert subexc.type is ExceptionGroup else: assert subexc.type in ( tractor.RemoteActorError, diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 47920e3..8704bb1 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -10,6 +10,7 @@ TODO: - wonder if any of it'll work on OS X? """ +import itertools from os import path from typing import Optional import platform @@ -485,10 +486,12 @@ def test_multi_subactors( # 2nd name_error failure child.expect(r"\(Pdb\+\+\)") - assert_before(child, [ - "Attaching to pdb in crashed actor: ('name_error_1'", - "NameError", - ]) + # TODO: will we ever get the race where this crash will show up? + # blocklist strat now prevents this crash + # assert_before(child, [ + # "Attaching to pdb in crashed actor: ('name_error_1'", + # "NameError", + # ]) if ctlc: do_ctlc(child) @@ -580,14 +583,14 @@ def test_multi_daemon_subactors( child.expect(r"\(Pdb\+\+\)") - # there is a race for which subactor will acquire - # the root's tty lock first - - before = str(child.before.decode()) + # there can be a race for which subactor will acquire + # the root's tty lock first so anticipate either crash + # message on the first entry. bp_forever_msg = "Attaching pdb to actor: ('bp_forever'" - name_error_msg = "NameError" + name_error_msg = "NameError: name 'doggypants' is not defined" + before = str(child.before.decode()) if bp_forever_msg in before: next_msg = name_error_msg @@ -609,9 +612,7 @@ def test_multi_daemon_subactors( child.sendline('c') child.expect(r"\(Pdb\+\+\)") - before = str(child.before.decode()) - - assert next_msg in before + assert_before(child, [next_msg]) # XXX: hooray the root clobbering the child here was fixed! # IMO, this demonstrates the true power of SC system design. @@ -630,32 +631,50 @@ def test_multi_daemon_subactors( if ctlc: do_ctlc(child) + # expect another breakpoint actor entry + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + assert_before(child, [bp_forever_msg]) + + if ctlc: + do_ctlc(child) + + # should crash with the 2nd name error (simulates + # a retry) and then the root eventually (boxed) errors + # after 1 or more further bp actor entries. 
+ + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + assert_before(child, [name_error_msg]) + # wait for final error in root + # where it crashs with boxed error while True: - - child.sendline('c') - child.expect(r"\(Pdb\+\+\)") - before = str(child.before.decode()) try: - - # root error should be packed as remote error - assert "_exceptions.RemoteActorError: ('name_error'" in before + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + assert_before( + child, + [bp_forever_msg] + ) + except AssertionError: break - except AssertionError: - assert bp_forever_msg in before + # child.sendline('c') + # assert_before( - if ctlc: - do_ctlc(child) + # child.sendline('c') + assert_before( + child, + [ + # boxed error raised in root task + "Attaching to pdb in crashed actor: ('root'", + "_exceptions.RemoteActorError: ('name_error'", + ] + ) - try: - child.sendline('c') - child.expect(pexpect.EOF) - - except TIMEOUT: - # Failed to exit using continue..? - child.sendline('q') - child.expect(pexpect.EOF) + child.sendline('c') + child.expect(pexpect.EOF) @has_nested_actors @@ -683,49 +702,64 @@ def test_multi_subactors_root_errors( # continue again to catch 2nd name error from # actor 'name_error_1' (which is 2nd depth). child.sendline('c') + + # due to block list strat from #337, this will no longer + # propagate before the root errors and cancels the spawner sub-tree. child.expect(r"\(Pdb\+\+\)") + + # only if the blocking condition doesn't kick in fast enough + before = str(child.before.decode()) + if "Debug lock blocked for ['name_error_1'" not in before: + + assert_before(child, [ + "Attaching to pdb in crashed actor: ('name_error_1'", + "NameError", + ]) + + if ctlc: + do_ctlc(child) + + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + + # check if the spawner crashed or was blocked from debug + # and if this intermediary attached check the boxed error + before = str(child.before.decode()) + if "Attaching to pdb in crashed actor: ('spawn_error'" in before: + + assert_before(child, [ + # boxed error from spawner's child + "RemoteActorError: ('name_error_1'", + "NameError", + ]) + + if ctlc: + do_ctlc(child) + + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + + # expect a root actor crash assert_before(child, [ - "Attaching to pdb in crashed actor: ('name_error_1'", - "NameError", - ]) - - if ctlc: - do_ctlc(child) - - child.sendline('c') - child.expect(r"\(Pdb\+\+\)") - assert_before(child, [ - "Attaching to pdb in crashed actor: ('spawn_error'", - # boxed error from previous step - "RemoteActorError: ('name_error_1'", - "NameError", - ]) - - if ctlc: - do_ctlc(child) - - child.sendline('c') - child.expect(r"\(Pdb\+\+\)") - assert_before(child, [ - "Attaching to pdb in crashed actor: ('root'", - # boxed error from previous step "RemoteActorError: ('name_error'", "NameError", + + # error from root actor and root task that created top level nursery + "Attaching to pdb in crashed actor: ('root'", + "AssertionError", ]) - # warnings assert we probably don't need - # assert "Cancelling nursery in ('spawn_error'," in before - - if ctlc: - do_ctlc(child) - - # continue again child.sendline('c') child.expect(pexpect.EOF) - before = str(child.before.decode()) - # error from root actor and root task that created top level nursery - assert "AssertionError" in before + assert_before(child, [ + # "Attaching to pdb in crashed actor: ('root'", + # boxed error from previous step + "RemoteActorError: ('name_error'", + "NameError", + "AssertionError", + 'assert 0', + ]) @has_nested_actors @@ -750,24 
+784,31 @@ def test_multi_nested_subactors_error_through_nurseries( timed_out_early: bool = False - for i in range(12): + for send_char in itertools.cycle(['c', 'q']): try: child.expect(r"\(Pdb\+\+\)") - child.sendline('c') - time.sleep(0.1) + child.sendline(send_char) + time.sleep(0.01) except EOF: - - # race conditions on how fast the continue is sent? - print(f"Failed early on {i}?") - timed_out_early = True break - else: - child.expect(pexpect.EOF) - if not timed_out_early: - before = str(child.before.decode()) - assert "NameError" in before + assert_before(child, [ + + # boxed source errors + "NameError: name 'doggypants' is not defined", + "tractor._exceptions.RemoteActorError: ('name_error'", + "bdb.BdbQuit", + + # first level subtrees + "tractor._exceptions.RemoteActorError: ('spawner0'", + # "tractor._exceptions.RemoteActorError: ('spawner1'", + + # propagation of errors up through nested subtrees + "tractor._exceptions.RemoteActorError: ('spawn_until_0'", + "tractor._exceptions.RemoteActorError: ('spawn_until_1'", + "tractor._exceptions.RemoteActorError: ('spawn_until_2'", + ]) @pytest.mark.timeout(15) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index ac8cbcd..775ee98 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -8,6 +8,7 @@ import builtins import itertools import importlib +from exceptiongroup import BaseExceptionGroup import pytest import trio import tractor @@ -409,11 +410,12 @@ def test_trio_error_cancels_intertask_chan(arb_addr): # should trigger remote actor error await portal.result() - with pytest.raises(RemoteActorError) as excinfo: + with pytest.raises(BaseExceptionGroup) as excinfo: trio.run(main) - # ensure boxed error is correct - assert excinfo.value.type == Exception + # ensure boxed errors + for exc in excinfo.value.exceptions: + assert exc.type == Exception def test_trio_closes_early_and_channel_exits(arb_addr): @@ -442,11 +444,12 @@ def test_aio_errors_and_channel_propagates_and_closes(arb_addr): # should trigger remote actor error await portal.result() - with pytest.raises(RemoteActorError) as excinfo: + with pytest.raises(BaseExceptionGroup) as excinfo: trio.run(main) - # ensure boxed error is correct - assert excinfo.value.type == Exception + # ensure boxed errors + for exc in excinfo.value.exceptions: + assert exc.type == Exception @tractor.context diff --git a/tests/test_local.py b/tests/test_local.py index 47a7c43..97a8328 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -11,15 +11,15 @@ from conftest import tractor_test @pytest.mark.trio -async def test_no_arbitter(): +async def test_no_runtime(): """An arbitter must be established before any nurseries can be created. (In other words ``tractor.open_root_actor()`` must be engaged at some point?) """ - with pytest.raises(RuntimeError): - with tractor.open_nursery(): + with pytest.raises(RuntimeError) : + async with tractor.find_actor('doggy'): pass diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 5604d6f..127138c 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -62,7 +62,10 @@ async def test_lifetime_stack_wipes_tmpfile( ) ).result() - except tractor.RemoteActorError: + except ( + tractor.RemoteActorError, + tractor.BaseExceptionGroup, + ): pass # tmp file should have been wiped by diff --git a/tractor/__init__.py b/tractor/__init__.py index c4d40f8..a691df6 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -18,7 +18,7 @@ tractor: structured concurrent "actors". 
""" -from trio import MultiError +from exceptiongroup import BaseExceptionGroup from ._clustering import open_actor_cluster from ._ipc import Channel @@ -62,7 +62,7 @@ __all__ = [ 'ContextCancelled', 'ModuleNotExposed', 'MsgStream', - 'MultiError', + 'BaseExceptionGroup', 'Portal', 'ReceiveMsgStream', 'RemoteActorError', diff --git a/tractor/_debug.py b/tractor/_debug.py index 751c646..af1bf55 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -25,6 +25,7 @@ import signal from functools import partial from contextlib import asynccontextmanager as acm from typing import ( + Any, Optional, Callable, AsyncIterator, @@ -75,8 +76,12 @@ class Lock: # placeholder for function to set a ``trio.Event`` on debugger exit # pdb_release_hook: Optional[Callable] = None + _trio_handler: Callable[ + [int, Optional[FrameType]], Any + ] | int | None = None + # actor-wide variable pointing to current task name using debugger - local_task_in_debug: Optional[str] = None + local_task_in_debug: str | None = None # NOTE: set by the current task waiting on the root tty lock from # the CALLER side of the `lock_tty_for_child()` context entry-call @@ -105,19 +110,16 @@ class Lock: @classmethod def shield_sigint(cls): cls._orig_sigint_handler = signal.signal( - signal.SIGINT, - shield_sigint, - ) + signal.SIGINT, + shield_sigint, + ) @classmethod def unshield_sigint(cls): - if cls._orig_sigint_handler is not None: - # restore original sigint handler - signal.signal( - signal.SIGINT, - cls._orig_sigint_handler - ) - + # always restore ``trio``'s sigint handler. see notes below in + # the pdb factory about the nightmare that is that code swapping + # out the handler when the repl activates... + signal.signal(signal.SIGINT, cls._trio_handler) cls._orig_sigint_handler = None @classmethod @@ -363,7 +365,7 @@ async def wait_for_parent_stdin_hijack( ) as (ctx, val): - log.pdb('locked context') + log.debug('locked context') assert val == 'Locked' async with ctx.open_stream() as stream: @@ -382,15 +384,14 @@ async def wait_for_parent_stdin_hijack( # sync with callee termination assert await ctx.result() == "pdb_unlock_complete" - log.pdb('unlocked context') + log.debug('exitting child side locking task context') except ContextCancelled: log.warning('Root actor cancelled debug lock') finally: - log.pdb(f"Exiting debugger for actor {actor_uid}") Lock.local_task_in_debug = None - log.pdb(f"Child {actor_uid} released parent stdio lock") + log.debug('Exiting debugger from child') def mk_mpdb() -> tuple[MultiActorPdb, Callable]: @@ -423,9 +424,8 @@ async def _breakpoint( ''' __tracebackhide__ = True - - pdb, undo_sigint = mk_mpdb() actor = tractor.current_actor() + pdb, undo_sigint = mk_mpdb() task_name = trio.lowlevel.current_task().name # TODO: is it possible to debug a trio.Cancelled except block? @@ -449,7 +449,10 @@ async def _breakpoint( # Recurrence entry case: this task already has the lock and # is likely recurrently entering a breakpoint if Lock.local_task_in_debug == task_name: - # noop on recurrent entry case + # noop on recurrent entry case but we want to trigger + # a checkpoint to allow other actors error-propagate and + # potetially avoid infinite re-entries in some subactor. + await trio.lowlevel.checkpoint() return # if **this** actor is already in debug mode block here @@ -468,10 +471,13 @@ async def _breakpoint( # root nursery so that the debugger can continue to run without # being restricted by the scope of a new task nursery. 
- # NOTE: if we want to debug a trio.Cancelled triggered exception + # TODO: if we want to debug a trio.Cancelled triggered exception # we have to figure out how to avoid having the service nursery - # cancel on this task start? I *think* this works below? - # actor._service_n.cancel_scope.shield = shield + # cancel on this task start? I *think* this works below: + # ```python + # actor._service_n.cancel_scope.shield = shield + # ``` + # but not entirely sure if that's a sane way to implement it? try: with trio.CancelScope(shield=True): await actor._service_n.start( @@ -480,6 +486,13 @@ async def _breakpoint( ) except RuntimeError: Lock.release() + + if actor._cancel_called: + # service nursery won't be usable and we + # don't want to lock up the root either way since + # we're in (the midst of) cancellation. + return + raise elif is_root_process(): @@ -530,10 +543,6 @@ async def _breakpoint( # # last_f = frame.f_back # # last_f.f_globals['__tracebackhide__'] = True # # signal.signal = pdbpp.hideframe(signal.signal) - # signal.signal( - # signal.SIGINT, - # orig_handler - # ) def shield_sigint( @@ -544,7 +553,7 @@ def shield_sigint( ) -> None: ''' - Specialized debugger compatible SIGINT handler. + Specialized, debugger-aware SIGINT handler. In childred we always ignore to avoid deadlocks since cancellation should always be managed by the parent supervising actor. The root @@ -601,6 +610,8 @@ def shield_sigint( # which has already terminated to unlock. and any_connected ): + # we are root and some actor is in debug mode + # if uid_in_debug is not None: name = uid_in_debug[0] if name != 'root': log.pdb( @@ -611,6 +622,22 @@ def shield_sigint( log.pdb( "Ignoring SIGINT while in debug mode" ) + elif ( + is_root_process() + ): + log.pdb( + "Ignoring SIGINT since debug mode is enabled" + ) + + # revert back to ``trio`` handler asap! + Lock.unshield_sigint() + if ( + Lock._root_local_task_cs_in_debug + and not Lock._root_local_task_cs_in_debug.cancel_called + ): + Lock._root_local_task_cs_in_debug.cancel() + + # raise KeyboardInterrupt # child actor that has locked the debugger elif not is_root_process(): @@ -636,10 +663,9 @@ def shield_sigint( # https://github.com/goodboy/tractor/issues/320 # elif debug_mode(): - else: - log.pdb( - "Ignoring SIGINT since debug mode is enabled" - ) + else: # XXX: shouldn't ever get here? + print("WTFWTFWTF") + raise KeyboardInterrupt # NOTE: currently (at least on ``fancycompleter`` 0.9.2) # it lookks to be that the last command that was run (eg. ll) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 9ce59e8..5440cad 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -27,6 +27,7 @@ import importlib import builtins import traceback +import exceptiongroup as eg import trio @@ -52,9 +53,6 @@ class RemoteActorError(Exception): self.type = suberror_type self.msgdata = msgdata - # TODO: a trio.MultiError.catch like context manager - # for catching underlying remote errors of a particular type - class InternalActorError(RemoteActorError): """Remote internal ``tractor`` error indicating @@ -123,10 +121,12 @@ def unpack_error( err_type=RemoteActorError ) -> Exception: - """Unpack an 'error' message from the wire + ''' + Unpack an 'error' message from the wire into a local ``RemoteActorError``. 
- """ + ''' + __tracebackhide__ = True error = msg['error'] tb_str = error.get('tb_str', '') @@ -139,7 +139,12 @@ def unpack_error( suberror_type = trio.Cancelled else: # try to lookup a suitable local error type - for ns in [builtins, _this_mod, trio]: + for ns in [ + builtins, + _this_mod, + eg, + trio, + ]: try: suberror_type = getattr(ns, type_name) break @@ -158,12 +163,15 @@ def unpack_error( def is_multi_cancelled(exc: BaseException) -> bool: - """Predicate to determine if a ``trio.MultiError`` contains only - ``trio.Cancelled`` sub-exceptions (and is likely the result of + ''' + Predicate to determine if a possible ``eg.BaseExceptionGroup`` contains + only ``trio.Cancelled`` sub-exceptions (and is likely the result of cancelling a collection of subtasks. - """ - return not trio.MultiError.filter( - lambda exc: exc if not isinstance(exc, trio.Cancelled) else None, - exc, - ) + ''' + if isinstance(exc, eg.BaseExceptionGroup): + return exc.subgroup( + lambda exc: isinstance(exc, trio.Cancelled) + ) is not None + + return False diff --git a/tractor/_portal.py b/tractor/_portal.py index de2da45..089b09e 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -52,17 +52,17 @@ log = get_logger(__name__) def _unwrap_msg( - msg: dict[str, Any], channel: Channel ) -> Any: + __tracebackhide__ = True try: return msg['return'] except KeyError: # internal error should never get here assert msg.get('cid'), "Received internal error at portal?" - raise unpack_error(msg, channel) + raise unpack_error(msg, channel) from None class MessagingError(Exception): @@ -136,6 +136,7 @@ class Portal: Return the result(s) from the remote actor's "main" task. ''' + # __tracebackhide__ = True # Check for non-rpc errors slapped on the # channel for which we always raise exc = self.channel._exc @@ -460,7 +461,6 @@ class Portal: # sure it's worth being pedantic: # Exception, # trio.Cancelled, - # trio.MultiError, # KeyboardInterrupt, ) as err: diff --git a/tractor/_root.py b/tractor/_root.py index 0e5b2aa..cb405f5 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -23,15 +23,22 @@ from functools import partial import importlib import logging import os +import signal from typing import ( Optional, ) import typing import warnings + +from exceptiongroup import BaseExceptionGroup import trio -from ._runtime import Actor, Arbiter, async_main +from ._runtime import ( + Actor, + Arbiter, + async_main, +) from . import _debug from . import _spawn from . import _state @@ -74,14 +81,19 @@ async def open_root_actor( rpc_module_paths: Optional[list] = None, ) -> typing.Any: - """Async entry point for ``tractor``. + ''' + Runtime init entry point for ``tractor``. - """ + ''' # Override the global debugger hook to make it play nice with # ``trio``, see: # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018 os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace' + # attempt to retreive ``trio``'s sigint handler and stash it + # on our debugger lock state. 
+ _debug.Lock._trio_handler = signal.getsignal(signal.SIGINT) + # mark top most level process as root actor _state._runtime_vars['_is_root'] = True @@ -205,7 +217,10 @@ async def open_root_actor( try: yield actor - except (Exception, trio.MultiError) as err: + except ( + Exception, + BaseExceptionGroup, + ) as err: entered = await _debug._maybe_enter_pm(err) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 2ece834..9f8fed0 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -25,21 +25,23 @@ from itertools import chain import importlib import importlib.util import inspect -import uuid +import signal +import sys from typing import ( Any, Optional, Union, TYPE_CHECKING, Callable, ) +import uuid from types import ModuleType -import sys import os from contextlib import ExitStack import warnings +from async_generator import aclosing +from exceptiongroup import BaseExceptionGroup import trio # type: ignore from trio_typing import TaskStatus -from async_generator import aclosing from ._ipc import Channel from ._streaming import Context @@ -194,7 +196,7 @@ async def _invoke( res = await coro await chan.send({'return': res, 'cid': cid}) - except trio.MultiError: + except BaseExceptionGroup: # if a context error was set then likely # thei multierror was raised due to that if ctx._error is not None: @@ -266,7 +268,7 @@ async def _invoke( except ( Exception, - trio.MultiError + BaseExceptionGroup, ) as err: if not is_multi_cancelled(err): @@ -349,7 +351,7 @@ def _get_mod_abspath(module): async def try_ship_error_to_parent( channel: Channel, - err: Union[Exception, trio.MultiError], + err: Union[Exception, BaseExceptionGroup], ) -> None: with trio.CancelScope(shield=True): @@ -708,6 +710,14 @@ class Actor: log.runtime(f"No more channels for {chan.uid}") self._peers.pop(uid, None) + log.runtime(f"Peers is {self._peers}") + + # No more channels to other actors (at all) registered + # as connected. + if not self._peers: + log.runtime("Signalling no more peer channel connections") + self._no_more_peers.set() + # NOTE: block this actor from acquiring the # debugger-TTY-lock since we have no way to know if we # cancelled it and further there is no way to ensure the @@ -721,23 +731,16 @@ class Actor: # if a now stale local task has the TTY lock still # we cancel it to allow servicing other requests for # the lock. + db_cs = pdb_lock._root_local_task_cs_in_debug if ( - pdb_lock._root_local_task_cs_in_debug - and not pdb_lock._root_local_task_cs_in_debug.cancel_called + db_cs + and not db_cs.cancel_called ): log.warning( f'STALE DEBUG LOCK DETECTED FOR {uid}' ) # TODO: figure out why this breaks tests.. - # pdb_lock._root_local_task_cs_in_debug.cancel() - - log.runtime(f"Peers is {self._peers}") - - # No more channels to other actors (at all) registered - # as connected. - if not self._peers: - log.runtime("Signalling no more peer channel connections") - self._no_more_peers.set() + db_cs.cancel() # XXX: is this necessary (GC should do it)? if chan.connected(): @@ -1228,6 +1231,10 @@ async def async_main( and when cancelled effectively cancels the actor. ''' + # attempt to retreive ``trio``'s sigint handler and stash it + # on our debugger lock state. 
+ _debug.Lock._trio_handler = signal.getsignal(signal.SIGINT) + registered_with_arbiter = False try: @@ -1549,7 +1556,10 @@ async def process_messages( partial(_invoke, actor, cid, chan, func, kwargs), name=funcname, ) - except (RuntimeError, trio.MultiError): + except ( + RuntimeError, + BaseExceptionGroup, + ): # avoid reporting a benign race condition # during actor runtime teardown. nursery_cancelled_before_task = True @@ -1594,7 +1604,10 @@ async def process_messages( # transport **was** disconnected return True - except (Exception, trio.MultiError) as err: + except ( + Exception, + BaseExceptionGroup, + ) as err: if nursery_cancelled_before_task: sn = actor._service_n assert sn and sn.cancel_scope.cancel_called diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 4a9f118..7380ea0 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -31,6 +31,7 @@ from typing import ( ) from collections.abc import Awaitable +from exceptiongroup import BaseExceptionGroup import trio from trio_typing import TaskStatus @@ -139,6 +140,7 @@ async def exhaust_portal( If the main task is an async generator do our best to consume what's left of it. ''' + __tracebackhide__ = True try: log.debug(f"Waiting on final result from {actor.uid}") @@ -146,8 +148,11 @@ async def exhaust_portal( # always be established and shutdown using a context manager api final = await portal.result() - except (Exception, trio.MultiError) as err: - # we reraise in the parent task via a ``trio.MultiError`` + except ( + Exception, + BaseExceptionGroup, + ) as err: + # we reraise in the parent task via a ``BaseExceptionGroup`` return err except trio.Cancelled as err: # lol, of course we need this too ;P @@ -175,7 +180,7 @@ async def cancel_on_completion( ''' # if this call errors we store the exception for later # in ``errors`` which will be reraised inside - # a MultiError and we still send out a cancel request + # an exception group and we still send out a cancel request result = await exhaust_portal(portal, actor) if isinstance(result, Exception): errors[actor.uid] = result diff --git a/tractor/_state.py b/tractor/_state.py index c0c957b..28fa16e 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -22,7 +22,6 @@ from typing import ( Optional, Any, ) -from collections.abc import Mapping import trio @@ -46,30 +45,6 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore # return _current_actor -_conc_name_getters = { - 'task': trio.lowlevel.current_task, - 'actor': current_actor -} - - -class ActorContextInfo(Mapping): - "Dyanmic lookup for local actor and task names" - _context_keys = ('task', 'actor') - - def __len__(self): - return len(self._context_keys) - - def __iter__(self): - return iter(self._context_keys) - - def __getitem__(self, key: str) -> str: - try: - return _conc_name_getters[key]().name # type: ignore - except RuntimeError: - # no local actor/task context initialized yet - return f'no {key} context' - - def is_main_process() -> bool: """Bool determining if this actor is running in the top-most process. 
""" diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 4708e1e..a41cfd5 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -18,6 +18,7 @@ ``trio`` inspired apis and helpers """ +from contextlib import asynccontextmanager as acm from functools import partial import inspect from typing import ( @@ -27,8 +28,8 @@ from typing import ( import typing import warnings +from exceptiongroup import BaseExceptionGroup import trio -from async_generator import asynccontextmanager from ._debug import maybe_wait_for_debugger from ._state import current_actor, is_main_process @@ -82,7 +83,7 @@ class ActorNursery: actor: Actor, ria_nursery: trio.Nursery, da_nursery: trio.Nursery, - errors: dict[tuple[str, str], Exception], + errors: dict[tuple[str, str], BaseException], ) -> None: # self.supervisor = supervisor # TODO self._actor: Actor = actor @@ -294,13 +295,17 @@ class ActorNursery: self._join_procs.set() -@asynccontextmanager +@acm async def _open_and_supervise_one_cancels_all_nursery( actor: Actor, + ) -> typing.AsyncGenerator[ActorNursery, None]: + # TODO: yay or nay? + # __tracebackhide__ = True + # the collection of errors retreived from spawned sub-actors - errors: dict[tuple[str, str], Exception] = {} + errors: dict[tuple[str, str], BaseException] = {} # This is the outermost level "deamon actor" nursery. It is awaited # **after** the below inner "run in actor nursery". This allows for @@ -333,19 +338,17 @@ async def _open_and_supervise_one_cancels_all_nursery( # after we yield upwards yield anursery + # When we didn't error in the caller's scope, + # signal all process-monitor-tasks to conduct + # the "hard join phase". log.runtime( f"Waiting on subactors {anursery._children} " "to complete" ) - - # Last bit before first nursery block ends in the case - # where we didn't error in the caller's scope - - # signal all process monitor tasks to conduct - # hard join phase. anursery._join_procs.set() - except BaseException as err: + except BaseException as inner_err: + errors[actor.uid] = inner_err # If we error in the root but the debugger is # engaged we don't want to prematurely kill (and @@ -362,49 +365,42 @@ async def _open_and_supervise_one_cancels_all_nursery( # worry more are coming). anursery._join_procs.set() - try: - # XXX: hypothetically an error could be - # raised and then a cancel signal shows up - # slightly after in which case the `else:` - # block here might not complete? For now, - # shield both. - with trio.CancelScope(shield=True): - etype = type(err) - if etype in ( - trio.Cancelled, - KeyboardInterrupt - ) or ( - is_multi_cancelled(err) - ): - log.cancel( - f"Nursery for {current_actor().uid} " - f"was cancelled with {etype}") - else: - log.exception( - f"Nursery for {current_actor().uid} " - f"errored with {err}, ") + # XXX: hypothetically an error could be + # raised and then a cancel signal shows up + # slightly after in which case the `else:` + # block here might not complete? For now, + # shield both. 
+ with trio.CancelScope(shield=True): + etype = type(inner_err) + if etype in ( + trio.Cancelled, + KeyboardInterrupt + ) or ( + is_multi_cancelled(inner_err) + ): + log.cancel( + f"Nursery for {current_actor().uid} " + f"was cancelled with {etype}") + else: + log.exception( + f"Nursery for {current_actor().uid} " + f"errored with") - # cancel all subactors - await anursery.cancel() + # cancel all subactors + await anursery.cancel() - except trio.MultiError as merr: - # If we receive additional errors while waiting on - # remaining subactors that were cancelled, - # aggregate those errors with the original error - # that triggered this teardown. - if err not in merr.exceptions: - raise trio.MultiError(merr.exceptions + [err]) - else: - raise + # ria_nursery scope end - # ria_nursery scope end - - # XXX: do we need a `trio.Cancelled` catch here as well? - # this is the catch around the ``.run_in_actor()`` nursery + # TODO: this is the handler around the ``.run_in_actor()`` + # nursery. Ideally we can drop this entirely in the future as + # the whole ``.run_in_actor()`` API should be built "on top of" + # this lower level spawn-request-cancel "daemon actor" API where + # a local in-actor task nursery is used with one-to-one task + # + `await Portal.run()` calls and the results/errors are + # handled directly (inline) and errors by the local nursery. except ( - Exception, - trio.MultiError, + BaseExceptionGroup, trio.Cancelled ) as err: @@ -436,18 +432,20 @@ async def _open_and_supervise_one_cancels_all_nursery( with trio.CancelScope(shield=True): await anursery.cancel() - # use `MultiError` as needed + # use `BaseExceptionGroup` as needed if len(errors) > 1: - raise trio.MultiError(tuple(errors.values())) + raise BaseExceptionGroup( + 'tractor.ActorNursery errored with', + tuple(errors.values()), + ) else: raise list(errors.values())[0] - # ria_nursery scope end - nursery checkpoint - - # after nursery exit + # da_nursery scope end - nursery checkpoint + # final exit -@asynccontextmanager +@acm async def open_nursery( **kwargs, diff --git a/tractor/log.py b/tractor/log.py index ba3e29c..4273c9b 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -18,12 +18,14 @@ Log like a forester! 
""" +from collections.abc import Mapping import sys import logging import colorlog # type: ignore -from typing import Optional -from ._state import ActorContextInfo +import trio + +from ._state import current_actor _proj_name: str = 'tractor' @@ -36,7 +38,8 @@ LOG_FORMAT = ( # "{bold_white}{log_color}{asctime}{reset}" "{log_color}{asctime}{reset}" " {bold_white}{thin_white}({reset}" - "{thin_white}{actor}, {process}, {task}){reset}{bold_white}{thin_white})" + "{thin_white}{actor_name}[{actor_uid}], " + "{process}, {task}){reset}{bold_white}{thin_white})" " {reset}{log_color}[{reset}{bold_log_color}{levelname}{reset}{log_color}]" " {log_color}{name}" " {thin_white}{filename}{log_color}:{reset}{thin_white}{lineno}{log_color}" @@ -136,6 +139,37 @@ class StackLevelAdapter(logging.LoggerAdapter): ) +_conc_name_getters = { + 'task': lambda: trio.lowlevel.current_task().name, + 'actor': lambda: current_actor(), + 'actor_name': lambda: current_actor().name, + 'actor_uid': lambda: current_actor().uid[1][:6], +} + + +class ActorContextInfo(Mapping): + "Dyanmic lookup for local actor and task names" + _context_keys = ( + 'task', + 'actor', + 'actor_name', + 'actor_uid', + ) + + def __len__(self): + return len(self._context_keys) + + def __iter__(self): + return iter(self._context_keys) + + def __getitem__(self, key: str) -> str: + try: + return _conc_name_getters[key]() + except RuntimeError: + # no local actor/task context initialized yet + return f'no {key} context' + + def get_logger( name: str = None,