diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py
index e3c8a7d..470287f 100644
--- a/tests/test_inter_peer_cancellation.py
+++ b/tests/test_inter_peer_cancellation.py
@@ -16,6 +16,11 @@ from tractor import (  # typing
     Portal,
     Context,
     ContextCancelled,
+    RemoteActorError,
+)
+from tractor._testing import (
+    # tractor_test,
+    expect_ctxc,
 )

 # XXX TODO cases:
@@ -156,10 +161,11 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
             ):
                 await trio.sleep_forever()

-    with pytest.raises(tractor.RemoteActorError) as excinfo:
+    with pytest.raises(RemoteActorError) as excinfo:
         trio.run(main)

-    assert excinfo.value.type == TypeError
+    rae = excinfo.value
+    assert rae.boxed_type == TypeError


 @tractor.context
@@ -739,14 +745,16 @@ def test_peer_canceller(
     with pytest.raises(ContextCancelled) as excinfo:
         trio.run(main)

-    assert excinfo.value.type == ContextCancelled
+    assert excinfo.value.boxed_type == ContextCancelled
     assert excinfo.value.canceller[0] == 'canceller'


 @tractor.context
 async def basic_echo_server(
     ctx: Context,
-    peer_name: str = 'stepbro',
+    peer_name: str = 'wittle_bruv',
+
+    err_after: int|None = None,

 ) -> None:
     '''
@@ -774,17 +782,31 @@ async def basic_echo_server(
             # assert 0
             await ipc.send(resp)

+            if (
+                err_after
+                and i > err_after
+            ):
+                raise RuntimeError(
+                    f'Simulated error in `{peer_name}`'
+                )
+

 @tractor.context
 async def serve_subactors(
     ctx: Context,
     peer_name: str,
+    debug_mode: bool,

 ) -> None:
     async with open_nursery() as an:
+
+        # sanity
+        if debug_mode:
+            assert tractor._state.debug_mode()
+
         await ctx.started(peer_name)
-        async with ctx.open_stream() as reqs:
-            async for msg in reqs:
+        async with ctx.open_stream() as ipc:
+            async for msg in ipc:
                 peer_name: str = msg
                 peer: Portal = await an.start_actor(
                     name=peer_name,
@@ -795,7 +817,7 @@ async def serve_subactors(
                     f'{peer_name}\n'
                     f'|_{peer}\n'
                 )
-                await reqs.send((
+                await ipc.send((
                     peer.chan.uid,
                     peer.chan.raddr,
                 ))
@@ -807,14 +829,20 @@ async def serve_subactors(
 async def client_req_subactor(
     ctx: Context,
     peer_name: str,
+    debug_mode: bool,

     # used to simulate a user causing an error to be raised
     # directly in thread (like a KBI) to better replicate the
     # case where a `modden` CLI client would hang afer requesting
     # a `Context.cancel()` to `bigd`'s wks spawner.
     reraise_on_cancel: str|None = None,
+    sub_err_after: int|None = None,

 ) -> None:
+    # sanity
+    if debug_mode:
+        assert tractor._state.debug_mode()
+
     # TODO: other cases to do with sub lifetimes:
     # -[ ] test that we can have the server spawn a sub
     #     that lives longer then ctx with this client.
@@ -836,6 +864,7 @@ async def client_req_subactor(
         spawner.open_context(
             serve_subactors,
             peer_name=peer_name,
+            debug_mode=debug_mode,
         ) as (spawner_ctx, first),
     ):
         assert first == peer_name
@@ -857,6 +886,7 @@ async def client_req_subactor(
             await tell_little_bro(
                 actor_name=sub_uid[0],
                 caller='client',
+                err_after=sub_err_after,
             )

             # TODO: test different scope-layers of
@@ -868,9 +898,7 @@ async def client_req_subactor(
         # TODO: would be super nice to have a special injected
         # cancel type here (maybe just our ctxc) but using
         # some native mechanism in `trio` :p
-        except (
-            trio.Cancelled
-        ) as err:
+        except trio.Cancelled as err:
             _err = err
             if reraise_on_cancel:
                 errtype = globals()['__builtins__'][reraise_on_cancel]
@@ -897,7 +925,9 @@ async def client_req_subactor(

 async def tell_little_bro(
     actor_name: str,
-    caller: str = ''
+
+    caller: str = '',
+    err_after: int|None = None,
 ):
     # contact target actor, do a stream dialog.
     async with (
@@ -906,10 +936,12 @@ async def tell_little_bro(
         ) as lb,
         lb.open_context(
             basic_echo_server,
+
+            # XXX proxy any delayed err condition
+            err_after=err_after,
         ) as (sub_ctx, first),
-        sub_ctx.open_stream(
-            basic_echo_server,
-        ) as echo_ipc,
+
+        sub_ctx.open_stream() as echo_ipc,
     ):
         actor: Actor = current_actor()
         uid: tuple = actor.uid
@@ -936,10 +968,15 @@
     'raise_client_error',
     [None, 'KeyboardInterrupt'],
 )
+@pytest.mark.parametrize(
+    'raise_sub_spawn_error_after',
+    [None, 50],
+)
 def test_peer_spawns_and_cancels_service_subactor(
     debug_mode: bool,
     raise_client_error: str,
     reg_addr: tuple[str, int],
+    raise_sub_spawn_error_after: int|None,
 ):
     # NOTE: this tests for the modden `mod wks open piker` bug
     # discovered as part of implementing workspace ctx
@@ -953,6 +990,16 @@ def test_peer_spawns_and_cancels_service_subactor(
     # and the server's spawned child should cancel and terminate!
     peer_name: str = 'little_bro'

+    def check_inner_rte(rae: RemoteActorError):
+        '''
+        Validate the little_bro's relayed inception!
+
+        '''
+        assert rae.boxed_type is RemoteActorError
+        assert rae.src_type is RuntimeError
+        assert 'client' in rae.relay_uid
+        assert peer_name in rae.src_uid
+
     async def main():
         async with tractor.open_nursery(
             # NOTE: to halt the peer tasks on ctxc, uncomment this.
@@ -976,14 +1023,24 @@ def test_peer_spawns_and_cancels_service_subactor(
                     server.open_context(
                         serve_subactors,
                         peer_name=peer_name,
+                        debug_mode=debug_mode,
+
                     ) as (spawn_ctx, first),

                     client.open_context(
                         client_req_subactor,
                         peer_name=peer_name,
+                        debug_mode=debug_mode,
                         reraise_on_cancel=raise_client_error,
+
+                        # trigger for error condition in sub
+                        # during streaming.
+                        sub_err_after=raise_sub_spawn_error_after,
+
                     ) as (client_ctx, client_says),
                 ):
+                    root: Actor = current_actor()
+                    spawner_uid: tuple = spawn_ctx.chan.uid
                     print(
                         f'Server says: {first}\n'
                         f'Client says: {client_says}\n'
@@ -993,6 +1050,7 @@ def test_peer_spawns_and_cancels_service_subactor(
                     # (grandchild of this root actor) "little_bro"
                     # and ensure we can also use it as an echo
                     # server.
+                    sub: Portal
                     async with tractor.wait_for_actor(
                         name=peer_name,
                     ) as sub:
@@ -1004,56 +1062,139 @@ def test_peer_spawns_and_cancels_service_subactor(
                             f'.uid: {sub.actor.uid}\n'
                             f'chan.raddr: {sub.chan.raddr}\n'
                         )
-                        await tell_little_bro(
-                            actor_name=peer_name,
-                            caller='root',
-                        )
-
-                    # signal client to raise a KBI
-                    await client_ctx.cancel()
-                    print('root cancelled client, checking that sub-spawn is down')
+                        async with expect_ctxc(
+                            yay=raise_sub_spawn_error_after,
+                            reraise=False,
+                        ):
+                            await tell_little_bro(
+                                actor_name=peer_name,
+                                caller='root',
+                            )

-                    async with tractor.find_actor(
-                        name=peer_name,
-                    ) as sub:
-                        assert not sub
+                        if not raise_sub_spawn_error_after:

-                    print('root cancelling server/client sub-actors')
+                            # signal client to cancel and maybe raise a KBI
+                            await client_ctx.cancel()
+                            print(
+                                '-> root cancelling client,\n'
+                                '-> root checking `client_ctx.result()`,\n'
+                                f'-> checking that sub-spawn {peer_name} is down\n'
+                            )
+                        # else:

-                    # await tractor.pause()
-                    res = await client_ctx.result(hide_tb=False)
-                    assert isinstance(res, ContextCancelled)
-                    assert client_ctx.cancel_acked
-                    assert res.canceller == current_actor().uid
+                    try:
+                        res = await client_ctx.result(hide_tb=False)
+
+                        # in remote (relayed inception) error
+                        # case, we should error on the line above!
+                        if raise_sub_spawn_error_after:
+                            pytest.fail(
+                                'Never rxed proxied `RemoteActorError[RuntimeError]` !?'
+                            )
+
+                        assert isinstance(res, ContextCancelled)
+                        assert client_ctx.cancel_acked
+                        assert res.canceller == root.uid
+
+                    except RemoteActorError as rae:
+                        _err = rae
+                        assert raise_sub_spawn_error_after
+
+                        # since this is a "relayed error" via the client
+                        # sub-actor, it is expected to be
+                        # a `RemoteActorError` boxing another
+                        # `RemoteActorError` otherwise known as
+                        # an "inception" (from `trio`'s parlance)
+                        # ((or maybe a "Matryoshka" and/or "matron"
+                        # in our own working parlance)) which
+                        # contains the source error from the
+                        # little_bro: a `RuntimeError`.
+                        #
+                        check_inner_rte(rae)
+                        assert rae.relay_uid == client.chan.uid
+                        assert rae.src_uid == sub.chan.uid
+
+                        assert not client_ctx.cancel_acked
+                        assert (
+                            client_ctx.maybe_error
+                            is client_ctx.outcome
+                            is rae
+                        )
+                        raise
+                        # await tractor.pause()
+
+                    else:
+                        assert not raise_sub_spawn_error_after
+
+                        # cancelling the spawner sub should
+                        # transitively cancel its sub, the little
+                        # bruv.
+                        print('root cancelling server/client sub-actors')
+                        await spawn_ctx.cancel()
+                        async with tractor.find_actor(
+                            name=peer_name,
+                        ) as sub:
+                            assert not sub

-                    await spawn_ctx.cancel()
                     # await server.cancel_actor()

+            except RemoteActorError as rae:
+                # XXX more-or-less same as above handler
+                # this is just making sure the error bubbles out
+                # of the
+                _err = rae
+                assert raise_sub_spawn_error_after
+                raise
+
             # since we called `.cancel_actor()`, `.cancel_ack`
             # will not be set on the ctx bc `ctx.cancel()` was not
             # called directly fot this confext.
             except ContextCancelled as ctxc:
-                print('caught ctxc from contexts!')
-                assert ctxc.canceller == current_actor().uid
+                _ctxc = ctxc
+                print(
+                    f'{root.uid} caught ctxc from ctx with {client_ctx.chan.uid}\n'
+                    f'{repr(ctxc)}\n'
+                )
+
+                if not raise_sub_spawn_error_after:
+                    assert ctxc.canceller == root.uid
+                else:
+                    assert ctxc.canceller == spawner_uid
+
                 assert ctxc is spawn_ctx.outcome
                 assert ctxc is spawn_ctx.maybe_error
                 raise

-            # assert spawn_ctx.cancel_acked
-            assert spawn_ctx.cancel_acked
-            assert client_ctx.cancel_acked
+            if raise_sub_spawn_error_after:
+                pytest.fail(
+                    'context block(s) in PARENT never raised?!?'
+                )

-            await client.cancel_actor()
-            await server.cancel_actor()
+            if not raise_sub_spawn_error_after:
+                # assert spawn_ctx.cancel_acked
+                assert spawn_ctx.cancel_acked
+                assert client_ctx.cancel_acked

-            # WOA WOA WOA! we need this to close..!!!??
-            # that's super bad XD
+                await client.cancel_actor()
+                await server.cancel_actor()

-            # TODO: why isn't this working!?!?
-            # we're now outside the `.open_context()` block so
-            # the internal `Context._scope: CancelScope` should be
-            # gracefully "closed" ;)
+                # WOA WOA WOA! we need this to close..!!!??
+                # that's super bad XD

-            # assert spawn_ctx.cancelled_caught
+                # TODO: why isn't this working!?!?
+                # we're now outside the `.open_context()` block so
+                # the internal `Context._scope: CancelScope` should be
+                # gracefully "closed" ;)

-    trio.run(main)
+                # assert spawn_ctx.cancelled_caught
+
+    if raise_sub_spawn_error_after:
+        with pytest.raises(RemoteActorError) as excinfo:
+            trio.run(main)
+
+        rae: RemoteActorError = excinfo.value
+        check_inner_rte(rae)
+
+    else:
+        trio.run(main)
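
Note on the new `expect_ctxc` import used above: the diff treats it as a conditional "expect a `ContextCancelled`" async context manager (see the `yay=raise_sub_spawn_error_after, reraise=False` call site), but its implementation is not shown here. A minimal sketch of the shape this test assumes (the real helper lives in `tractor._testing` and may differ in details) is:

    # illustrative sketch only -- not the verbatim `tractor._testing`
    # implementation, just the behaviour the test relies on.
    from contextlib import asynccontextmanager as acm

    from tractor import ContextCancelled


    @acm
    async def expect_ctxc(
        yay: bool,
        reraise: bool = False,
    ):
        '''
        Expect a `ContextCancelled` out of the wrapped block iff `yay`
        is truthy; optionally re-raise it to the caller.

        '''
        if yay:
            try:
                yield
                raise RuntimeError('Never raised ctxc?')
            except ContextCancelled:
                if reraise:
                    raise
        else:
            yield

When `yay` is falsy the wrapper is a no-op pass-through, which is what lets the same `tell_little_bro()` call site serve both the plain root-cancels-client parametrization and the new delayed sub-spawn error case.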