Add per-side graceful-exit/cancel excs-as-signals
Such that any combination of task terminations/exits can be explicitly
handled and "dual side independent" crash cases re-raised in egs.
The main error-or-exit impl changes include,
- use of new per-side "signaling exceptions":
  - TrioTaskExited|TrioCancelled for signalling aio.
  - AsyncioTaskExited|AsyncioCancelled for signalling trio.
- NOT overloading the `LinkedTaskChannel._trio/aio_err` fields for
  err-as-signal relay and instead add a new pair of
  `._trio/aio_to_raise` maybe-exc-attrs which allow each side's
  task to specify what it would want the other side to raise to signal
  its/a termination outcome:
  - `._trio_to_raise: AsyncioTaskExited|AsyncioCancelled` to signal,
    |_ the aio task having returned while the trio side was still reading
       from the `asyncio.Queue` or is just not `.done()`.
    |_ the aio task being self or trio-request cancelled where
       a `asyncio.CancelledError` is raised and caught but NOT relayed
       as is back to trio; instead signal a "more explicit" exc type.
  - `._aio_to_raise: TrioTaskExited|TrioCancelled` to signal,
    |_ the trio task having returned while the aio side was still reading
       from the mem chan and indicating that the trio side might not
       care any more about future streamed values (like the
       `Stop/EndOfChannel` equivs for ipc `Context`s).
    |_ when the trio task canceld we do
        a `asyncio.Future.set_exception(TrioTaskExited())` to indicate
        to the aio side verbosely that it should cancel due to the trio
        parent.
  - `_aio/trio_err` are now left to only capturing the **actual**
    per-side task excs for introspection / other side's handling logic.
- supporting "graceful exits" depending on API in use from
  `translate_aio_errors()` such that if either side exits but the other
  side isn't expect to consume the final `return`ed value, we just exit
  silently, which required:
  - adding a `suppress_graceful_exits: bool` flag.
  - adjusting the `maybe_raise_aio_side_err()` logic to use that flag
    and suppress only on certain combos of `._trio_to_raise/._trio_err`.
  - prefer to raise `._trio_to_raise` when the aio-side is the src and
    vice versa.
- filling out pedantic logging for cancellation cases indicating which
  side is the cause.
- add a `LinkedTaskChannel._aio_result` modelled after our
  `Context._result` a a similar `.wait_for_result()` interface which
  allows maybe accessing the aio task's final return value if desired
  when using the `open_channel_from()` API.
- rename `cancel_trio()` done handler -> `signal_trio_when_done()`
Also some fairly major test suite updates,
- add a `delay: int` producing fixture which delivers a much larger
  timeout whenever `debug_mode` is set so that the REPL can be used
  without a surrounding cancel firing.
- add a new `test_aio_exits_early_relays_AsyncioTaskExited` including
  a paired `exit_early: bool` flag to `push_from_aio_task()`.
- adjust `test_trio_closes_early_causes_aio_checkpoint_raise` to expect
  a `to_asyncio.TrioTaskExited`.
			
			
				ext_type_plds_XPS_BACKUP
			
			
		
							parent
							
								
									0607a31ddd
								
							
						
					
					
						commit
						d6a0c515ec
					
				|  | @ -32,6 +32,17 @@ from tractor.trionics import BroadcastReceiver | ||||||
| from tractor._testing import expect_ctxc | from tractor._testing import expect_ctxc | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @pytest.fixture( | ||||||
|  |     scope='module', | ||||||
|  |     # autouse=True, | ||||||
|  | ) | ||||||
|  | def delay(debug_mode: bool) -> int: | ||||||
|  |     if debug_mode: | ||||||
|  |         return 999 | ||||||
|  |     else: | ||||||
|  |         return 1 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| async def sleep_and_err( | async def sleep_and_err( | ||||||
|     sleep_for: float = 0.1, |     sleep_for: float = 0.1, | ||||||
| 
 | 
 | ||||||
|  | @ -59,20 +70,24 @@ async def trio_cancels_single_aio_task(): | ||||||
|         await tractor.to_asyncio.run_task(aio_sleep_forever) |         await tractor.to_asyncio.run_task(aio_sleep_forever) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def test_trio_cancels_aio_on_actor_side(reg_addr): | def test_trio_cancels_aio_on_actor_side( | ||||||
|  |     reg_addr: tuple[str, int], | ||||||
|  |     delay: int, | ||||||
|  | ): | ||||||
|     ''' |     ''' | ||||||
|     Spawn an infected actor that is cancelled by the ``trio`` side |     Spawn an infected actor that is cancelled by the ``trio`` side | ||||||
|     task using std cancel scope apis. |     task using std cancel scope apis. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     async def main(): |     async def main(): | ||||||
|         async with tractor.open_nursery( |         with trio.fail_after(1 + delay): | ||||||
|             registry_addrs=[reg_addr] |             async with tractor.open_nursery( | ||||||
|         ) as n: |                 registry_addrs=[reg_addr] | ||||||
|             await n.run_in_actor( |             ) as n: | ||||||
|                 trio_cancels_single_aio_task, |                 await n.run_in_actor( | ||||||
|                 infect_asyncio=True, |                     trio_cancels_single_aio_task, | ||||||
|             ) |                     infect_asyncio=True, | ||||||
|  |                 ) | ||||||
| 
 | 
 | ||||||
|     trio.run(main) |     trio.run(main) | ||||||
| 
 | 
 | ||||||
|  | @ -116,7 +131,9 @@ async def asyncio_actor( | ||||||
|         raise |         raise | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def test_aio_simple_error(reg_addr): | def test_aio_simple_error( | ||||||
|  |     reg_addr: tuple[str, int], | ||||||
|  | ): | ||||||
|     ''' |     ''' | ||||||
|     Verify a simple remote asyncio error propagates back through trio |     Verify a simple remote asyncio error propagates back through trio | ||||||
|     to the parent actor. |     to the parent actor. | ||||||
|  | @ -153,7 +170,9 @@ def test_aio_simple_error(reg_addr): | ||||||
|     assert err.boxed_type is AssertionError |     assert err.boxed_type is AssertionError | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def test_tractor_cancels_aio(reg_addr): | def test_tractor_cancels_aio( | ||||||
|  |     reg_addr: tuple[str, int], | ||||||
|  | ): | ||||||
|     ''' |     ''' | ||||||
|     Verify we can cancel a spawned asyncio task gracefully. |     Verify we can cancel a spawned asyncio task gracefully. | ||||||
| 
 | 
 | ||||||
|  | @ -172,7 +191,9 @@ def test_tractor_cancels_aio(reg_addr): | ||||||
|     trio.run(main) |     trio.run(main) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def test_trio_cancels_aio(reg_addr): | def test_trio_cancels_aio( | ||||||
|  |     reg_addr: tuple[str, int], | ||||||
|  | ): | ||||||
|     ''' |     ''' | ||||||
|     Much like the above test with ``tractor.Portal.cancel_actor()`` |     Much like the above test with ``tractor.Portal.cancel_actor()`` | ||||||
|     except we just use a standard ``trio`` cancellation api. |     except we just use a standard ``trio`` cancellation api. | ||||||
|  | @ -203,7 +224,8 @@ async def trio_ctx( | ||||||
| 
 | 
 | ||||||
|     # this will block until the ``asyncio`` task sends a "first" |     # this will block until the ``asyncio`` task sends a "first" | ||||||
|     # message. |     # message. | ||||||
|     with trio.fail_after(2): |     delay: int = 999 if tractor.debug_mode() else 1 | ||||||
|  |     with trio.fail_after(1 + delay): | ||||||
|         try: |         try: | ||||||
|             async with ( |             async with ( | ||||||
|                 trio.open_nursery( |                 trio.open_nursery( | ||||||
|  | @ -239,7 +261,8 @@ async def trio_ctx( | ||||||
|     ids='parent_actor_cancels_child={}'.format |     ids='parent_actor_cancels_child={}'.format | ||||||
| ) | ) | ||||||
| def test_context_spawns_aio_task_that_errors( | def test_context_spawns_aio_task_that_errors( | ||||||
|     reg_addr, |     reg_addr: tuple[str, int], | ||||||
|  |     delay: int, | ||||||
|     parent_cancels: bool, |     parent_cancels: bool, | ||||||
| ): | ): | ||||||
|     ''' |     ''' | ||||||
|  | @ -249,7 +272,7 @@ def test_context_spawns_aio_task_that_errors( | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     async def main(): |     async def main(): | ||||||
|         with trio.fail_after(2): |         with trio.fail_after(1 + delay): | ||||||
|             async with tractor.open_nursery() as n: |             async with tractor.open_nursery() as n: | ||||||
|                 p = await n.start_actor( |                 p = await n.start_actor( | ||||||
|                     'aio_daemon', |                     'aio_daemon', | ||||||
|  | @ -322,11 +345,12 @@ async def aio_cancel(): | ||||||
| 
 | 
 | ||||||
| def test_aio_cancelled_from_aio_causes_trio_cancelled( | def test_aio_cancelled_from_aio_causes_trio_cancelled( | ||||||
|     reg_addr: tuple, |     reg_addr: tuple, | ||||||
|  |     delay: int, | ||||||
| ): | ): | ||||||
|     ''' |     ''' | ||||||
|     When the `asyncio.Task` cancels itself the `trio` side cshould |     When the `asyncio.Task` cancels itself the `trio` side should | ||||||
|     also cancel and teardown and relay the cancellation cross-process |     also cancel and teardown and relay the cancellation cross-process | ||||||
|     to the caller (parent). |     to the parent caller. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     async def main(): |     async def main(): | ||||||
|  | @ -342,7 +366,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled( | ||||||
|             # NOTE: normally the `an.__aexit__()` waits on the |             # NOTE: normally the `an.__aexit__()` waits on the | ||||||
|             # portal's result but we do it explicitly here |             # portal's result but we do it explicitly here | ||||||
|             # to avoid indent levels. |             # to avoid indent levels. | ||||||
|             with trio.fail_after(1): |             with trio.fail_after(1 + delay): | ||||||
|                 await p.wait_for_result() |                 await p.wait_for_result() | ||||||
| 
 | 
 | ||||||
|     with pytest.raises( |     with pytest.raises( | ||||||
|  | @ -353,11 +377,10 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled( | ||||||
|     # might get multiple `trio.Cancelled`s as well inside an inception |     # might get multiple `trio.Cancelled`s as well inside an inception | ||||||
|     err: RemoteActorError|ExceptionGroup = excinfo.value |     err: RemoteActorError|ExceptionGroup = excinfo.value | ||||||
|     if isinstance(err, ExceptionGroup): |     if isinstance(err, ExceptionGroup): | ||||||
|         err = next(itertools.dropwhile( |         excs = err.exceptions | ||||||
|             lambda exc: not isinstance(exc, tractor.RemoteActorError), |         assert len(excs) == 1 | ||||||
|             err.exceptions |         final_exc = excs[0] | ||||||
|         )) |         assert isinstance(final_exc, tractor.RemoteActorError) | ||||||
|         assert err |  | ||||||
| 
 | 
 | ||||||
|     # relayed boxed error should be our `trio`-task's |     # relayed boxed error should be our `trio`-task's | ||||||
|     # cancel-signal-proxy-equivalent of `asyncio.CancelledError`. |     # cancel-signal-proxy-equivalent of `asyncio.CancelledError`. | ||||||
|  | @ -370,15 +393,18 @@ async def no_to_trio_in_args(): | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def push_from_aio_task( | async def push_from_aio_task( | ||||||
| 
 |  | ||||||
|     sequence: Iterable, |     sequence: Iterable, | ||||||
|     to_trio: trio.abc.SendChannel, |     to_trio: trio.abc.SendChannel, | ||||||
|     expect_cancel: False, |     expect_cancel: False, | ||||||
|     fail_early: bool, |     fail_early: bool, | ||||||
|  |     exit_early: bool, | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
| 
 | 
 | ||||||
|     try: |     try: | ||||||
|  |         # print('trying breakpoint') | ||||||
|  |         # breakpoint() | ||||||
|  | 
 | ||||||
|         # sync caller ctx manager |         # sync caller ctx manager | ||||||
|         to_trio.send_nowait(True) |         to_trio.send_nowait(True) | ||||||
| 
 | 
 | ||||||
|  | @ -387,10 +413,27 @@ async def push_from_aio_task( | ||||||
|             to_trio.send_nowait(i) |             to_trio.send_nowait(i) | ||||||
|             await asyncio.sleep(0.001) |             await asyncio.sleep(0.001) | ||||||
| 
 | 
 | ||||||
|             if i == 50 and fail_early: |             if ( | ||||||
|                 raise Exception |                 i == 50 | ||||||
|  |             ): | ||||||
|  |                 if fail_early: | ||||||
|  |                     print('Raising exc from aio side!') | ||||||
|  |                     raise Exception | ||||||
| 
 | 
 | ||||||
|         print('asyncio streamer complete!') |                 if exit_early: | ||||||
|  |                     # TODO? really you could enforce the same | ||||||
|  |                     # SC-proto we use for actors here with asyncio | ||||||
|  |                     # such that a Return[None] msg would be | ||||||
|  |                     # implicitly delivered to the trio side? | ||||||
|  |                     # | ||||||
|  |                     # XXX => this might be the end-all soln for | ||||||
|  |                     # converting any-inter-task system (regardless | ||||||
|  |                     # of maybe-remote runtime or language) to be | ||||||
|  |                     # SC-compat no? | ||||||
|  |                     print(f'asyncio breaking early @ {i!r}') | ||||||
|  |                     break | ||||||
|  | 
 | ||||||
|  |         print('asyncio streaming complete!') | ||||||
| 
 | 
 | ||||||
|     except asyncio.CancelledError: |     except asyncio.CancelledError: | ||||||
|         if not expect_cancel: |         if not expect_cancel: | ||||||
|  | @ -402,9 +445,10 @@ async def push_from_aio_task( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def stream_from_aio( | async def stream_from_aio( | ||||||
|     exit_early: bool = False, |     trio_exit_early: bool = False, | ||||||
|     raise_err: bool = False, |     trio_raise_err: bool = False, | ||||||
|     aio_raise_err: bool = False, |     aio_raise_err: bool = False, | ||||||
|  |     aio_exit_early: bool = False, | ||||||
|     fan_out: bool = False, |     fan_out: bool = False, | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
|  | @ -417,8 +461,17 @@ async def stream_from_aio( | ||||||
|         async with to_asyncio.open_channel_from( |         async with to_asyncio.open_channel_from( | ||||||
|             push_from_aio_task, |             push_from_aio_task, | ||||||
|             sequence=seq, |             sequence=seq, | ||||||
|             expect_cancel=raise_err or exit_early, |             expect_cancel=trio_raise_err or trio_exit_early, | ||||||
|             fail_early=aio_raise_err, |             fail_early=aio_raise_err, | ||||||
|  |             exit_early=aio_exit_early, | ||||||
|  | 
 | ||||||
|  |             # such that we can test exit early cases | ||||||
|  |             # for each side explicitly. | ||||||
|  |             suppress_graceful_exits=(not( | ||||||
|  |                 aio_exit_early | ||||||
|  |                 or | ||||||
|  |                 trio_exit_early | ||||||
|  |             )) | ||||||
| 
 | 
 | ||||||
|         ) as (first, chan): |         ) as (first, chan): | ||||||
| 
 | 
 | ||||||
|  | @ -435,9 +488,9 @@ async def stream_from_aio( | ||||||
|                     pulled.append(value) |                     pulled.append(value) | ||||||
| 
 | 
 | ||||||
|                     if value == 50: |                     if value == 50: | ||||||
|                         if raise_err: |                         if trio_raise_err: | ||||||
|                             raise Exception |                             raise Exception | ||||||
|                         elif exit_early: |                         elif trio_exit_early: | ||||||
|                             print('`consume()` breaking early!\n') |                             print('`consume()` breaking early!\n') | ||||||
|                             break |                             break | ||||||
| 
 | 
 | ||||||
|  | @ -471,10 +524,14 @@ async def stream_from_aio( | ||||||
| 
 | 
 | ||||||
|     finally: |     finally: | ||||||
| 
 | 
 | ||||||
|         if ( |         if not ( | ||||||
|             not raise_err and |             trio_raise_err | ||||||
|             not exit_early and |             or | ||||||
|             not aio_raise_err |             trio_exit_early | ||||||
|  |             or | ||||||
|  |             aio_raise_err | ||||||
|  |             or | ||||||
|  |             aio_exit_early | ||||||
|         ): |         ): | ||||||
|             if fan_out: |             if fan_out: | ||||||
|                 # we get double the pulled values in the |                 # we get double the pulled values in the | ||||||
|  | @ -484,6 +541,7 @@ async def stream_from_aio( | ||||||
|                 assert list(sorted(pulled)) == expect |                 assert list(sorted(pulled)) == expect | ||||||
| 
 | 
 | ||||||
|             else: |             else: | ||||||
|  |                 # await tractor.pause() | ||||||
|                 assert pulled == expect |                 assert pulled == expect | ||||||
|         else: |         else: | ||||||
|             assert not fan_out |             assert not fan_out | ||||||
|  | @ -497,7 +555,10 @@ async def stream_from_aio( | ||||||
|     'fan_out', [False, True], |     'fan_out', [False, True], | ||||||
|     ids='fan_out_w_chan_subscribe={}'.format |     ids='fan_out_w_chan_subscribe={}'.format | ||||||
| ) | ) | ||||||
| def test_basic_interloop_channel_stream(reg_addr, fan_out): | def test_basic_interloop_channel_stream( | ||||||
|  |     reg_addr: tuple[str, int], | ||||||
|  |     fan_out: bool, | ||||||
|  | ): | ||||||
|     async def main(): |     async def main(): | ||||||
|         async with tractor.open_nursery() as n: |         async with tractor.open_nursery() as n: | ||||||
|             portal = await n.run_in_actor( |             portal = await n.run_in_actor( | ||||||
|  | @ -517,7 +578,7 @@ def test_trio_error_cancels_intertask_chan(reg_addr): | ||||||
|         async with tractor.open_nursery() as n: |         async with tractor.open_nursery() as n: | ||||||
|             portal = await n.run_in_actor( |             portal = await n.run_in_actor( | ||||||
|                 stream_from_aio, |                 stream_from_aio, | ||||||
|                 raise_err=True, |                 trio_raise_err=True, | ||||||
|                 infect_asyncio=True, |                 infect_asyncio=True, | ||||||
|             ) |             ) | ||||||
|             # should trigger remote actor error |             # should trigger remote actor error | ||||||
|  | @ -530,42 +591,114 @@ def test_trio_error_cancels_intertask_chan(reg_addr): | ||||||
|     excinfo.value.boxed_type is Exception |     excinfo.value.boxed_type is Exception | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def test_trio_closes_early_and_channel_exits( | def test_trio_closes_early_causes_aio_checkpoint_raise( | ||||||
|     reg_addr: tuple[str, int], |     reg_addr: tuple[str, int], | ||||||
|  |     delay: int, | ||||||
| ): | ): | ||||||
|     ''' |     ''' | ||||||
|     Check that if the `trio`-task "exits early" on `async for`ing the |     Check that if the `trio`-task "exits early and silently" (in this | ||||||
|     inter-task-channel (via a `break`) we exit silently from the |     case during `async for`-ing the inter-task-channel via | ||||||
|     `open_channel_from()` block and get a final `Return[None]` msg. |     a `break`-from-loop), we raise `TrioTaskExited` on the | ||||||
|  |     `asyncio`-side which also then bubbles up through the | ||||||
|  |     `open_channel_from()` block indicating that the `asyncio.Task` | ||||||
|  |     hit a ran another checkpoint despite the `trio.Task` exit. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     async def main(): |     async def main(): | ||||||
|         with trio.fail_after(2): |         with trio.fail_after(1 + delay): | ||||||
|             async with tractor.open_nursery( |             async with tractor.open_nursery( | ||||||
|                 # debug_mode=True, |                 # debug_mode=True, | ||||||
|                 # enable_stack_on_sig=True, |                 # enable_stack_on_sig=True, | ||||||
|             ) as n: |             ) as n: | ||||||
|                 portal = await n.run_in_actor( |                 portal = await n.run_in_actor( | ||||||
|                     stream_from_aio, |                     stream_from_aio, | ||||||
|                     exit_early=True, |                     trio_exit_early=True, | ||||||
|                     infect_asyncio=True, |                     infect_asyncio=True, | ||||||
|                 ) |                 ) | ||||||
|                 # should raise RAE diectly |                 # should raise RAE diectly | ||||||
|                 print('waiting on final infected subactor result..') |                 print('waiting on final infected subactor result..') | ||||||
|                 res: None = await portal.wait_for_result() |                 res: None = await portal.wait_for_result() | ||||||
|                 assert res is None |                 assert res is None | ||||||
|                 print('infected subactor returned result: {res!r}\n') |                 print(f'infected subactor returned result: {res!r}\n') | ||||||
| 
 | 
 | ||||||
|     # should be a quiet exit on a simple channel exit |     # should be a quiet exit on a simple channel exit | ||||||
|     trio.run( |     with pytest.raises(RemoteActorError) as excinfo: | ||||||
|         main, |         trio.run(main) | ||||||
|         # strict_exception_groups=False, | 
 | ||||||
|     ) |     # ensure remote error is an explicit `AsyncioCancelled` sub-type | ||||||
|  |     # which indicates to the aio task that the trio side exited | ||||||
|  |     # silently WITHOUT raising a `trio.Cancelled` (which would | ||||||
|  |     # normally be raised instead as a `AsyncioCancelled`). | ||||||
|  |     excinfo.value.boxed_type is to_asyncio.TrioTaskExited | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def test_aio_errors_and_channel_propagates_and_closes(reg_addr): | def test_aio_exits_early_relays_AsyncioTaskExited( | ||||||
|  |     # TODO, parametrize the 3 possible trio side conditions: | ||||||
|  |     # - trio blocking on receive, aio exits early | ||||||
|  |     # - trio cancelled AND aio exits early on its next tick | ||||||
|  |     # - trio errors AND aio exits early on its next tick | ||||||
|  |     reg_addr: tuple[str, int], | ||||||
|  |     debug_mode: bool, | ||||||
|  |     delay: int, | ||||||
|  | ): | ||||||
|  |     ''' | ||||||
|  |     Check that if the `asyncio`-task "exits early and silently" (in this | ||||||
|  |     case during `push_from_aio_task()` pushing to the `InterLoopTaskChannel` | ||||||
|  |     it `break`s from the loop), we raise `AsyncioTaskExited` on the | ||||||
|  |     `trio`-side which then DOES NOT BUBBLE up through the | ||||||
|  |     `open_channel_from()` block UNLESS, | ||||||
|  | 
 | ||||||
|  |     - the trio.Task also errored/cancelled, in which case we wrap | ||||||
|  |       both errors in an eg | ||||||
|  |     - the trio.Task was blocking on rxing a value from the | ||||||
|  |       `InterLoopTaskChannel`. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|     async def main(): |     async def main(): | ||||||
|         async with tractor.open_nursery() as n: |         with trio.fail_after(1 + delay): | ||||||
|  |             async with tractor.open_nursery( | ||||||
|  |                 debug_mode=debug_mode, | ||||||
|  |                 # enable_stack_on_sig=True, | ||||||
|  |             ) as an: | ||||||
|  |                 portal = await an.run_in_actor( | ||||||
|  |                     stream_from_aio, | ||||||
|  |                     infect_asyncio=True, | ||||||
|  |                     trio_exit_early=False, | ||||||
|  |                     aio_exit_early=True, | ||||||
|  |                 ) | ||||||
|  |                 # should raise RAE diectly | ||||||
|  |                 print('waiting on final infected subactor result..') | ||||||
|  |                 res: None = await portal.wait_for_result() | ||||||
|  |                 assert res is None | ||||||
|  |                 print(f'infected subactor returned result: {res!r}\n') | ||||||
|  | 
 | ||||||
|  |     # should be a quiet exit on a simple channel exit | ||||||
|  |     with pytest.raises(RemoteActorError) as excinfo: | ||||||
|  |         trio.run(main) | ||||||
|  | 
 | ||||||
|  |     exc = excinfo.value | ||||||
|  | 
 | ||||||
|  |     # TODO, wow bug! | ||||||
|  |     # -[ ] bp handler not replaced!?!? | ||||||
|  |     # breakpoint() | ||||||
|  | 
 | ||||||
|  |     # import pdbp; pdbp.set_trace() | ||||||
|  | 
 | ||||||
|  |     # ensure remote error is an explicit `AsyncioCancelled` sub-type | ||||||
|  |     # which indicates to the aio task that the trio side exited | ||||||
|  |     # silently WITHOUT raising a `trio.Cancelled` (which would | ||||||
|  |     # normally be raised instead as a `AsyncioCancelled`). | ||||||
|  |     assert exc.boxed_type is to_asyncio.AsyncioTaskExited | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_aio_errors_and_channel_propagates_and_closes( | ||||||
|  |     reg_addr: tuple[str, int], | ||||||
|  |     debug_mode: bool, | ||||||
|  | ): | ||||||
|  |     async def main(): | ||||||
|  |         async with tractor.open_nursery( | ||||||
|  |             debug_mode=debug_mode, | ||||||
|  |         ) as n: | ||||||
|             portal = await n.run_in_actor( |             portal = await n.run_in_actor( | ||||||
|                 stream_from_aio, |                 stream_from_aio, | ||||||
|                 aio_raise_err=True, |                 aio_raise_err=True, | ||||||
|  | @ -852,6 +985,8 @@ def test_sigint_closes_lifetime_stack( | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     async def main(): |     async def main(): | ||||||
|  | 
 | ||||||
|  |         delay = 999 if tractor.debug_mode() else 1 | ||||||
|         try: |         try: | ||||||
|             an: tractor.ActorNursery |             an: tractor.ActorNursery | ||||||
|             async with tractor.open_nursery( |             async with tractor.open_nursery( | ||||||
|  | @ -902,7 +1037,7 @@ def test_sigint_closes_lifetime_stack( | ||||||
|                     if wait_for_ctx: |                     if wait_for_ctx: | ||||||
|                         print('waiting for ctx outcome in parent..') |                         print('waiting for ctx outcome in parent..') | ||||||
|                         try: |                         try: | ||||||
|                             with trio.fail_after(1): |                             with trio.fail_after(1 + delay): | ||||||
|                                 await ctx.wait_for_result() |                                 await ctx.wait_for_result() | ||||||
|                         except tractor.ContextCancelled as ctxc: |                         except tractor.ContextCancelled as ctxc: | ||||||
|                             assert ctxc.canceller == ctx.chan.uid |                             assert ctxc.canceller == ctx.chan.uid | ||||||
|  |  | ||||||
|  | @ -82,6 +82,39 @@ class InternalError(RuntimeError): | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
| 
 | 
 | ||||||
|  | class AsyncioCancelled(Exception): | ||||||
|  |     ''' | ||||||
|  |     Asyncio cancelled translation (non-base) error | ||||||
|  |     for use with the ``to_asyncio`` module | ||||||
|  |     to be raised in the ``trio`` side task | ||||||
|  | 
 | ||||||
|  |     NOTE: this should NOT inherit from `asyncio.CancelledError` or | ||||||
|  |     tests should break! | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class AsyncioTaskExited(Exception): | ||||||
|  |     ''' | ||||||
|  |     asyncio.Task "exited" translation error for use with the | ||||||
|  |     `to_asyncio` APIs to be raised in the `trio` side task indicating | ||||||
|  |     on `.run_task()`/`.open_channel_from()` exit that the aio side | ||||||
|  |     exited early/silently. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  | 
 | ||||||
|  | class TrioTaskExited(AsyncioCancelled): | ||||||
|  |     ''' | ||||||
|  |     The `trio`-side task exited without explicitly cancelling the | ||||||
|  |     `asyncio.Task` peer. | ||||||
|  | 
 | ||||||
|  |     This is very similar to how `trio.ClosedResource` acts as | ||||||
|  |     a "clean shutdown" signal to the consumer side of a mem-chan, | ||||||
|  | 
 | ||||||
|  |     https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| # NOTE: more or less should be close to these: | # NOTE: more or less should be close to these: | ||||||
| # 'boxed_type', | # 'boxed_type', | ||||||
|  | @ -127,8 +160,8 @@ _body_fields: list[str] = list( | ||||||
| 
 | 
 | ||||||
| def get_err_type(type_name: str) -> BaseException|None: | def get_err_type(type_name: str) -> BaseException|None: | ||||||
|     ''' |     ''' | ||||||
|     Look up an exception type by name from the set of locally |     Look up an exception type by name from the set of locally known | ||||||
|     known namespaces: |     namespaces: | ||||||
| 
 | 
 | ||||||
|     - `builtins` |     - `builtins` | ||||||
|     - `tractor._exceptions` |     - `tractor._exceptions` | ||||||
|  | @ -358,6 +391,13 @@ class RemoteActorError(Exception): | ||||||
|                 self._ipc_msg.src_type_str |                 self._ipc_msg.src_type_str | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|  |             if not self._src_type: | ||||||
|  |                 raise TypeError( | ||||||
|  |                     f'Failed to lookup src error type with ' | ||||||
|  |                     f'`tractor._exceptions.get_err_type()` :\n' | ||||||
|  |                     f'{self.src_type_str}' | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|         return self._src_type |         return self._src_type | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|  | @ -652,16 +692,10 @@ class RemoteActorError(Exception): | ||||||
|         failing actor's remote env. |         failing actor's remote env. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         src_type_ref: Type[BaseException] = self.src_type |  | ||||||
|         if not src_type_ref: |  | ||||||
|             raise TypeError( |  | ||||||
|                 'Failed to lookup src error type:\n' |  | ||||||
|                 f'{self.src_type_str}' |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|         # TODO: better tb insertion and all the fancier dunder |         # TODO: better tb insertion and all the fancier dunder | ||||||
|         # metadata stuff as per `.__context__` etc. and friends: |         # metadata stuff as per `.__context__` etc. and friends: | ||||||
|         # https://github.com/python-trio/trio/issues/611 |         # https://github.com/python-trio/trio/issues/611 | ||||||
|  |         src_type_ref: Type[BaseException] = self.src_type | ||||||
|         return src_type_ref(self.tb_str) |         return src_type_ref(self.tb_str) | ||||||
| 
 | 
 | ||||||
|     # TODO: local recontruction of nested inception for a given |     # TODO: local recontruction of nested inception for a given | ||||||
|  |  | ||||||
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							
		Loading…
	
		Reference in New Issue