Extend inter-peer cancel tests for "inceptions"
Use new `RemoteActorError` fields in various assertions particularly ensuring that an RTE relayed through the spawner from the little_bro shows up at the client with the right number of entries in the `.relay_path` and that the error is raised in the client as desired in the original use case from `modden`'s remote spawn spawn request API (which was kinda the whole original motivation to finally get all this multi-actor error relay stuff workin). Case extensions: - RTE relayed from little_bro through spawner to client when `raise_sub_spawn_error_after` is set; in this case test should raise the relayed and RAE boxed RTE right up to the `trio.run()`. -> ensure the `rae.src_uid`, `.relay_uid` are set correctly. -> ensure ctx cancels are no acked. - use `expect_ctxc()` around root's `tell_little_bro()` usage. - do `debug_mode` assertions when enabled by test harness in each actor layer. - obvi use new `.src_type`/`.boxed_type` for final error propagation assertions.remote_inceptions
							parent
							
								
									5bf550b64a
								
							
						
					
					
						commit
						91a970091f
					
				|  | @ -16,6 +16,11 @@ from tractor import (  # typing | |||
|     Portal, | ||||
|     Context, | ||||
|     ContextCancelled, | ||||
|     RemoteActorError, | ||||
| ) | ||||
| from tractor._testing import ( | ||||
|     # tractor_test, | ||||
|     expect_ctxc, | ||||
| ) | ||||
| 
 | ||||
| # XXX TODO cases: | ||||
|  | @ -156,10 +161,11 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled( | |||
|             ): | ||||
|                 await trio.sleep_forever() | ||||
| 
 | ||||
|     with pytest.raises(tractor.RemoteActorError) as excinfo: | ||||
|     with pytest.raises(RemoteActorError) as excinfo: | ||||
|         trio.run(main) | ||||
| 
 | ||||
|     assert excinfo.value.type == TypeError | ||||
|     rae = excinfo.value | ||||
|     assert rae.boxed_type == TypeError | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
|  | @ -739,14 +745,16 @@ def test_peer_canceller( | |||
|         with pytest.raises(ContextCancelled) as excinfo: | ||||
|             trio.run(main) | ||||
| 
 | ||||
|         assert excinfo.value.type == ContextCancelled | ||||
|         assert excinfo.value.boxed_type == ContextCancelled | ||||
|         assert excinfo.value.canceller[0] == 'canceller' | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def basic_echo_server( | ||||
|     ctx: Context, | ||||
|     peer_name: str = 'stepbro', | ||||
|     peer_name: str = 'wittle_bruv', | ||||
| 
 | ||||
|     err_after: int|None = None, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|  | @ -774,17 +782,31 @@ async def basic_echo_server( | |||
|             # assert 0 | ||||
|             await ipc.send(resp) | ||||
| 
 | ||||
|             if ( | ||||
|                 err_after | ||||
|                 and i > err_after | ||||
|             ): | ||||
|                 raise RuntimeError( | ||||
|                     f'Simulated error in `{peer_name}`' | ||||
|                 ) | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def serve_subactors( | ||||
|     ctx: Context, | ||||
|     peer_name: str, | ||||
|     debug_mode: bool, | ||||
| 
 | ||||
| ) -> None: | ||||
|     async with open_nursery() as an: | ||||
| 
 | ||||
|         # sanity | ||||
|         if debug_mode: | ||||
|             assert tractor._state.debug_mode() | ||||
| 
 | ||||
|         await ctx.started(peer_name) | ||||
|         async with ctx.open_stream() as reqs: | ||||
|             async for msg in reqs: | ||||
|         async with ctx.open_stream() as ipc: | ||||
|             async for msg in ipc: | ||||
|                 peer_name: str = msg | ||||
|                 peer: Portal = await an.start_actor( | ||||
|                     name=peer_name, | ||||
|  | @ -795,7 +817,7 @@ async def serve_subactors( | |||
|                     f'{peer_name}\n' | ||||
|                     f'|_{peer}\n' | ||||
|                 ) | ||||
|                 await reqs.send(( | ||||
|                 await ipc.send(( | ||||
|                     peer.chan.uid, | ||||
|                     peer.chan.raddr, | ||||
|                 )) | ||||
|  | @ -807,14 +829,20 @@ async def serve_subactors( | |||
| async def client_req_subactor( | ||||
|     ctx: Context, | ||||
|     peer_name: str, | ||||
|     debug_mode: bool, | ||||
| 
 | ||||
|     # used to simulate a user causing an error to be raised | ||||
|     # directly in thread (like a KBI) to better replicate the | ||||
|     # case where a `modden` CLI client would hang afer requesting | ||||
|     # a `Context.cancel()` to `bigd`'s wks spawner. | ||||
|     reraise_on_cancel: str|None = None, | ||||
|     sub_err_after: int|None = None, | ||||
| 
 | ||||
| ) -> None: | ||||
|     # sanity | ||||
|     if debug_mode: | ||||
|         assert tractor._state.debug_mode() | ||||
| 
 | ||||
|     # TODO: other cases to do with sub lifetimes: | ||||
|     # -[ ] test that we can have the server spawn a sub | ||||
|     #   that lives longer then ctx with this client. | ||||
|  | @ -836,6 +864,7 @@ async def client_req_subactor( | |||
|         spawner.open_context( | ||||
|             serve_subactors, | ||||
|             peer_name=peer_name, | ||||
|             debug_mode=debug_mode, | ||||
|         ) as (spawner_ctx, first), | ||||
|     ): | ||||
|         assert first == peer_name | ||||
|  | @ -857,6 +886,7 @@ async def client_req_subactor( | |||
|             await tell_little_bro( | ||||
|                 actor_name=sub_uid[0], | ||||
|                 caller='client', | ||||
|                 err_after=sub_err_after, | ||||
|             ) | ||||
| 
 | ||||
|             # TODO: test different scope-layers of | ||||
|  | @ -868,9 +898,7 @@ async def client_req_subactor( | |||
|             # TODO: would be super nice to have a special injected | ||||
|             # cancel type here (maybe just our ctxc) but using | ||||
|             # some native mechanism in `trio` :p | ||||
|             except ( | ||||
|                 trio.Cancelled | ||||
|             ) as err: | ||||
|             except trio.Cancelled as err: | ||||
|                 _err = err | ||||
|                 if reraise_on_cancel: | ||||
|                     errtype = globals()['__builtins__'][reraise_on_cancel] | ||||
|  | @ -897,7 +925,9 @@ async def client_req_subactor( | |||
| 
 | ||||
| async def tell_little_bro( | ||||
|     actor_name: str, | ||||
|     caller: str = '' | ||||
| 
 | ||||
|     caller: str = '', | ||||
|     err_after: int|None = None, | ||||
| ): | ||||
|     # contact target actor, do a stream dialog. | ||||
|     async with ( | ||||
|  | @ -906,10 +936,12 @@ async def tell_little_bro( | |||
|         ) as lb, | ||||
|         lb.open_context( | ||||
|             basic_echo_server, | ||||
| 
 | ||||
|             # XXX proxy any delayed err condition | ||||
|             err_after=err_after, | ||||
|         ) as (sub_ctx, first), | ||||
|         sub_ctx.open_stream( | ||||
|             basic_echo_server, | ||||
|         ) as echo_ipc, | ||||
| 
 | ||||
|         sub_ctx.open_stream() as echo_ipc, | ||||
|     ): | ||||
|         actor: Actor = current_actor() | ||||
|         uid: tuple = actor.uid | ||||
|  | @ -936,10 +968,15 @@ async def tell_little_bro( | |||
|     'raise_client_error', | ||||
|     [None, 'KeyboardInterrupt'], | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'raise_sub_spawn_error_after', | ||||
|     [None, 50], | ||||
| ) | ||||
| def test_peer_spawns_and_cancels_service_subactor( | ||||
|     debug_mode: bool, | ||||
|     raise_client_error: str, | ||||
|     reg_addr: tuple[str, int], | ||||
|     raise_sub_spawn_error_after: int|None, | ||||
| ): | ||||
|     # NOTE: this tests for the modden `mod wks open piker` bug | ||||
|     # discovered as part of implementing workspace ctx | ||||
|  | @ -953,6 +990,16 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
|     #   and the server's spawned child should cancel and terminate! | ||||
|     peer_name: str = 'little_bro' | ||||
| 
 | ||||
|     def check_inner_rte(rae: RemoteActorError): | ||||
|         ''' | ||||
|         Validate the little_bro's relayed inception! | ||||
| 
 | ||||
|         ''' | ||||
|         assert rae.boxed_type is RemoteActorError | ||||
|         assert rae.src_type is RuntimeError | ||||
|         assert 'client' in rae.relay_uid | ||||
|         assert peer_name in rae.src_uid | ||||
| 
 | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery( | ||||
|             # NOTE: to halt the peer tasks on ctxc, uncomment this. | ||||
|  | @ -976,14 +1023,24 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
|                     server.open_context( | ||||
|                         serve_subactors, | ||||
|                         peer_name=peer_name, | ||||
|                         debug_mode=debug_mode, | ||||
| 
 | ||||
|                     ) as (spawn_ctx, first), | ||||
| 
 | ||||
|                     client.open_context( | ||||
|                         client_req_subactor, | ||||
|                         peer_name=peer_name, | ||||
|                         debug_mode=debug_mode, | ||||
|                         reraise_on_cancel=raise_client_error, | ||||
| 
 | ||||
|                         # trigger for error condition in sub | ||||
|                         # during streaming. | ||||
|                         sub_err_after=raise_sub_spawn_error_after, | ||||
| 
 | ||||
|                     ) as (client_ctx, client_says), | ||||
|                 ): | ||||
|                     root: Actor = current_actor() | ||||
|                     spawner_uid: tuple = spawn_ctx.chan.uid | ||||
|                     print( | ||||
|                         f'Server says: {first}\n' | ||||
|                         f'Client says: {client_says}\n' | ||||
|  | @ -993,6 +1050,7 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
|                     # (grandchild of this root actor) "little_bro" | ||||
|                     # and ensure we can also use it as an echo | ||||
|                     # server. | ||||
|                     sub: Portal | ||||
|                     async with tractor.wait_for_actor( | ||||
|                         name=peer_name, | ||||
|                     ) as sub: | ||||
|  | @ -1004,56 +1062,139 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
|                         f'.uid: {sub.actor.uid}\n' | ||||
|                         f'chan.raddr: {sub.chan.raddr}\n' | ||||
|                     ) | ||||
|                     await tell_little_bro( | ||||
|                         actor_name=peer_name, | ||||
|                         caller='root', | ||||
|                     ) | ||||
| 
 | ||||
|                     # signal client to raise a KBI | ||||
|                     await client_ctx.cancel() | ||||
|                     print('root cancelled client, checking that sub-spawn is down') | ||||
|                     async with expect_ctxc( | ||||
|                         yay=raise_sub_spawn_error_after, | ||||
|                         reraise=False, | ||||
|                     ): | ||||
|                         await tell_little_bro( | ||||
|                             actor_name=peer_name, | ||||
|                             caller='root', | ||||
|                         ) | ||||
| 
 | ||||
|                     async with tractor.find_actor( | ||||
|                         name=peer_name, | ||||
|                     ) as sub: | ||||
|                         assert not sub | ||||
|                     if not raise_sub_spawn_error_after: | ||||
| 
 | ||||
|                     print('root cancelling server/client sub-actors') | ||||
|                         # signal client to cancel and maybe raise a KBI | ||||
|                         await client_ctx.cancel() | ||||
|                         print( | ||||
|                             '-> root cancelling client,\n' | ||||
|                             '-> root checking `client_ctx.result()`,\n' | ||||
|                             f'-> checking that sub-spawn {peer_name} is down\n' | ||||
|                         ) | ||||
|                     # else: | ||||
| 
 | ||||
|                     # await tractor.pause() | ||||
|                     res = await client_ctx.result(hide_tb=False) | ||||
|                     assert isinstance(res, ContextCancelled) | ||||
|                     assert client_ctx.cancel_acked | ||||
|                     assert res.canceller == current_actor().uid | ||||
|                     try: | ||||
|                         res = await client_ctx.result(hide_tb=False) | ||||
| 
 | ||||
|                         # in remote (relayed inception) error | ||||
|                         # case, we should error on the line above! | ||||
|                         if raise_sub_spawn_error_after: | ||||
|                             pytest.fail( | ||||
|                                 'Never rxed proxied `RemoteActorError[RuntimeError]` !?' | ||||
|                             ) | ||||
| 
 | ||||
|                         assert isinstance(res, ContextCancelled) | ||||
|                         assert client_ctx.cancel_acked | ||||
|                         assert res.canceller == root.uid | ||||
| 
 | ||||
|                     except RemoteActorError as rae: | ||||
|                         _err = rae | ||||
|                         assert raise_sub_spawn_error_after | ||||
| 
 | ||||
|                         # since this is a "relayed error" via the client | ||||
|                         # sub-actor, it is expected to be | ||||
|                         # a `RemoteActorError` boxing another | ||||
|                         # `RemoteActorError` otherwise known as | ||||
|                         #  an "inception" (from `trio`'s parlance) | ||||
|                         # ((or maybe a "Matryoshka" and/or "matron" | ||||
|                         # in our own working parlance)) which | ||||
|                         # contains the source error from the | ||||
|                         # little_bro: a `RuntimeError`. | ||||
|                         # | ||||
|                         check_inner_rte(rae) | ||||
|                         assert rae.relay_uid == client.chan.uid | ||||
|                         assert rae.src_uid == sub.chan.uid | ||||
| 
 | ||||
|                         assert not client_ctx.cancel_acked | ||||
|                         assert ( | ||||
|                             client_ctx.maybe_error | ||||
|                             is client_ctx.outcome | ||||
|                             is rae | ||||
|                         ) | ||||
|                         raise | ||||
|                         # await tractor.pause() | ||||
| 
 | ||||
|                     else: | ||||
|                         assert not raise_sub_spawn_error_after | ||||
| 
 | ||||
|                         # cancelling the spawner sub should | ||||
|                         # transitively cancel it's sub, the little | ||||
|                         # bruv. | ||||
|                         print('root cancelling server/client sub-actors') | ||||
|                         await spawn_ctx.cancel() | ||||
|                         async with tractor.find_actor( | ||||
|                             name=peer_name, | ||||
|                         ) as sub: | ||||
|                             assert not sub | ||||
| 
 | ||||
|                     await spawn_ctx.cancel() | ||||
|                     # await server.cancel_actor() | ||||
| 
 | ||||
|             except RemoteActorError as rae: | ||||
|                 # XXX more-or-less same as above handler | ||||
|                 # this is just making sure the error bubbles out | ||||
|                 # of the  | ||||
|                 _err = rae | ||||
|                 assert raise_sub_spawn_error_after | ||||
|                 raise | ||||
| 
 | ||||
|             # since we called `.cancel_actor()`, `.cancel_ack` | ||||
|             # will not be set on the ctx bc `ctx.cancel()` was not | ||||
|             # called directly fot this confext. | ||||
|             except ContextCancelled as ctxc: | ||||
|                 print('caught ctxc from contexts!') | ||||
|                 assert ctxc.canceller == current_actor().uid | ||||
|                 _ctxc = ctxc | ||||
|                 print( | ||||
|                     f'{root.uid} caught ctxc from ctx with {client_ctx.chan.uid}\n' | ||||
|                     f'{repr(ctxc)}\n' | ||||
|                 ) | ||||
| 
 | ||||
|                 if not raise_sub_spawn_error_after: | ||||
|                     assert ctxc.canceller == root.uid | ||||
|                 else: | ||||
|                     assert ctxc.canceller == spawner_uid | ||||
| 
 | ||||
|                 assert ctxc is spawn_ctx.outcome | ||||
|                 assert ctxc is spawn_ctx.maybe_error | ||||
|                 raise | ||||
| 
 | ||||
|             # assert spawn_ctx.cancel_acked | ||||
|             assert spawn_ctx.cancel_acked | ||||
|             assert client_ctx.cancel_acked | ||||
|             if raise_sub_spawn_error_after: | ||||
|                 pytest.fail( | ||||
|                     'context block(s) in PARENT never raised?!?' | ||||
|                 ) | ||||
| 
 | ||||
|             await client.cancel_actor() | ||||
|             await server.cancel_actor() | ||||
|             if not raise_sub_spawn_error_after: | ||||
|                 # assert spawn_ctx.cancel_acked | ||||
|                 assert spawn_ctx.cancel_acked | ||||
|                 assert client_ctx.cancel_acked | ||||
| 
 | ||||
|             # WOA WOA WOA! we need this to close..!!!?? | ||||
|             # that's super bad XD | ||||
|                 await client.cancel_actor() | ||||
|                 await server.cancel_actor() | ||||
| 
 | ||||
|             # TODO: why isn't this working!?!? | ||||
|             # we're now outside the `.open_context()` block so | ||||
|             # the internal `Context._scope: CancelScope` should be | ||||
|             # gracefully "closed" ;) | ||||
|                 # WOA WOA WOA! we need this to close..!!!?? | ||||
|                 # that's super bad XD | ||||
| 
 | ||||
|             # assert spawn_ctx.cancelled_caught | ||||
|                 # TODO: why isn't this working!?!? | ||||
|                 # we're now outside the `.open_context()` block so | ||||
|                 # the internal `Context._scope: CancelScope` should be | ||||
|                 # gracefully "closed" ;) | ||||
| 
 | ||||
|     trio.run(main) | ||||
|                 # assert spawn_ctx.cancelled_caught | ||||
| 
 | ||||
|     if raise_sub_spawn_error_after: | ||||
|         with pytest.raises(RemoteActorError) as excinfo: | ||||
|             trio.run(main) | ||||
| 
 | ||||
|         rae: RemoteActorError = excinfo.value | ||||
|         check_inner_rte(rae) | ||||
| 
 | ||||
|     else: | ||||
|         trio.run(main) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue