From b285db4c58a9406026b964892bf6e6461432aba9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Feb 2021 12:59:43 -0500 Subject: [PATCH 1/6] Factor OCA supervisor into new func --- tractor/_root.py | 5 +- tractor/_trionics.py | 237 +++++++++++++++++++++++-------------------- 2 files changed, 130 insertions(+), 112 deletions(-) diff --git a/tractor/_root.py b/tractor/_root.py index e48ca98..1206864 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -164,7 +164,10 @@ async def open_root_actor( ) try: yield actor - # result = await main() + + # except BaseException as err: + # breakpoint() + except (Exception, trio.MultiError) as err: logger.exception("Actor crashed:") await _debug._maybe_enter_pm(err) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 490778d..dfe4fba 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -200,6 +200,128 @@ class ActorNursery: self._join_procs.set() +@asynccontextmanager +async def _open_and_supervise_one_cancels_all_nursery( + actor: Actor, +) -> typing.AsyncGenerator[ActorNursery, None]: + + # the collection of errors retreived from spawned sub-actors + errors: Dict[Tuple[str, str], Exception] = {} + + # This is the outermost level "deamon actor" nursery. It is awaited + # **after** the below inner "run in actor nursery". This allows for + # handling errors that are generated by the inner nursery in + # a supervisor strategy **before** blocking indefinitely to wait for + # actors spawned in "daemon mode" (aka started using + # ``ActorNursery.start_actor()``). + + # errors from this daemon actor nursery bubble up to caller + async with trio.open_nursery() as da_nursery: + try: + # This is the inner level "run in actor" nursery. It is + # awaited first since actors spawned in this way (using + # ``ActorNusery.run_in_actor()``) are expected to only + # return a single result and then complete (i.e. be canclled + # gracefully). Errors collected from these actors are + # immediately raised for handling by a supervisor strategy. + # As such if the strategy propagates any error(s) upwards + # the above "daemon actor" nursery will be notified. + async with trio.open_nursery() as ria_nursery: + anursery = ActorNursery( + actor, + ria_nursery, + da_nursery, + errors + ) + try: + # spawning of actors happens in the caller's scope + # after we yield upwards + yield anursery + log.debug( + f"Waiting on subactors {anursery._children} " + "to complete" + ) + except BaseException as err: + # if the caller's scope errored then we activate our + # one-cancels-all supervisor strategy (don't + # worry more are coming). + anursery._join_procs.set() + try: + # XXX: hypothetically an error could be + # raised and then a cancel signal shows up + # slightly after in which case the `else:` + # block here might not complete? For now, + # shield both. + with trio.CancelScope(shield=True): + etype = type(err) + if etype in ( + trio.Cancelled, + KeyboardInterrupt + ) or ( + is_multi_cancelled(err) + ): + log.warning( + f"Nursery for {current_actor().uid} " + f"was cancelled with {etype}") + else: + log.exception( + f"Nursery for {current_actor().uid} " + f"errored with {err}, ") + + # cancel all subactors + await anursery.cancel() + + except trio.MultiError as merr: + # If we receive additional errors while waiting on + # remaining subactors that were cancelled, + # aggregate those errors with the original error + # that triggered this teardown. + if err not in merr.exceptions: + raise trio.MultiError(merr.exceptions + [err]) + else: + raise + + # Last bit before first nursery block ends in the case + # where we didn't error in the caller's scope + log.debug("Waiting on all subactors to complete") + anursery._join_procs.set() + + # ria_nursery scope end + + # XXX: do we need a `trio.Cancelled` catch here as well? + except (Exception, trio.MultiError, trio.Cancelled) as err: + # If actor-local error was raised while waiting on + # ".run_in_actor()" actors then we also want to cancel all + # remaining sub-actors (due to our lone strategy: + # one-cancels-all). + log.warning(f"Nursery cancelling due to {err}") + if anursery._children: + with trio.CancelScope(shield=True): + await anursery.cancel() + raise + finally: + # No errors were raised while awaiting ".run_in_actor()" + # actors but those actors may have returned remote errors as + # results (meaning they errored remotely and have relayed + # those errors back to this parent actor). The errors are + # collected in ``errors`` so cancel all actors, summarize + # all errors and re-raise. + if errors: + if anursery._children: + with trio.CancelScope(shield=True): + await anursery.cancel() + + # use `MultiError` as needed + if len(errors) > 1: + raise trio.MultiError(tuple(errors.values())) + else: + raise list(errors.values())[0] + + # ria_nursery scope end - nursery checkpoint + + # after nursery exit + + @asynccontextmanager async def open_nursery( **kwargs, @@ -234,120 +356,13 @@ async def open_nursery( # mark us for teardown on exit implicit_runtime = True - # the collection of errors retreived from spawned sub-actors - errors: Dict[Tuple[str, str], Exception] = {} - - # This is the outermost level "deamon actor" nursery. It is awaited - # **after** the below inner "run in actor nursery". This allows for - # handling errors that are generated by the inner nursery in - # a supervisor strategy **before** blocking indefinitely to wait for - # actors spawned in "daemon mode" (aka started using - # ``ActorNursery.start_actor()``). try: - async with trio.open_nursery() as da_nursery: - try: - # This is the inner level "run in actor" nursery. It is - # awaited first since actors spawned in this way (using - # ``ActorNusery.run_in_actor()``) are expected to only - # return a single result and then complete (i.e. be canclled - # gracefully). Errors collected from these actors are - # immediately raised for handling by a supervisor strategy. - # As such if the strategy propagates any error(s) upwards - # the above "daemon actor" nursery will be notified. - async with trio.open_nursery() as ria_nursery: - anursery = ActorNursery( - actor, - ria_nursery, - da_nursery, - errors - ) - try: - # spawning of actors happens in the caller's scope - # after we yield upwards - yield anursery - log.debug( - f"Waiting on subactors {anursery._children} " - "to complete" - ) - except BaseException as err: - # if the caller's scope errored then we activate our - # one-cancels-all supervisor strategy (don't - # worry more are coming). - anursery._join_procs.set() - try: - # XXX: hypothetically an error could be - # raised and then a cancel signal shows up - # slightly after in which case the `else:` - # block here might not complete? For now, - # shield both. - with trio.CancelScope(shield=True): - etype = type(err) - if etype in ( - trio.Cancelled, - KeyboardInterrupt - ) or ( - is_multi_cancelled(err) - ): - log.warning( - f"Nursery for {current_actor().uid} " - f"was cancelled with {etype}") - else: - log.exception( - f"Nursery for {current_actor().uid} " - f"errored with {err}, ") + async with _open_and_supervise_one_cancels_all_nursery( + actor + ) as anursery: - # cancel all subactors - await anursery.cancel() + yield anursery - except trio.MultiError as merr: - # If we receive additional errors while waiting on - # remaining subactors that were cancelled, - # aggregate those errors with the original error - # that triggered this teardown. - if err not in merr.exceptions: - raise trio.MultiError(merr.exceptions + [err]) - else: - raise - - # Last bit before first nursery block ends in the case - # where we didn't error in the caller's scope - log.debug("Waiting on all subactors to complete") - anursery._join_procs.set() - - # ria_nursery scope end - - # XXX: do we need a `trio.Cancelled` catch here as well? - except (Exception, trio.MultiError, trio.Cancelled) as err: - # If actor-local error was raised while waiting on - # ".run_in_actor()" actors then we also want to cancel all - # remaining sub-actors (due to our lone strategy: - # one-cancels-all). - log.warning(f"Nursery cancelling due to {err}") - if anursery._children: - with trio.CancelScope(shield=True): - await anursery.cancel() - raise - finally: - # No errors were raised while awaiting ".run_in_actor()" - # actors but those actors may have returned remote errors as - # results (meaning they errored remotely and have relayed - # those errors back to this parent actor). The errors are - # collected in ``errors`` so cancel all actors, summarize - # all errors and re-raise. - if errors: - if anursery._children: - with trio.CancelScope(shield=True): - await anursery.cancel() - - # use `MultiError` as needed - if len(errors) > 1: - raise trio.MultiError(tuple(errors.values())) - else: - raise list(errors.values())[0] - - # ria_nursery scope end - nursery checkpoint - - # after nursery exit finally: log.debug("Nursery teardown complete") From 983e66b31b54de881ed1027b117023594b6ea27c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Feb 2021 13:07:22 -0500 Subject: [PATCH 2/6] Add second implicit-runtime-boot branch --- tractor/_trionics.py | 42 +++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index dfe4fba..8815562 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -5,7 +5,6 @@ from functools import partial import multiprocessing as mp from typing import Tuple, List, Dict, Optional import typing -from contextlib import AsyncExitStack import warnings import trio @@ -343,25 +342,31 @@ async def open_nursery( actor = current_actor(err_on_no_runtime=False) - if actor is None and is_main_process(): - - # if we are the parent process start the actor runtime implicitly - log.info("Starting actor runtime!") - root_runtime_stack = AsyncExitStack() - actor = await root_runtime_stack.enter_async_context( - open_root_actor(**kwargs) - ) - assert actor is current_actor() - - # mark us for teardown on exit - implicit_runtime = True - try: - async with _open_and_supervise_one_cancels_all_nursery( - actor - ) as anursery: + if actor is None and is_main_process(): - yield anursery + # if we are the parent process start the actor runtime implicitly + log.info("Starting actor runtime!") + + # mark us for teardown on exit + implicit_runtime = True + + async with open_root_actor(**kwargs) as actor: + assert actor is current_actor() + + async with _open_and_supervise_one_cancels_all_nursery( + actor + ) as anursery: + + yield anursery + + else: # sub-nursery case + + async with _open_and_supervise_one_cancels_all_nursery( + actor + ) as anursery: + + yield anursery finally: log.debug("Nursery teardown complete") @@ -369,4 +374,3 @@ async def open_nursery( # shutdown runtime if it was started if implicit_runtime: log.info("Shutting down actor tree") - await root_runtime_stack.aclose() From 8fabd27dbe7a5b7c15e79aa7d8b93196407e2a3c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Feb 2021 13:07:36 -0500 Subject: [PATCH 3/6] Lint fixes --- tests/test_debugger.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 1f5c6d2..9f62653 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -323,7 +323,6 @@ def test_multi_daemon_subactors(spawn, loglevel): child.expect(pexpect.EOF) - def test_multi_subactors_root_errors(spawn): """Multiple subactors, both erroring and breakpointing as well as a nested subactor erroring. @@ -402,7 +401,10 @@ def test_multi_nested_subactors_error_through_nurseries(spawn): assert "NameError" in before -def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method): +def test_root_nursery_cancels_before_child_releases_tty_lock( + spawn, + start_method +): """Test that when the root sends a cancel message before a nested child has unblocked (which can happen when it has the tty lock and is engaged in pdb) it is indeed cancelled after exiting the debugger. @@ -420,7 +422,6 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method child.sendline('c') - for i in range(4): time.sleep(0.5) try: @@ -440,7 +441,6 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method # race conditions on how fast the continue is sent? break - before = str(child.before.decode()) assert "NameError: name 'doggypants' is not defined" in before From b7b2436bc17967f48bb16e252dd309a471058430 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 23 Feb 2021 06:48:15 -0500 Subject: [PATCH 4/6] Remove tractor run from some debug examples --- examples/debugging/multi_daemon_subactors.py | 11 +++++++---- ...lti_nested_subactors_error_up_through_nurseries.py | 7 +++++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/examples/debugging/multi_daemon_subactors.py b/examples/debugging/multi_daemon_subactors.py index b834a02..eadb4c1 100644 --- a/examples/debugging/multi_daemon_subactors.py +++ b/examples/debugging/multi_daemon_subactors.py @@ -17,10 +17,13 @@ async def name_error(): async def main(): """Test breakpoint in a streaming actor. """ - async with tractor.open_nursery() as n: + async with tractor.open_nursery( + debug_mode=True, + loglevel='error', + ) as n: - p0 = await n.start_actor('bp_forever', rpc_module_paths=[__name__]) - p1 = await n.start_actor('name_error', rpc_module_paths=[__name__]) + p0 = await n.start_actor('bp_forever', enable_modules=[__name__]) + p1 = await n.start_actor('name_error', enable_modules=[__name__]) # retreive results stream = await p0.run(breakpoint_forever) @@ -28,4 +31,4 @@ async def main(): if __name__ == '__main__': - tractor.run(main, debug_mode=True, loglevel='error') + trio.run(main) diff --git a/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py b/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py index 82b4def..e754599 100644 --- a/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py +++ b/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py @@ -1,3 +1,4 @@ +import trio import tractor @@ -50,7 +51,9 @@ async def main(): └─ python -m tractor._child --uid ('spawn_until_0', 'de918e6d ...) """ - async with tractor.open_nursery() as n: + async with tractor.open_nursery( + debug_mode=True, + ) as n: # spawn both actors portal = await n.run_in_actor( @@ -70,4 +73,4 @@ async def main(): if __name__ == '__main__': - tractor.run(main, debug_mode=True) + trio.run(main) From cd636b270ea950107535e97ceb844786b0f1f7e3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Feb 2021 13:38:20 -0500 Subject: [PATCH 5/6] Update debug tests to expect 'root' actor name --- tests/test_debugger.py | 8 ++++---- tractor/_root.py | 3 --- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 9f62653..8f850df 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -82,7 +82,7 @@ def test_root_actor_error(spawn, user_in_out): before = str(child.before.decode()) # make sure expected logging and error arrives - assert "Attaching to pdb in crashed actor: ('arbiter'" in before + assert "Attaching to pdb in crashed actor: ('root'" in before assert 'AssertionError' in before # send user command @@ -170,7 +170,7 @@ def test_subactor_error(spawn): before = str(child.before.decode()) # root actor gets debugger engaged - assert "Attaching to pdb in crashed actor: ('arbiter'" in before + assert "Attaching to pdb in crashed actor: ('root'" in before # error is a remote error propagated from the subactor assert "RemoteActorError: ('name_error'" in before @@ -276,7 +276,7 @@ def test_multi_subactors(spawn): child.sendline('q') child.expect(r"\(Pdb\+\+\)") before = str(child.before.decode()) - assert "Attaching to pdb in crashed actor: ('arbiter'" in before + assert "Attaching to pdb in crashed actor: ('root'" in before assert "RemoteActorError: ('breakpoint_forever'" in before assert 'bdb.BdbQuit' in before @@ -344,7 +344,7 @@ def test_multi_subactors_root_errors(spawn): before = str(child.before.decode()) # should have come just after priot prompt - assert "Attaching to pdb in crashed actor: ('arbiter'" in before + assert "Attaching to pdb in crashed actor: ('root'" in before assert "AssertionError" in before # warnings assert we probably don't need diff --git a/tractor/_root.py b/tractor/_root.py index 1206864..7123eb5 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -165,9 +165,6 @@ async def open_root_actor( try: yield actor - # except BaseException as err: - # breakpoint() - except (Exception, trio.MultiError) as err: logger.exception("Actor crashed:") await _debug._maybe_enter_pm(err) From 47565cfbf3c20a33c6a8f4211019af7a5edadc82 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 25 Feb 2021 08:05:35 -0500 Subject: [PATCH 6/6] Use root as default name from `tractor.run()` --- tests/test_pubsub.py | 2 +- tests/test_rpc.py | 2 +- tractor/_root.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 365dbb9..3fcb45d 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -161,7 +161,7 @@ def test_multi_actor_subs_arbiter_pub( async with tractor.open_nursery() as n: - name = 'arbiter' + name = 'root' if pub_actor == 'streamer': # start the publisher as a daemon diff --git a/tests/test_rpc.py b/tests/test_rpc.py index 95a46ec..2a2b406 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -53,7 +53,7 @@ def test_rpc_errors(arb_addr, to_call, testdir): exposed_mods, funcname, inside_err = to_call subactor_exposed_mods = [] func_defined = globals().get(funcname, False) - subactor_requests_to = 'arbiter' + subactor_requests_to = 'root' remote_err = tractor.RemoteActorError # remote module that fails at import time diff --git a/tractor/_root.py b/tractor/_root.py index 7123eb5..48ea462 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -185,7 +185,7 @@ def run( *args, # runtime kwargs - name: Optional[str] = None, + name: Optional[str] = 'root', arbiter_addr: Tuple[str, int] = ( _default_arbiter_host, _default_arbiter_port,