forked from goodboy/tractor
				
			Add an inter-leaved-task error test
Trying to replicate cases where errors are raised in both `trio` and `asyncio` tasks independently (at least in `.to_asyncio` API terms) with a new `test_trio_prestarted_task_bubbles` that generates 3 cases inside a `@acm` calls stack composing a `trio.Nursery` with a `to_asyncio.open_channel_from()` call where a set of `trio` tasks are started in a loop using `.start()` with various exc raising sequences, - the aio task raising *before* the last `trio` task spawns. - the aio task raising just after the last trio task spawns, but before it starts. - after the last trio task `.start()` call returns control to the parent - but (for now) did not error. TODO, still more cases to discover as i'm still fighting a `modden` bug of this sort atm.. Other, - tweak some other tests to have timeouts since some recent hangs were found.. - started mucking with py3.13 and thus adjustments for strict egs in some tests; full patchset to test suite likely coming soon!remotes/1757153874605917753/main
							parent
							
								
									d4f1a02f43
								
							
						
					
					
						commit
						3c8b1aa888
					
				|  | @ -109,7 +109,9 @@ async def asyncio_actor( | |||
| 
 | ||||
|     except BaseException as err: | ||||
|         if expect_err: | ||||
|             assert isinstance(err, error_type) | ||||
|             assert isinstance(err, error_type), ( | ||||
|                 f'{type(err)} is not {error_type}?' | ||||
|             ) | ||||
| 
 | ||||
|         raise | ||||
| 
 | ||||
|  | @ -181,8 +183,8 @@ def test_trio_cancels_aio(reg_addr): | |||
|         with trio.move_on_after(1): | ||||
|             # cancel the nursery shortly after boot | ||||
| 
 | ||||
|             async with tractor.open_nursery() as n: | ||||
|                 await n.run_in_actor( | ||||
|             async with tractor.open_nursery() as tn: | ||||
|                 await tn.run_in_actor( | ||||
|                     asyncio_actor, | ||||
|                     target='aio_sleep_forever', | ||||
|                     expect_err='trio.Cancelled', | ||||
|  | @ -202,22 +204,33 @@ async def trio_ctx( | |||
|     # this will block until the ``asyncio`` task sends a "first" | ||||
|     # message. | ||||
|     with trio.fail_after(2): | ||||
|         async with ( | ||||
|             trio.open_nursery() as n, | ||||
|         try: | ||||
|             async with ( | ||||
|                 trio.open_nursery( | ||||
|                     # TODO, for new `trio` / py3.13 | ||||
|                     # strict_exception_groups=False, | ||||
|                 ) as tn, | ||||
|                 tractor.to_asyncio.open_channel_from( | ||||
|                     sleep_and_err, | ||||
|                 ) as (first, chan), | ||||
|             ): | ||||
| 
 | ||||
|             tractor.to_asyncio.open_channel_from( | ||||
|                 sleep_and_err, | ||||
|             ) as (first, chan), | ||||
|         ): | ||||
|                 assert first == 'start' | ||||
| 
 | ||||
|             assert first == 'start' | ||||
|                 # spawn another asyncio task for the cuck of it. | ||||
|                 tn.start_soon( | ||||
|                     tractor.to_asyncio.run_task, | ||||
|                     aio_sleep_forever, | ||||
|                 ) | ||||
|                 await trio.sleep_forever() | ||||
| 
 | ||||
|             # spawn another asyncio task for the cuck of it. | ||||
|             n.start_soon( | ||||
|                 tractor.to_asyncio.run_task, | ||||
|                 aio_sleep_forever, | ||||
|             ) | ||||
|             await trio.sleep_forever() | ||||
|         # TODO, factor this into a `trionics.callapse()`? | ||||
|         except* BaseException as beg: | ||||
|             # await tractor.pause(shield=True) | ||||
|             if len(excs := beg.exceptions) == 1: | ||||
|                 raise excs[0] | ||||
|             else: | ||||
|                 raise | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|  | @ -236,7 +249,6 @@ def test_context_spawns_aio_task_that_errors( | |||
| 
 | ||||
|     ''' | ||||
|     async def main(): | ||||
| 
 | ||||
|         with trio.fail_after(2): | ||||
|             async with tractor.open_nursery() as n: | ||||
|                 p = await n.start_actor( | ||||
|  | @ -308,7 +320,9 @@ async def aio_cancel(): | |||
|     await aio_sleep_forever() | ||||
| 
 | ||||
| 
 | ||||
| def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): | ||||
| def test_aio_cancelled_from_aio_causes_trio_cancelled( | ||||
|     reg_addr: tuple, | ||||
| ): | ||||
|     ''' | ||||
|     When the `asyncio.Task` cancels itself the `trio` side cshould | ||||
|     also cancel and teardown and relay the cancellation cross-process | ||||
|  | @ -405,6 +419,7 @@ async def stream_from_aio( | |||
|             sequence=seq, | ||||
|             expect_cancel=raise_err or exit_early, | ||||
|             fail_early=aio_raise_err, | ||||
| 
 | ||||
|         ) as (first, chan): | ||||
| 
 | ||||
|             assert first is True | ||||
|  | @ -423,10 +438,15 @@ async def stream_from_aio( | |||
|                         if raise_err: | ||||
|                             raise Exception | ||||
|                         elif exit_early: | ||||
|                             print('`consume()` breaking early!\n') | ||||
|                             break | ||||
| 
 | ||||
|                 print('returning from `consume()`..\n') | ||||
| 
 | ||||
|             # run 2 tasks each pulling from | ||||
|             # the inter-task-channel with the 2nd | ||||
|             # using a fan-out `BroadcastReceiver`. | ||||
|             if fan_out: | ||||
|                 # start second task that get's the same stream value set. | ||||
|                 async with ( | ||||
| 
 | ||||
|                     # NOTE: this has to come first to avoid | ||||
|  | @ -436,11 +456,19 @@ async def stream_from_aio( | |||
| 
 | ||||
|                     trio.open_nursery() as n, | ||||
|                 ): | ||||
|                     # start 2nd task that get's broadcast the same | ||||
|                     # value set. | ||||
|                     n.start_soon(consume, br) | ||||
|                     await consume(chan) | ||||
| 
 | ||||
|             else: | ||||
|                 await consume(chan) | ||||
|     except BaseException as err: | ||||
|         import logging | ||||
|         log = logging.getLogger() | ||||
|         log.exception('aio-subactor errored!\n') | ||||
|         raise err | ||||
| 
 | ||||
|     finally: | ||||
| 
 | ||||
|         if ( | ||||
|  | @ -461,7 +489,8 @@ async def stream_from_aio( | |||
|             assert not fan_out | ||||
|             assert pulled == expect[:51] | ||||
| 
 | ||||
|         print('trio guest mode task completed!') | ||||
|         print('trio guest-mode task completed!') | ||||
|         assert chan._aio_task.done() | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|  | @ -501,19 +530,37 @@ def test_trio_error_cancels_intertask_chan(reg_addr): | |||
|     excinfo.value.boxed_type is Exception | ||||
| 
 | ||||
| 
 | ||||
| def test_trio_closes_early_and_channel_exits(reg_addr): | ||||
| def test_trio_closes_early_and_channel_exits( | ||||
|     reg_addr: tuple[str, int], | ||||
| ): | ||||
|     ''' | ||||
|     Check that if the `trio`-task "exits early" on `async for`ing the | ||||
|     inter-task-channel (via a `break`) we exit silently from the | ||||
|     `open_channel_from()` block and get a final `Return[None]` msg. | ||||
| 
 | ||||
|     ''' | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery() as n: | ||||
|             portal = await n.run_in_actor( | ||||
|                 stream_from_aio, | ||||
|                 exit_early=True, | ||||
|                 infect_asyncio=True, | ||||
|             ) | ||||
|             # should raise RAE diectly | ||||
|             await portal.result() | ||||
|         with trio.fail_after(2): | ||||
|             async with tractor.open_nursery( | ||||
|                 # debug_mode=True, | ||||
|                 # enable_stack_on_sig=True, | ||||
|             ) as n: | ||||
|                 portal = await n.run_in_actor( | ||||
|                     stream_from_aio, | ||||
|                     exit_early=True, | ||||
|                     infect_asyncio=True, | ||||
|                 ) | ||||
|                 # should raise RAE diectly | ||||
|                 print('waiting on final infected subactor result..') | ||||
|                 res: None = await portal.wait_for_result() | ||||
|                 assert res is None | ||||
|                 print('infected subactor returned result: {res!r}\n') | ||||
| 
 | ||||
|     # should be a quiet exit on a simple channel exit | ||||
|     trio.run(main) | ||||
|     trio.run( | ||||
|         main, | ||||
|         # strict_exception_groups=False, | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def test_aio_errors_and_channel_propagates_and_closes(reg_addr): | ||||
|  | @ -660,6 +707,7 @@ def test_echoserver_detailed_mechanics( | |||
| ) | ||||
| def test_infected_root_actor( | ||||
|     raise_error_mid_stream: bool|Exception, | ||||
| 
 | ||||
|     # conftest wide | ||||
|     loglevel: str, | ||||
|     debug_mode: bool, | ||||
|  | @ -670,36 +718,38 @@ def test_infected_root_actor( | |||
| 
 | ||||
|     ''' | ||||
|     async def _trio_main(): | ||||
|         with trio.fail_after(2): | ||||
|             first: str | ||||
|             chan: to_asyncio.LinkedTaskChannel | ||||
|             async with ( | ||||
|                 tractor.open_root_actor( | ||||
|                     debug_mode=debug_mode, | ||||
|                     loglevel=loglevel, | ||||
|                 ), | ||||
|                 to_asyncio.open_channel_from( | ||||
|                     aio_echo_server, | ||||
|                 ) as (first, chan), | ||||
|             ): | ||||
|                 assert first == 'start' | ||||
| 
 | ||||
|         first: str | ||||
|         chan: to_asyncio.LinkedTaskChannel | ||||
|         async with ( | ||||
|             tractor.open_root_actor( | ||||
|                 debug_mode=debug_mode, | ||||
|                 loglevel=loglevel, | ||||
|             ), | ||||
|             to_asyncio.open_channel_from( | ||||
|                 aio_echo_server, | ||||
|             ) as (first, chan), | ||||
|         ): | ||||
|             assert first == 'start' | ||||
|                 for i in range(1000): | ||||
|                     await chan.send(i) | ||||
|                     out = await chan.receive() | ||||
|                     assert out == i | ||||
|                     print(f'asyncio echoing {i}') | ||||
| 
 | ||||
|             for i in range(1000): | ||||
|                 await chan.send(i) | ||||
|                 out = await chan.receive() | ||||
|                 assert out == i | ||||
|                 print(f'asyncio echoing {i}') | ||||
|                     if raise_error_mid_stream and i == 500: | ||||
|                         raise raise_error_mid_stream | ||||
| 
 | ||||
|                 if raise_error_mid_stream and i == 500: | ||||
|                     raise raise_error_mid_stream | ||||
| 
 | ||||
|                 if out is None: | ||||
|                     try: | ||||
|                         out = await chan.receive() | ||||
|                     except trio.EndOfChannel: | ||||
|                         break | ||||
|                     else: | ||||
|                         raise RuntimeError('aio channel never stopped?') | ||||
|                     if out is None: | ||||
|                         try: | ||||
|                             out = await chan.receive() | ||||
|                         except trio.EndOfChannel: | ||||
|                             break | ||||
|                         else: | ||||
|                             raise RuntimeError( | ||||
|                                 'aio channel never stopped?' | ||||
|                             ) | ||||
| 
 | ||||
|     if raise_error_mid_stream: | ||||
|         with pytest.raises(raise_error_mid_stream): | ||||
|  | @ -947,6 +997,158 @@ def test_sigint_closes_lifetime_stack( | |||
|     trio.run(main) | ||||
| 
 | ||||
| 
 | ||||
| async def sync_and_err( | ||||
|     # just signature placeholders for compat with | ||||
|     # ``to_asyncio.open_channel_from()`` | ||||
|     to_trio: trio.MemorySendChannel, | ||||
|     from_trio: asyncio.Queue, | ||||
|     ev: asyncio.Event, | ||||
| 
 | ||||
| ): | ||||
|     if to_trio: | ||||
|         to_trio.send_nowait('start') | ||||
| 
 | ||||
|     await ev.wait() | ||||
|     raise RuntimeError('asyncio-side') | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'aio_err_trigger', | ||||
|     [ | ||||
|         'before_start_point', | ||||
|         'after_trio_task_starts', | ||||
|         'after_start_point', | ||||
|     ], | ||||
|     ids='aio_err_triggered={}'.format | ||||
| ) | ||||
| def test_trio_prestarted_task_bubbles( | ||||
|     aio_err_trigger: str, | ||||
| 
 | ||||
|     # conftest wide | ||||
|     loglevel: str, | ||||
|     debug_mode: bool, | ||||
| ): | ||||
| 
 | ||||
|     async def pre_started_err( | ||||
|         raise_err: bool = False, | ||||
|         pre_sleep: float|None = None, | ||||
|         aio_trigger: asyncio.Event|None = None, | ||||
|         task_status=trio.TASK_STATUS_IGNORED, | ||||
|     ): | ||||
|         ''' | ||||
|         Maybe pre-started error then sleep. | ||||
| 
 | ||||
|         ''' | ||||
|         if pre_sleep is not None: | ||||
|             print(f'Sleeping from trio for {pre_sleep!r}s !') | ||||
|             await trio.sleep(pre_sleep) | ||||
| 
 | ||||
|         # signal aio-task to raise JUST AFTER this task | ||||
|         # starts but has not yet `.started()` | ||||
|         if aio_trigger: | ||||
|             print('Signalling aio-task to raise from `trio`!!') | ||||
|             aio_trigger.set() | ||||
| 
 | ||||
|         if raise_err: | ||||
|             print('Raising from trio!') | ||||
|             raise TypeError('trio-side') | ||||
| 
 | ||||
|         task_status.started() | ||||
|         await trio.sleep_forever() | ||||
| 
 | ||||
|     async def _trio_main(): | ||||
|         # with trio.fail_after(2): | ||||
|         with trio.fail_after(999): | ||||
|             first: str | ||||
|             chan: to_asyncio.LinkedTaskChannel | ||||
|             aio_ev = asyncio.Event() | ||||
| 
 | ||||
|             async with ( | ||||
|                 tractor.open_root_actor( | ||||
|                     debug_mode=False, | ||||
|                     loglevel=loglevel, | ||||
|                 ), | ||||
| 
 | ||||
|                 # where we'll start a sub-task that errors BEFORE | ||||
|                 # calling `.started()` such that the error should | ||||
|                 # bubble before the guest run terminates! | ||||
|                 trio.open_nursery() as tn, | ||||
| 
 | ||||
|                 # THEN start an infect task which should error just | ||||
|                 # after the trio-side's task does. | ||||
|                 to_asyncio.open_channel_from( | ||||
|                     partial( | ||||
|                         sync_and_err, | ||||
|                         ev=aio_ev, | ||||
|                     ) | ||||
|                 ) as (first, chan), | ||||
|             ): | ||||
| 
 | ||||
|                 for i in range(5): | ||||
|                     pre_sleep: float|None = None | ||||
|                     raise_err: bool = False | ||||
|                     last_iter: bool = (i == 4) | ||||
| 
 | ||||
|                     if last_iter: | ||||
|                         raise_err: bool = True | ||||
| 
 | ||||
|                         # trigger aio task to error on next loop | ||||
|                         # tick/checkpoint | ||||
|                         if aio_err_trigger == 'before_start_point': | ||||
|                             aio_ev.set() | ||||
| 
 | ||||
|                         pre_sleep: float = 0 | ||||
| 
 | ||||
|                     await tn.start( | ||||
|                         pre_started_err, | ||||
|                         raise_err, | ||||
|                         pre_sleep, | ||||
|                         (aio_ev if ( | ||||
|                                 aio_err_trigger == 'after_trio_task_starts' | ||||
|                                 and | ||||
|                                 last_iter | ||||
|                             ) else None | ||||
|                         ), | ||||
|                     ) | ||||
| 
 | ||||
|                     if ( | ||||
|                         aio_err_trigger == 'after_start_point' | ||||
|                         and | ||||
|                         last_iter | ||||
|                     ): | ||||
|                         aio_ev.set() | ||||
| 
 | ||||
|     with pytest.raises( | ||||
|         expected_exception=ExceptionGroup, | ||||
|     ) as excinfo: | ||||
|         tractor.to_asyncio.run_as_asyncio_guest( | ||||
|             trio_main=_trio_main, | ||||
|         ) | ||||
| 
 | ||||
|     eg = excinfo.value | ||||
|     rte_eg, rest_eg = eg.split(RuntimeError) | ||||
| 
 | ||||
|     # ensure the trio-task's error bubbled despite the aio-side | ||||
|     # having (maybe) errored first. | ||||
|     if aio_err_trigger in ( | ||||
|         'after_trio_task_starts', | ||||
|         'after_start_point', | ||||
|     ): | ||||
|         assert len(errs := rest_eg.exceptions) == 1 | ||||
|         typerr = errs[0] | ||||
|         assert ( | ||||
|             type(typerr) is TypeError | ||||
|             and | ||||
|             'trio-side' in typerr.args | ||||
|         ) | ||||
| 
 | ||||
|     # when aio errors BEFORE (last) trio task is scheduled, we should | ||||
|     # never see anythinb but the aio-side. | ||||
|     else: | ||||
|         assert len(rtes := rte_eg.exceptions) == 1 | ||||
|         assert 'asyncio-side' in rtes[0].args[0] | ||||
| 
 | ||||
| 
 | ||||
| # TODO: debug_mode tests once we get support for `asyncio`! | ||||
| # | ||||
| # -[ ] need tests to wrap both scripts: | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue