forked from goodboy/tractor
				
			Update tests for `PldRx` and `Context` changes
Mostly adjustments for the new pld-receiver semantics/shim-layer which results more often in the direct delivery of `RemoteActorError`s from IPC API primitives (like `Portal.result()`) instead of being embedded in an `ExceptionGroup` bundled from an embedded nursery. Tossed usage of the `debug_mode: bool` fixture to a couple problematic tests while i was working on them. Also includes detailed assertion updates to the inter-peer cancellation suite in terms of, - `Context.canceller` state correctly matching the true src actor when expecting a ctxc. - any rxed `ContextCancelled` should instance match the `Context._local/remote_error` as should the `.msgdata` and `._ipc_msg`.remotes/1757153874605917753/main
							parent
							
								
									fded92115a
								
							
						
					
					
						commit
						683288c8db
					
				|  | @ -89,17 +89,30 @@ def test_remote_error(reg_addr, args_err): | |||
|         assert excinfo.value.boxed_type == errtype | ||||
| 
 | ||||
|     else: | ||||
|         # the root task will also error on the `.result()` call | ||||
|         # so we expect an error from there AND the child. | ||||
|         with pytest.raises(BaseExceptionGroup) as excinfo: | ||||
|         # the root task will also error on the `Portal.result()` | ||||
|         # call so we expect an error from there AND the child. | ||||
|         # |_ tho seems like on new `trio` this doesn't always | ||||
|         #    happen? | ||||
|         with pytest.raises(( | ||||
|             BaseExceptionGroup, | ||||
|             tractor.RemoteActorError, | ||||
|         )) as excinfo: | ||||
|             trio.run(main) | ||||
| 
 | ||||
|         # ensure boxed errors | ||||
|         for exc in excinfo.value.exceptions: | ||||
|         # ensure boxed errors are `errtype` | ||||
|         err: BaseException = excinfo.value | ||||
|         if isinstance(err, BaseExceptionGroup): | ||||
|             suberrs: list[BaseException] = err.exceptions | ||||
|         else: | ||||
|             suberrs: list[BaseException] = [err] | ||||
| 
 | ||||
|         for exc in suberrs: | ||||
|             assert exc.boxed_type == errtype | ||||
| 
 | ||||
| 
 | ||||
| def test_multierror(reg_addr): | ||||
| def test_multierror( | ||||
|     reg_addr: tuple[str, int], | ||||
| ): | ||||
|     ''' | ||||
|     Verify we raise a ``BaseExceptionGroup`` out of a nursery where | ||||
|     more then one actor errors. | ||||
|  |  | |||
|  | @ -444,6 +444,7 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out): | |||
|                 infect_asyncio=True, | ||||
|                 fan_out=fan_out, | ||||
|             ) | ||||
|             # should raise RAE diectly | ||||
|             await portal.result() | ||||
| 
 | ||||
|     trio.run(main) | ||||
|  | @ -461,12 +462,11 @@ def test_trio_error_cancels_intertask_chan(reg_addr): | |||
|             # should trigger remote actor error | ||||
|             await portal.result() | ||||
| 
 | ||||
|     with pytest.raises(BaseExceptionGroup) as excinfo: | ||||
|     with pytest.raises(RemoteActorError) as excinfo: | ||||
|         trio.run(main) | ||||
| 
 | ||||
|     # ensure boxed errors | ||||
|     for exc in excinfo.value.exceptions: | ||||
|         assert exc.boxed_type == Exception | ||||
|     # ensure boxed error type | ||||
|     excinfo.value.boxed_type == Exception | ||||
| 
 | ||||
| 
 | ||||
| def test_trio_closes_early_and_channel_exits(reg_addr): | ||||
|  | @ -477,7 +477,7 @@ def test_trio_closes_early_and_channel_exits(reg_addr): | |||
|                 exit_early=True, | ||||
|                 infect_asyncio=True, | ||||
|             ) | ||||
|             # should trigger remote actor error | ||||
|             # should raise RAE diectly | ||||
|             await portal.result() | ||||
| 
 | ||||
|     # should be a quiet exit on a simple channel exit | ||||
|  | @ -492,15 +492,17 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr): | |||
|                 aio_raise_err=True, | ||||
|                 infect_asyncio=True, | ||||
|             ) | ||||
|             # should trigger remote actor error | ||||
|             # should trigger RAE directly, not an eg. | ||||
|             await portal.result() | ||||
| 
 | ||||
|     with pytest.raises(BaseExceptionGroup) as excinfo: | ||||
|     with pytest.raises( | ||||
|         # NOTE: bc we directly wait on `Portal.result()` instead | ||||
|         # of capturing it inside the `ActorNursery` machinery. | ||||
|         expected_exception=RemoteActorError, | ||||
|     ) as excinfo: | ||||
|         trio.run(main) | ||||
| 
 | ||||
|     # ensure boxed errors | ||||
|     for exc in excinfo.value.exceptions: | ||||
|         assert exc.boxed_type == Exception | ||||
|     excinfo.value.boxed_type == Exception | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
|  |  | |||
|  | @ -55,9 +55,10 @@ from tractor._testing import ( | |||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def sleep_forever( | ||||
| async def open_stream_then_sleep_forever( | ||||
|     ctx: Context, | ||||
|     expect_ctxc: bool = False, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     Sync the context, open a stream then just sleep. | ||||
|  | @ -67,6 +68,10 @@ async def sleep_forever( | |||
|     ''' | ||||
|     try: | ||||
|         await ctx.started() | ||||
| 
 | ||||
|         # NOTE: the below means this child will send a `Stop` | ||||
|         # to it's parent-side task despite that side never | ||||
|         # opening a stream itself. | ||||
|         async with ctx.open_stream(): | ||||
|             await trio.sleep_forever() | ||||
| 
 | ||||
|  | @ -100,7 +105,7 @@ async def error_before_started( | |||
|     ''' | ||||
|     async with tractor.wait_for_actor('sleeper') as p2: | ||||
|         async with ( | ||||
|             p2.open_context(sleep_forever) as (peer_ctx, first), | ||||
|             p2.open_context(open_stream_then_sleep_forever) as (peer_ctx, first), | ||||
|             peer_ctx.open_stream(), | ||||
|         ): | ||||
|             # NOTE: this WAS inside an @acm body but i factored it | ||||
|  | @ -204,9 +209,13 @@ async def stream_ints( | |||
| @tractor.context | ||||
| async def stream_from_peer( | ||||
|     ctx: Context, | ||||
|     debug_mode: bool, | ||||
|     peer_name: str = 'sleeper', | ||||
| ) -> None: | ||||
| 
 | ||||
|     # sanity | ||||
|     assert tractor._state.debug_mode() == debug_mode | ||||
| 
 | ||||
|     peer: Portal | ||||
|     try: | ||||
|         async with ( | ||||
|  | @ -240,26 +249,54 @@ async def stream_from_peer( | |||
|                 assert msg is not None | ||||
|                 print(msg) | ||||
| 
 | ||||
|     # NOTE: cancellation of the (sleeper) peer should always | ||||
|     # cause a `ContextCancelled` raise in this streaming | ||||
|     # actor. | ||||
|     except ContextCancelled as ctxc: | ||||
|         ctxerr = ctxc | ||||
|     # NOTE: cancellation of the (sleeper) peer should always cause | ||||
|     # a `ContextCancelled` raise in this streaming actor. | ||||
|     except ContextCancelled as _ctxc: | ||||
|         ctxc = _ctxc | ||||
| 
 | ||||
|         assert peer_ctx._remote_error is ctxerr | ||||
|         assert peer_ctx._remote_error.msgdata == ctxerr.msgdata | ||||
|         # print("TRYING TO ENTER PAUSSE!!!") | ||||
|         # await tractor.pause(shield=True) | ||||
|         re: ContextCancelled = peer_ctx._remote_error | ||||
| 
 | ||||
|         # XXX YES, bc exact same msg instances | ||||
|         assert peer_ctx._remote_error._ipc_msg is ctxerr._ipc_msg | ||||
|         # XXX YES XXX, remote error should be unpacked only once! | ||||
|         assert ( | ||||
|             re | ||||
|             is | ||||
|             peer_ctx.maybe_error | ||||
|             is | ||||
|             ctxc | ||||
|             is | ||||
|             peer_ctx._local_error | ||||
|         ) | ||||
|         # NOTE: these errors should all match! | ||||
|         #   ------ - ------ | ||||
|         # XXX [2024-05-03] XXX | ||||
|         #   ------ - ------ | ||||
|         # broke this due to a re-raise inside `.msg._ops.drain_to_final_msg()` | ||||
|         # where the `Error()` msg was directly raising the ctxc | ||||
|         # instead of just returning up to the caller inside | ||||
|         # `Context.return()` which would results in a diff instance of | ||||
|         # the same remote error bubbling out above vs what was | ||||
|         # already unpacked and set inside `Context. | ||||
|         assert ( | ||||
|             peer_ctx._remote_error.msgdata | ||||
|             == | ||||
|             ctxc.msgdata | ||||
|         ) | ||||
|         # ^-XXX-^ notice the data is of course the exact same.. so | ||||
|         # the above larger assert makes sense to also always be true! | ||||
| 
 | ||||
|         # XXX NO, bc new one always created for property accesss | ||||
|         assert peer_ctx._remote_error.ipc_msg != ctxerr.ipc_msg | ||||
|         # XXX YES XXX, bc should be exact same msg instances | ||||
|         assert peer_ctx._remote_error._ipc_msg is ctxc._ipc_msg | ||||
| 
 | ||||
|         # XXX NO XXX, bc new one always created for property accesss | ||||
|         assert peer_ctx._remote_error.ipc_msg != ctxc.ipc_msg | ||||
| 
 | ||||
|         # the peer ctx is the canceller even though it's canceller | ||||
|         # is the "canceller" XD | ||||
|         assert peer_name in peer_ctx.canceller | ||||
| 
 | ||||
|         assert "canceller" in ctxerr.canceller | ||||
|         assert "canceller" in ctxc.canceller | ||||
| 
 | ||||
|         # caller peer should not be the cancel requester | ||||
|         assert not ctx.cancel_called | ||||
|  | @ -283,12 +320,13 @@ async def stream_from_peer( | |||
| 
 | ||||
|         # TODO / NOTE `.canceller` won't have been set yet | ||||
|         # here because that machinery is inside | ||||
|         # `.open_context().__aexit__()` BUT, if we had | ||||
|         # `Portal.open_context().__aexit__()` BUT, if we had | ||||
|         # a way to know immediately (from the last | ||||
|         # checkpoint) that cancellation was due to | ||||
|         # a remote, we COULD assert this here..see, | ||||
|         # https://github.com/goodboy/tractor/issues/368 | ||||
|         # | ||||
|         # await tractor.pause() | ||||
|         # assert 'canceller' in ctx.canceller | ||||
| 
 | ||||
|         # root/parent actor task should NEVER HAVE cancelled us! | ||||
|  | @ -392,12 +430,13 @@ def test_peer_canceller( | |||
|             try: | ||||
|                 async with ( | ||||
|                     sleeper.open_context( | ||||
|                         sleep_forever, | ||||
|                         open_stream_then_sleep_forever, | ||||
|                         expect_ctxc=True, | ||||
|                     ) as (sleeper_ctx, sent), | ||||
| 
 | ||||
|                     just_caller.open_context( | ||||
|                         stream_from_peer, | ||||
|                         debug_mode=debug_mode, | ||||
|                     ) as (caller_ctx, sent), | ||||
| 
 | ||||
|                     canceller.open_context( | ||||
|  | @ -423,10 +462,11 @@ def test_peer_canceller( | |||
| 
 | ||||
|                     # should always raise since this root task does | ||||
|                     # not request the sleeper cancellation ;) | ||||
|                     except ContextCancelled as ctxerr: | ||||
|                     except ContextCancelled as _ctxc: | ||||
|                         ctxc = _ctxc | ||||
|                         print( | ||||
|                             'CAUGHT REMOTE CONTEXT CANCEL\n\n' | ||||
|                             f'{ctxerr}\n' | ||||
|                             f'{ctxc}\n' | ||||
|                         ) | ||||
| 
 | ||||
|                         # canceller and caller peers should not | ||||
|  | @ -437,7 +477,7 @@ def test_peer_canceller( | |||
|                         # we were not the actor, our peer was | ||||
|                         assert not sleeper_ctx.cancel_acked | ||||
| 
 | ||||
|                         assert ctxerr.canceller[0] == 'canceller' | ||||
|                         assert ctxc.canceller[0] == 'canceller' | ||||
| 
 | ||||
|                         # XXX NOTE XXX: since THIS `ContextCancelled` | ||||
|                         # HAS NOT YET bubbled up to the | ||||
|  | @ -448,7 +488,7 @@ def test_peer_canceller( | |||
| 
 | ||||
|                         # CASE_1: error-during-ctxc-handling, | ||||
|                         if error_during_ctxerr_handling: | ||||
|                             raise RuntimeError('Simulated error during teardown') | ||||
|                             raise RuntimeError('Simulated RTE re-raise during ctxc handling') | ||||
| 
 | ||||
|                         # CASE_2: standard teardown inside in `.open_context()` block | ||||
|                         raise | ||||
|  | @ -513,6 +553,9 @@ def test_peer_canceller( | |||
|                 #   should be cancelled by US. | ||||
|                 # | ||||
|                 if error_during_ctxerr_handling: | ||||
|                     print(f'loc_err: {_loc_err}\n') | ||||
|                     assert isinstance(loc_err, RuntimeError) | ||||
| 
 | ||||
|                     # since we do a rte reraise above, the | ||||
|                     # `.open_context()` error handling should have | ||||
|                     # raised a local rte, thus the internal | ||||
|  | @ -521,9 +564,6 @@ def test_peer_canceller( | |||
|                     # a `trio.Cancelled` due to a local | ||||
|                     # `._scope.cancel()` call. | ||||
|                     assert not sleeper_ctx._scope.cancelled_caught | ||||
| 
 | ||||
|                     assert isinstance(loc_err, RuntimeError) | ||||
|                     print(f'_loc_err: {_loc_err}\n') | ||||
|                     # assert sleeper_ctx._local_error is _loc_err | ||||
|                     # assert sleeper_ctx._local_error is _loc_err | ||||
|                     assert not ( | ||||
|  | @ -560,9 +600,12 @@ def test_peer_canceller( | |||
| 
 | ||||
|                         else:  # the other 2 ctxs | ||||
|                             assert ( | ||||
|                                 re.canceller | ||||
|                                 == | ||||
|                                 canceller.channel.uid | ||||
|                                 isinstance(re, ContextCancelled) | ||||
|                                 and ( | ||||
|                                     re.canceller | ||||
|                                     == | ||||
|                                     canceller.channel.uid | ||||
|                                 ) | ||||
|                             ) | ||||
| 
 | ||||
|                     # since the sleeper errors while handling a | ||||
|  | @ -811,8 +854,7 @@ async def serve_subactors( | |||
|     async with open_nursery() as an: | ||||
| 
 | ||||
|         # sanity | ||||
|         if debug_mode: | ||||
|             assert tractor._state.debug_mode() | ||||
|         assert tractor._state.debug_mode() == debug_mode | ||||
| 
 | ||||
|         await ctx.started(peer_name) | ||||
|         async with ctx.open_stream() as ipc: | ||||
|  | @ -1091,7 +1133,6 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
|                             '-> root checking `client_ctx.result()`,\n' | ||||
|                             f'-> checking that sub-spawn {peer_name} is down\n' | ||||
|                         ) | ||||
|                     # else: | ||||
| 
 | ||||
|                     try: | ||||
|                         res = await client_ctx.result(hide_tb=False) | ||||
|  |  | |||
|  | @ -2,7 +2,9 @@ | |||
| Spawning basics | ||||
| 
 | ||||
| """ | ||||
| from typing import Optional | ||||
| from typing import ( | ||||
|     Any, | ||||
| ) | ||||
| 
 | ||||
| import pytest | ||||
| import trio | ||||
|  | @ -25,13 +27,11 @@ async def spawn( | |||
|     async with tractor.open_root_actor( | ||||
|         arbiter_addr=reg_addr, | ||||
|     ): | ||||
| 
 | ||||
|         actor = tractor.current_actor() | ||||
|         assert actor.is_arbiter == is_arbiter | ||||
|         data = data_to_pass_down | ||||
| 
 | ||||
|         if actor.is_arbiter: | ||||
| 
 | ||||
|             async with tractor.open_nursery() as nursery: | ||||
| 
 | ||||
|                 # forks here | ||||
|  | @ -95,7 +95,9 @@ async def test_movie_theatre_convo(start_method): | |||
|         await portal.cancel_actor() | ||||
| 
 | ||||
| 
 | ||||
| async def cellar_door(return_value: Optional[str]): | ||||
| async def cellar_door( | ||||
|     return_value: str|None, | ||||
| ): | ||||
|     return return_value | ||||
| 
 | ||||
| 
 | ||||
|  | @ -105,16 +107,18 @@ async def cellar_door(return_value: Optional[str]): | |||
| ) | ||||
| @tractor_test | ||||
| async def test_most_beautiful_word( | ||||
|     start_method, | ||||
|     return_value | ||||
|     start_method: str, | ||||
|     return_value: Any, | ||||
|     debug_mode: bool, | ||||
| ): | ||||
|     ''' | ||||
|     The main ``tractor`` routine. | ||||
| 
 | ||||
|     ''' | ||||
|     with trio.fail_after(1): | ||||
|         async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|         async with tractor.open_nursery( | ||||
|             debug_mode=debug_mode, | ||||
|         ) as n: | ||||
|             portal = await n.run_in_actor( | ||||
|                 cellar_door, | ||||
|                 return_value=return_value, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue