diff --git a/examples/debugging/multi_daemon_subactors.py b/examples/debugging/multi_daemon_subactors.py index 6c2d5750..ea5fe005 100644 --- a/examples/debugging/multi_daemon_subactors.py +++ b/examples/debugging/multi_daemon_subactors.py @@ -32,7 +32,7 @@ async def main(): try: await p1.run(name_error) except tractor.RemoteActorError as rae: - assert rae.type is NameError + assert rae.boxed_type is NameError async for i in stream: diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py index 5f73ac6c..52db139f 100644 --- a/tests/test_advanced_faults.py +++ b/tests/test_advanced_faults.py @@ -95,6 +95,7 @@ def test_ipc_channel_break_during_stream( mod: ModuleType = import_path( examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py', root=examples_dir(), + consider_namespace_packages=False, ) # by def we expect KBI from user after a simulated "hang diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index b8c14af3..18ad3615 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -14,7 +14,7 @@ import tractor from tractor._testing import ( tractor_test, ) -from conftest import no_windows +from .conftest import no_windows def is_win(): @@ -77,7 +77,7 @@ def test_remote_error(reg_addr, args_err): # of this actor nursery. await portal.result() except tractor.RemoteActorError as err: - assert err.type == errtype + assert err.boxed_type == errtype print("Look Maa that actor failed hard, hehh") raise @@ -86,7 +86,7 @@ def test_remote_error(reg_addr, args_err): with pytest.raises(tractor.RemoteActorError) as excinfo: trio.run(main) - assert excinfo.value.type == errtype + assert excinfo.value.boxed_type == errtype else: # the root task will also error on the `.result()` call @@ -96,7 +96,7 @@ def test_remote_error(reg_addr, args_err): # ensure boxed errors for exc in excinfo.value.exceptions: - assert exc.type == errtype + assert exc.boxed_type == errtype def test_multierror(reg_addr): @@ -117,7 +117,7 @@ def test_multierror(reg_addr): try: await portal2.result() except tractor.RemoteActorError as err: - assert err.type == AssertionError + assert err.boxed_type == AssertionError print("Look Maa that first actor failed hard, hehh") raise @@ -169,7 +169,7 @@ def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay): for exc in exceptions: assert isinstance(exc, tractor.RemoteActorError) - assert exc.type == AssertionError + assert exc.boxed_type == AssertionError async def do_nothing(): @@ -310,7 +310,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel): await portal.run(func, **kwargs) except tractor.RemoteActorError as err: - assert err.type == err_type + assert err.boxed_type == err_type # we only expect this first error to propogate # (all other daemons are cancelled before they # can be scheduled) @@ -329,11 +329,11 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel): assert len(err.exceptions) == num_actors for exc in err.exceptions: if isinstance(exc, tractor.RemoteActorError): - assert exc.type == err_type + assert exc.boxed_type == err_type else: assert isinstance(exc, trio.Cancelled) elif isinstance(err, tractor.RemoteActorError): - assert err.type == err_type + assert err.boxed_type == err_type assert n.cancelled is True assert not n._children @@ -412,7 +412,7 @@ async def test_nested_multierrors(loglevel, start_method): elif isinstance(subexc, tractor.RemoteActorError): # on windows it seems we can't exactly be sure wtf # will happen.. - assert subexc.type in ( + assert subexc.boxed_type in ( tractor.RemoteActorError, trio.Cancelled, BaseExceptionGroup, @@ -422,7 +422,7 @@ async def test_nested_multierrors(loglevel, start_method): for subsub in subexc.exceptions: if subsub in (tractor.RemoteActorError,): - subsub = subsub.type + subsub = subsub.boxed_type assert type(subsub) in ( trio.Cancelled, @@ -437,16 +437,16 @@ async def test_nested_multierrors(loglevel, start_method): # we get back the (sent) cancel signal instead if is_win(): if isinstance(subexc, tractor.RemoteActorError): - assert subexc.type in ( + assert subexc.boxed_type in ( BaseExceptionGroup, tractor.RemoteActorError ) else: assert isinstance(subexc, BaseExceptionGroup) else: - assert subexc.type is ExceptionGroup + assert subexc.boxed_type is ExceptionGroup else: - assert subexc.type in ( + assert subexc.boxed_type in ( tractor.RemoteActorError, trio.Cancelled ) diff --git a/tests/test_child_manages_service_nursery.py b/tests/test_child_manages_service_nursery.py index 350f939b..21fb3920 100644 --- a/tests/test_child_manages_service_nursery.py +++ b/tests/test_child_manages_service_nursery.py @@ -171,4 +171,4 @@ def test_actor_managed_trio_nursery_task_error_cancels_aio( # verify boxed error err = excinfo.value - assert isinstance(err.type(), NameError) + assert err.boxed_type is NameError diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index 40247fd7..121abaa8 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -795,7 +795,7 @@ async def test_callee_cancels_before_started( # raises a special cancel signal except tractor.ContextCancelled as ce: - ce.type == trio.Cancelled + ce.boxed_type == trio.Cancelled # the traceback should be informative assert 'itself' in ce.msgdata['tb_str'] @@ -903,7 +903,7 @@ def test_one_end_stream_not_opened( with pytest.raises(tractor.RemoteActorError) as excinfo: trio.run(main) - assert excinfo.value.type == StreamOverrun + assert excinfo.value.boxed_type == StreamOverrun elif overrunner == 'callee': with pytest.raises(tractor.RemoteActorError) as excinfo: @@ -912,7 +912,7 @@ def test_one_end_stream_not_opened( # TODO: embedded remote errors so that we can verify the source # error? the callee delivers an error which is an overrun # wrapped in a remote actor error. - assert excinfo.value.type == tractor.RemoteActorError + assert excinfo.value.boxed_type == tractor.RemoteActorError else: trio.run(main) @@ -1131,7 +1131,7 @@ def test_maybe_allow_overruns_stream( # NOTE: i tried to isolate to a deterministic case here # based on timeing, but i was kinda wasted, and i don't # think it's sane to catch them.. - assert err.type in ( + assert err.boxed_type in ( tractor.RemoteActorError, StreamOverrun, ) @@ -1139,10 +1139,10 @@ def test_maybe_allow_overruns_stream( elif ( slow_side == 'child' ): - assert err.type == StreamOverrun + assert err.boxed_type == StreamOverrun elif slow_side == 'parent': - assert err.type == tractor.RemoteActorError + assert err.boxed_type == tractor.RemoteActorError assert 'StreamOverrun' in err.msgdata['tb_str'] else: diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 923aa94d..3fcf71f9 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -203,7 +203,7 @@ def ctlc( # XXX: disable pygments highlighting for auto-tests # since some envs (like actions CI) will struggle # the the added color-char encoding.. - from tractor._debug import TractorConfig + from tractor.devx._debug import TractorConfig TractorConfig.use_pygements = False yield use_ctlc @@ -685,7 +685,7 @@ def test_multi_daemon_subactors( # now the root actor won't clobber the bp_forever child # during it's first access to the debug lock, but will instead # wait for the lock to release, by the edge triggered - # ``_debug.Lock.no_remote_has_tty`` event before sending cancel messages + # ``devx._debug.Lock.no_remote_has_tty`` event before sending cancel messages # (via portals) to its underlings B) # at some point here there should have been some warning msg from diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index 7a923343..63ad07a2 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -20,7 +20,7 @@ from tractor._testing import ( def run_example_in_subproc( loglevel: str, testdir, - arb_addr: tuple[str, int], + reg_addr: tuple[str, int], ): @contextmanager diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 568708a2..5ac463ea 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -128,7 +128,7 @@ def test_aio_simple_error(reg_addr): assert err assert isinstance(err, RemoteActorError) - assert err.type == AssertionError + assert err.boxed_type == AssertionError def test_tractor_cancels_aio(reg_addr): @@ -272,7 +272,7 @@ def test_context_spawns_aio_task_that_errors( err = excinfo.value assert isinstance(err, expect) - assert err.type == AssertionError + assert err.boxed_type == AssertionError async def aio_cancel(): @@ -314,7 +314,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): assert err # ensure boxed error is correct - assert err.type == to_asyncio.AsyncioCancelled + assert err.boxed_type == to_asyncio.AsyncioCancelled # TODO: verify open_channel_from will fail on this.. @@ -466,7 +466,7 @@ def test_trio_error_cancels_intertask_chan(reg_addr): # ensure boxed errors for exc in excinfo.value.exceptions: - assert exc.type == Exception + assert exc.boxed_type == Exception def test_trio_closes_early_and_channel_exits(reg_addr): @@ -500,7 +500,7 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr): # ensure boxed errors for exc in excinfo.value.exceptions: - assert exc.type == Exception + assert exc.boxed_type == Exception @tractor.context diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index e3c8a7dd..470287fb 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -16,6 +16,11 @@ from tractor import ( # typing Portal, Context, ContextCancelled, + RemoteActorError, +) +from tractor._testing import ( + # tractor_test, + expect_ctxc, ) # XXX TODO cases: @@ -156,10 +161,11 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled( ): await trio.sleep_forever() - with pytest.raises(tractor.RemoteActorError) as excinfo: + with pytest.raises(RemoteActorError) as excinfo: trio.run(main) - assert excinfo.value.type == TypeError + rae = excinfo.value + assert rae.boxed_type == TypeError @tractor.context @@ -739,14 +745,16 @@ def test_peer_canceller( with pytest.raises(ContextCancelled) as excinfo: trio.run(main) - assert excinfo.value.type == ContextCancelled + assert excinfo.value.boxed_type == ContextCancelled assert excinfo.value.canceller[0] == 'canceller' @tractor.context async def basic_echo_server( ctx: Context, - peer_name: str = 'stepbro', + peer_name: str = 'wittle_bruv', + + err_after: int|None = None, ) -> None: ''' @@ -774,17 +782,31 @@ async def basic_echo_server( # assert 0 await ipc.send(resp) + if ( + err_after + and i > err_after + ): + raise RuntimeError( + f'Simulated error in `{peer_name}`' + ) + @tractor.context async def serve_subactors( ctx: Context, peer_name: str, + debug_mode: bool, ) -> None: async with open_nursery() as an: + + # sanity + if debug_mode: + assert tractor._state.debug_mode() + await ctx.started(peer_name) - async with ctx.open_stream() as reqs: - async for msg in reqs: + async with ctx.open_stream() as ipc: + async for msg in ipc: peer_name: str = msg peer: Portal = await an.start_actor( name=peer_name, @@ -795,7 +817,7 @@ async def serve_subactors( f'{peer_name}\n' f'|_{peer}\n' ) - await reqs.send(( + await ipc.send(( peer.chan.uid, peer.chan.raddr, )) @@ -807,14 +829,20 @@ async def serve_subactors( async def client_req_subactor( ctx: Context, peer_name: str, + debug_mode: bool, # used to simulate a user causing an error to be raised # directly in thread (like a KBI) to better replicate the # case where a `modden` CLI client would hang afer requesting # a `Context.cancel()` to `bigd`'s wks spawner. reraise_on_cancel: str|None = None, + sub_err_after: int|None = None, ) -> None: + # sanity + if debug_mode: + assert tractor._state.debug_mode() + # TODO: other cases to do with sub lifetimes: # -[ ] test that we can have the server spawn a sub # that lives longer then ctx with this client. @@ -836,6 +864,7 @@ async def client_req_subactor( spawner.open_context( serve_subactors, peer_name=peer_name, + debug_mode=debug_mode, ) as (spawner_ctx, first), ): assert first == peer_name @@ -857,6 +886,7 @@ async def client_req_subactor( await tell_little_bro( actor_name=sub_uid[0], caller='client', + err_after=sub_err_after, ) # TODO: test different scope-layers of @@ -868,9 +898,7 @@ async def client_req_subactor( # TODO: would be super nice to have a special injected # cancel type here (maybe just our ctxc) but using # some native mechanism in `trio` :p - except ( - trio.Cancelled - ) as err: + except trio.Cancelled as err: _err = err if reraise_on_cancel: errtype = globals()['__builtins__'][reraise_on_cancel] @@ -897,7 +925,9 @@ async def client_req_subactor( async def tell_little_bro( actor_name: str, - caller: str = '' + + caller: str = '', + err_after: int|None = None, ): # contact target actor, do a stream dialog. async with ( @@ -906,10 +936,12 @@ async def tell_little_bro( ) as lb, lb.open_context( basic_echo_server, + + # XXX proxy any delayed err condition + err_after=err_after, ) as (sub_ctx, first), - sub_ctx.open_stream( - basic_echo_server, - ) as echo_ipc, + + sub_ctx.open_stream() as echo_ipc, ): actor: Actor = current_actor() uid: tuple = actor.uid @@ -936,10 +968,15 @@ async def tell_little_bro( 'raise_client_error', [None, 'KeyboardInterrupt'], ) +@pytest.mark.parametrize( + 'raise_sub_spawn_error_after', + [None, 50], +) def test_peer_spawns_and_cancels_service_subactor( debug_mode: bool, raise_client_error: str, reg_addr: tuple[str, int], + raise_sub_spawn_error_after: int|None, ): # NOTE: this tests for the modden `mod wks open piker` bug # discovered as part of implementing workspace ctx @@ -953,6 +990,16 @@ def test_peer_spawns_and_cancels_service_subactor( # and the server's spawned child should cancel and terminate! peer_name: str = 'little_bro' + def check_inner_rte(rae: RemoteActorError): + ''' + Validate the little_bro's relayed inception! + + ''' + assert rae.boxed_type is RemoteActorError + assert rae.src_type is RuntimeError + assert 'client' in rae.relay_uid + assert peer_name in rae.src_uid + async def main(): async with tractor.open_nursery( # NOTE: to halt the peer tasks on ctxc, uncomment this. @@ -976,14 +1023,24 @@ def test_peer_spawns_and_cancels_service_subactor( server.open_context( serve_subactors, peer_name=peer_name, + debug_mode=debug_mode, + ) as (spawn_ctx, first), client.open_context( client_req_subactor, peer_name=peer_name, + debug_mode=debug_mode, reraise_on_cancel=raise_client_error, + + # trigger for error condition in sub + # during streaming. + sub_err_after=raise_sub_spawn_error_after, + ) as (client_ctx, client_says), ): + root: Actor = current_actor() + spawner_uid: tuple = spawn_ctx.chan.uid print( f'Server says: {first}\n' f'Client says: {client_says}\n' @@ -993,6 +1050,7 @@ def test_peer_spawns_and_cancels_service_subactor( # (grandchild of this root actor) "little_bro" # and ensure we can also use it as an echo # server. + sub: Portal async with tractor.wait_for_actor( name=peer_name, ) as sub: @@ -1004,56 +1062,139 @@ def test_peer_spawns_and_cancels_service_subactor( f'.uid: {sub.actor.uid}\n' f'chan.raddr: {sub.chan.raddr}\n' ) - await tell_little_bro( - actor_name=peer_name, - caller='root', - ) - # signal client to raise a KBI - await client_ctx.cancel() - print('root cancelled client, checking that sub-spawn is down') + async with expect_ctxc( + yay=raise_sub_spawn_error_after, + reraise=False, + ): + await tell_little_bro( + actor_name=peer_name, + caller='root', + ) - async with tractor.find_actor( - name=peer_name, - ) as sub: - assert not sub + if not raise_sub_spawn_error_after: - print('root cancelling server/client sub-actors') + # signal client to cancel and maybe raise a KBI + await client_ctx.cancel() + print( + '-> root cancelling client,\n' + '-> root checking `client_ctx.result()`,\n' + f'-> checking that sub-spawn {peer_name} is down\n' + ) + # else: - # await tractor.pause() - res = await client_ctx.result(hide_tb=False) - assert isinstance(res, ContextCancelled) - assert client_ctx.cancel_acked - assert res.canceller == current_actor().uid + try: + res = await client_ctx.result(hide_tb=False) + + # in remote (relayed inception) error + # case, we should error on the line above! + if raise_sub_spawn_error_after: + pytest.fail( + 'Never rxed proxied `RemoteActorError[RuntimeError]` !?' + ) + + assert isinstance(res, ContextCancelled) + assert client_ctx.cancel_acked + assert res.canceller == root.uid + + except RemoteActorError as rae: + _err = rae + assert raise_sub_spawn_error_after + + # since this is a "relayed error" via the client + # sub-actor, it is expected to be + # a `RemoteActorError` boxing another + # `RemoteActorError` otherwise known as + # an "inception" (from `trio`'s parlance) + # ((or maybe a "Matryoshka" and/or "matron" + # in our own working parlance)) which + # contains the source error from the + # little_bro: a `RuntimeError`. + # + check_inner_rte(rae) + assert rae.relay_uid == client.chan.uid + assert rae.src_uid == sub.chan.uid + + assert not client_ctx.cancel_acked + assert ( + client_ctx.maybe_error + is client_ctx.outcome + is rae + ) + raise + # await tractor.pause() + + else: + assert not raise_sub_spawn_error_after + + # cancelling the spawner sub should + # transitively cancel it's sub, the little + # bruv. + print('root cancelling server/client sub-actors') + await spawn_ctx.cancel() + async with tractor.find_actor( + name=peer_name, + ) as sub: + assert not sub - await spawn_ctx.cancel() # await server.cancel_actor() + except RemoteActorError as rae: + # XXX more-or-less same as above handler + # this is just making sure the error bubbles out + # of the + _err = rae + assert raise_sub_spawn_error_after + raise + # since we called `.cancel_actor()`, `.cancel_ack` # will not be set on the ctx bc `ctx.cancel()` was not # called directly fot this confext. except ContextCancelled as ctxc: - print('caught ctxc from contexts!') - assert ctxc.canceller == current_actor().uid + _ctxc = ctxc + print( + f'{root.uid} caught ctxc from ctx with {client_ctx.chan.uid}\n' + f'{repr(ctxc)}\n' + ) + + if not raise_sub_spawn_error_after: + assert ctxc.canceller == root.uid + else: + assert ctxc.canceller == spawner_uid + assert ctxc is spawn_ctx.outcome assert ctxc is spawn_ctx.maybe_error raise - # assert spawn_ctx.cancel_acked - assert spawn_ctx.cancel_acked - assert client_ctx.cancel_acked + if raise_sub_spawn_error_after: + pytest.fail( + 'context block(s) in PARENT never raised?!?' + ) - await client.cancel_actor() - await server.cancel_actor() + if not raise_sub_spawn_error_after: + # assert spawn_ctx.cancel_acked + assert spawn_ctx.cancel_acked + assert client_ctx.cancel_acked - # WOA WOA WOA! we need this to close..!!!?? - # that's super bad XD + await client.cancel_actor() + await server.cancel_actor() - # TODO: why isn't this working!?!? - # we're now outside the `.open_context()` block so - # the internal `Context._scope: CancelScope` should be - # gracefully "closed" ;) + # WOA WOA WOA! we need this to close..!!!?? + # that's super bad XD - # assert spawn_ctx.cancelled_caught + # TODO: why isn't this working!?!? + # we're now outside the `.open_context()` block so + # the internal `Context._scope: CancelScope` should be + # gracefully "closed" ;) - trio.run(main) + # assert spawn_ctx.cancelled_caught + + if raise_sub_spawn_error_after: + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + rae: RemoteActorError = excinfo.value + check_inner_rte(rae) + + else: + trio.run(main) diff --git a/tests/test_rpc.py b/tests/test_rpc.py index 71f3258b..9581708f 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -15,9 +15,19 @@ async def sleep_back_actor( func_name, func_defined, exposed_mods, + *, + reg_addr: tuple, ): if actor_name: - async with tractor.find_actor(actor_name) as portal: + async with tractor.find_actor( + actor_name, + # NOTE: must be set manually since + # the subactor doesn't have the reg_addr + # fixture code run in it! + # TODO: maybe we should just set this once in the + # _state mod and derive to all children? + registry_addrs=[reg_addr], + ) as portal: try: await portal.run(__name__, func_name) except tractor.RemoteActorError as err: @@ -26,7 +36,7 @@ async def sleep_back_actor( if not exposed_mods: expect = tractor.ModuleNotExposed - assert err.type is expect + assert err.boxed_type is expect raise else: await trio.sleep(float('inf')) @@ -52,11 +62,17 @@ async def short_sleep(): 'fail_on_syntax', ], ) -def test_rpc_errors(reg_addr, to_call, testdir): - """Test errors when making various RPC requests to an actor +def test_rpc_errors( + reg_addr, + to_call, + testdir, +): + ''' + Test errors when making various RPC requests to an actor that either doesn't have the requested module exposed or doesn't define the named function. - """ + + ''' exposed_mods, funcname, inside_err = to_call subactor_exposed_mods = [] func_defined = globals().get(funcname, False) @@ -84,8 +100,13 @@ def test_rpc_errors(reg_addr, to_call, testdir): # spawn a subactor which calls us back async with tractor.open_nursery( - arbiter_addr=reg_addr, + registry_addrs=[reg_addr], enable_modules=exposed_mods.copy(), + + # NOTE: will halt test in REPL if uncommented, so only + # do that if actually debugging subactor but keep it + # disabled for the test. + # debug_mode=True, ) as n: actor = tractor.current_actor() @@ -102,6 +123,7 @@ def test_rpc_errors(reg_addr, to_call, testdir): exposed_mods=exposed_mods, func_defined=True if func_defined else False, enable_modules=subactor_exposed_mods, + reg_addr=reg_addr, ) def run(): @@ -128,4 +150,4 @@ def test_rpc_errors(reg_addr, to_call, testdir): )) if getattr(value, 'type', None): - assert value.type is inside_err + assert value.boxed_type is inside_err diff --git a/tests/test_spawning.py b/tests/test_spawning.py index 6a4b2988..5995ed2d 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -32,8 +32,7 @@ async def spawn( if actor.is_arbiter: - async with tractor.open_nursery( - ) as nursery: + async with tractor.open_nursery() as nursery: # forks here portal = await nursery.run_in_actor( @@ -55,7 +54,9 @@ async def spawn( return 10 -def test_local_arbiter_subactor_global_state(reg_addr): +def test_local_arbiter_subactor_global_state( + reg_addr, +): result = trio.run( spawn, True, diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 344f0c33..0e1d6d10 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -58,16 +58,44 @@ class InternalError(RuntimeError): ''' _body_fields: list[str] = [ - 'src_actor_uid', + 'boxed_type', + 'src_type', + # TODO: format this better if we're going to include it. + # 'relay_path', + 'src_uid', + + # only in sub-types 'canceller', 'sender', ] _msgdata_keys: list[str] = [ - 'type_str', + 'boxed_type_str', ] + _body_fields +def get_err_type(type_name: str) -> BaseException|None: + ''' + Look up an exception type by name from the set of locally + known namespaces: + + - `builtins` + - `tractor._exceptions` + - `trio` + + ''' + for ns in [ + builtins, + _this_mod, + trio, + ]: + if type_ref := getattr( + ns, + type_name, + False, + ): + return type_ref + # TODO: rename to just `RemoteError`? class RemoteActorError(Exception): @@ -81,13 +109,14 @@ class RemoteActorError(Exception): ''' reprol_fields: list[str] = [ - 'src_actor_uid', + 'src_uid', + 'relay_path', ] def __init__( self, message: str, - suberror_type: Type[BaseException] | None = None, + boxed_type: Type[BaseException]|None = None, **msgdata ) -> None: @@ -101,20 +130,112 @@ class RemoteActorError(Exception): # - .remote_type # also pertains to our long long oustanding issue XD # https://github.com/goodboy/tractor/issues/5 - self.boxed_type: str = suberror_type + # + # TODO: always set ._boxed_type` as `None` by default + # and instead render if from `.boxed_type_str`? + self._boxed_type: BaseException = boxed_type + self._src_type: BaseException|None = None self.msgdata: dict[str, Any] = msgdata - @property - def type(self) -> str: - return self.boxed_type + # TODO: mask out eventually or place in `pack_error()` + # pre-`return` lines? + # sanity on inceptions + if boxed_type is RemoteActorError: + assert self.src_type_str != 'RemoteActorError' + assert self.src_uid not in self.relay_path + + # ensure type-str matches and round-tripping from that + # str results in same error type. + # + # TODO NOTE: this is currently exclusively for the + # `ContextCancelled(boxed_type=trio.Cancelled)` case as is + # used inside `._rpc._invoke()` atm though probably we + # should better emphasize that special (one off?) case + # either by customizing `ContextCancelled.__init__()` or + # through a special factor func? + elif boxed_type: + if not self.msgdata.get('boxed_type_str'): + self.msgdata['boxed_type_str'] = str( + type(boxed_type).__name__ + ) + + assert self.boxed_type_str == self.msgdata['boxed_type_str'] + assert self.boxed_type is boxed_type @property - def type_str(self) -> str: - return str(type(self.boxed_type).__name__) + def src_type_str(self) -> str: + ''' + String-name of the source error's type. + + This should be the same as `.boxed_type_str` when unpacked + at the first relay/hop's receiving actor. + + ''' + return self.msgdata['src_type_str'] @property - def src_actor_uid(self) -> tuple[str, str]|None: - return self.msgdata.get('src_actor_uid') + def src_type(self) -> str: + ''' + Error type raised by original remote faulting actor. + + ''' + if self._src_type is None: + self._src_type = get_err_type( + self.msgdata['src_type_str'] + ) + + return self._src_type + + @property + def boxed_type_str(self) -> str: + ''' + String-name of the (last hop's) boxed error type. + + ''' + return self.msgdata['boxed_type_str'] + + @property + def boxed_type(self) -> str: + ''' + Error type boxed by last actor IPC hop. + + ''' + if self._boxed_type is None: + self._boxed_type = get_err_type( + self.msgdata['boxed_type_str'] + ) + + return self._boxed_type + + @property + def relay_path(self) -> list[tuple]: + ''' + Return the list of actors which consecutively relayed + a boxed `RemoteActorError` the src error up until THIS + actor's hop. + + NOTE: a `list` field with the same name is expected to be + passed/updated in `.msgdata`. + + ''' + return self.msgdata['relay_path'] + + @property + def relay_uid(self) -> tuple[str, str]|None: + return tuple( + self.msgdata['relay_path'][-1] + ) + + @property + def src_uid(self) -> tuple[str, str]|None: + if src_uid := ( + self.msgdata.get('src_uid') + ): + return tuple(src_uid) + # TODO: use path lookup instead? + # return tuple( + # self.msgdata['relay_path'][0] + # ) @property def tb_str( @@ -129,28 +250,56 @@ class RemoteActorError(Exception): return '' + def _mk_fields_str( + self, + fields: list[str], + end_char: str = '\n', + ) -> str: + _repr: str = '' + for key in fields: + val: Any|None = ( + getattr(self, key, None) + or + self.msgdata.get(key) + ) + # TODO: for `.relay_path` on multiline? + # if not isinstance(val, str): + # val_str = pformat(val) + # else: + val_str: str = repr(val) + + if val: + _repr += f'{key}={val_str}{end_char}' + + return _repr + def reprol(self) -> str: ''' Represent this error for "one line" display, like in a field of our `Context.__repr__()` output. ''' - _repr: str = f'{type(self).__name__}(' - for key in self.reprol_fields: - val: Any|None = self.msgdata.get(key) - if val: - _repr += f'{key}={repr(val)} ' - - return _repr + # TODO: use this matryoshka emjoi XD + # => 🪆 + reprol_str: str = f'{type(self).__name__}(' + _repr: str = self._mk_fields_str( + self.reprol_fields, + end_char=' ', + ) + return ( + reprol_str + + + _repr + ) def __repr__(self) -> str: + ''' + Nicely formatted boxed error meta data + traceback. - fields: str = '' - for key in _body_fields: - val: str|None = self.msgdata.get(key) - if val: - fields += f'{key}={val}\n' - + ''' + fields: str = self._mk_fields_str( + _body_fields, + ) fields: str = textwrap.indent( fields, # prefix=' '*2, @@ -165,8 +314,6 @@ class RemoteActorError(Exception): f' ------ - ------\n' f' _|\n' ) - # f'|\n' - # f' |\n' if indent: body: str = textwrap.indent( body, @@ -178,9 +325,47 @@ class RemoteActorError(Exception): ')>' ) - # TODO: local recontruction of remote exception deats + def unwrap( + self, + ) -> BaseException: + ''' + Unpack the inner-most source error from it's original IPC msg data. + + We attempt to reconstruct (as best as we can) the original + `Exception` from as it would have been raised in the + failing actor's remote env. + + ''' + src_type_ref: Type[BaseException] = self.src_type + if not src_type_ref: + raise TypeError( + 'Failed to lookup src error type:\n' + f'{self.src_type_str}' + ) + + # TODO: better tb insertion and all the fancier dunder + # metadata stuff as per `.__context__` etc. and friends: + # https://github.com/python-trio/trio/issues/611 + return src_type_ref(self.tb_str) + + # TODO: local recontruction of nested inception for a given + # "hop" / relay-node in this error's relay_path? + # => so would render a `RAE[RAE[RAE[Exception]]]` instance + # with all inner errors unpacked? + # -[ ] if this is useful shouldn't be too hard to impl right? # def unbox(self) -> BaseException: - # ... + # ''' + # Unbox to the prior relays (aka last boxing actor's) + # inner error. + + # ''' + # if not self.relay_path: + # return self.unwrap() + + # # TODO.. + # # return self.boxed_type( + # # boxed_type=get_type_ref(.. + # raise NotImplementedError class InternalActorError(RemoteActorError): @@ -232,7 +417,7 @@ class ContextCancelled(RemoteActorError): f'{self}' ) - # to make `.__repr__()` work uniformly + # TODO: to make `.__repr__()` work uniformly? # src_actor_uid = canceller @@ -283,7 +468,8 @@ class MessagingError(Exception): def pack_error( - exc: BaseException, + exc: BaseException|RemoteActorError, + tb: str|None = None, cid: str|None = None, @@ -300,27 +486,56 @@ def pack_error( else: tb_str = traceback.format_exc() - error_msg: dict[ + error_msg: dict[ # for IPC str, str | tuple[str, str] - ] = { - 'tb_str': tb_str, - 'type_str': type(exc).__name__, - 'boxed_type': type(exc).__name__, - 'src_actor_uid': current_actor().uid, - } + ] = {} + our_uid: tuple = current_actor().uid - # TODO: ?just wholesale proxy `.msgdata: dict`? - # XXX WARNING, when i swapped these ctx-semantics - # tests started hanging..???!!!??? - # if msgdata := exc.getattr('msgdata', {}): - # error_msg.update(msgdata) if ( - isinstance(exc, ContextCancelled) - or isinstance(exc, StreamOverrun) + isinstance(exc, RemoteActorError) ): error_msg.update(exc.msgdata) + # an onion/inception we need to pack + if ( + type(exc) is RemoteActorError + and (boxed := exc.boxed_type) + and boxed != RemoteActorError + ): + # sanity on source error (if needed when tweaking this) + assert (src_type := exc.src_type) != RemoteActorError + assert error_msg['src_type_str'] != 'RemoteActorError' + assert error_msg['src_type_str'] == src_type.__name__ + assert error_msg['src_uid'] != our_uid + + # set the boxed type to be another boxed type thus + # creating an "inception" when unpacked by + # `unpack_error()` in another actor who gets "relayed" + # this error Bo + # + # NOTE on WHY: since we are re-boxing and already + # boxed src error, we want to overwrite the original + # `boxed_type_str` and instead set it to the type of + # the input `exc` type. + error_msg['boxed_type_str'] = 'RemoteActorError' + + else: + error_msg['src_uid'] = our_uid + error_msg['src_type_str'] = type(exc).__name__ + error_msg['boxed_type_str'] = type(exc).__name__ + + # XXX alawys append us the last relay in error propagation path + error_msg.setdefault( + 'relay_path', + [], + ).append(our_uid) + + # XXX NOTE: always ensure the traceback-str is from the + # locally raised error (**not** the prior relay's boxed + # content's `.msgdata`). + error_msg['tb_str'] = tb_str + pkt: dict = {'error': error_msg} if cid: pkt['cid'] = cid @@ -329,7 +544,6 @@ def pack_error( def unpack_error( - msg: dict[str, Any], chan: Channel|None = None, @@ -357,35 +571,32 @@ def unpack_error( # retrieve the remote error's msg encoded details tb_str: str = error_dict.get('tb_str', '') - message: str = f'{chan.uid}\n' + tb_str - type_name: str = ( - error_dict.get('type_str') - or error_dict['boxed_type'] + message: str = ( + f'{chan.uid}\n' + + + tb_str ) - suberror_type: Type[BaseException] = Exception - if type_name == 'ContextCancelled': + # try to lookup a suitable error type from the local runtime + # env then use it to construct a local instance. + boxed_type_str: str = error_dict['boxed_type_str'] + boxed_type: Type[BaseException] = get_err_type(boxed_type_str) + + if boxed_type_str == 'ContextCancelled': box_type = ContextCancelled - suberror_type = box_type + assert boxed_type is box_type - else: # try to lookup a suitable local error type - for ns in [ - builtins, - _this_mod, - trio, - ]: - if suberror_type := getattr( - ns, - type_name, - False, - ): - break + # TODO: already included by `_this_mod` in else loop right? + # + # we have an inception/onion-error so ensure + # we include the relay_path info and the + # original source error. + elif boxed_type_str == 'RemoteActorError': + assert boxed_type is RemoteActorError + assert len(error_dict['relay_path']) >= 1 exc = box_type( message, - suberror_type=suberror_type, - - # unpack other fields into error type init **error_dict, ) @@ -501,6 +712,11 @@ def _raise_from_no_key_in_msg( # destined for the `Context.result()` call during ctx-exit! stream._eoc: Exception = eoc + # in case there already is some underlying remote error + # that arrived which is probably the source of this stream + # closure + ctx.maybe_raise() + raise eoc from src_err if ( diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 89c97381..e50c80dd 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -268,7 +268,10 @@ async def _errors_relayed_via_ipc( entered_debug = await _debug._maybe_enter_pm(err) if not entered_debug: - log.exception('Actor crashed:\n') + log.exception( + 'RPC task crashed\n' + f'|_{ctx}' + ) # always (try to) ship RPC errors back to caller if is_rpc: @@ -608,7 +611,8 @@ async def _invoke( # other side. ctxc = ContextCancelled( msg, - suberror_type=trio.Cancelled, + boxed_type=trio.Cancelled, + # boxed_type_str='Cancelled', canceller=canceller, ) # assign local error so that the `.outcome` @@ -666,7 +670,7 @@ async def _invoke( f'`{repr(ctx.outcome)}`', ) ) - log.cancel( + log.runtime( f'IPC context terminated with a final {res_type_str}\n\n' f'{ctx}\n' ) @@ -699,12 +703,6 @@ async def try_ship_error_to_remote( # TODO: special tb fmting for ctxc cases? # tb=tb, ) - # NOTE: the src actor should always be packed into the - # error.. but how should we verify this? - # actor: Actor = _state.current_actor() - # assert err_msg['src_actor_uid'] - # if not err_msg['error'].get('src_actor_uid'): - # import pdbp; pdbp.set_trace() await channel.send(msg) # XXX NOTE XXX in SC terms this is one of the worst things