Compare commits
	
		
			1 Commits 
		
	
	
		
			284fa0340e
			...
			a870df68c0
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | a870df68c0 | 
|  | @ -2,16 +2,25 @@ | ||||||
| The hipster way to force SC onto the stdlib's "async": 'infection mode'. | The hipster way to force SC onto the stdlib's "async": 'infection mode'. | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
| from typing import Optional, Iterable, Union |  | ||||||
| import asyncio | import asyncio | ||||||
| import builtins | import builtins | ||||||
|  | from contextlib import ExitStack | ||||||
| import itertools | import itertools | ||||||
| import importlib | import importlib | ||||||
|  | import os | ||||||
|  | from pathlib import Path | ||||||
|  | import signal | ||||||
|  | from typing import ( | ||||||
|  |     Callable, | ||||||
|  |     Iterable, | ||||||
|  |     Union, | ||||||
|  | ) | ||||||
| 
 | 
 | ||||||
| import pytest | import pytest | ||||||
| import trio | import trio | ||||||
| import tractor | import tractor | ||||||
| from tractor import ( | from tractor import ( | ||||||
|  |     current_actor, | ||||||
|     to_asyncio, |     to_asyncio, | ||||||
|     RemoteActorError, |     RemoteActorError, | ||||||
|     ContextCancelled, |     ContextCancelled, | ||||||
|  | @ -25,8 +34,8 @@ async def sleep_and_err( | ||||||
| 
 | 
 | ||||||
|     # just signature placeholders for compat with |     # just signature placeholders for compat with | ||||||
|     # ``to_asyncio.open_channel_from()`` |     # ``to_asyncio.open_channel_from()`` | ||||||
|     to_trio: Optional[trio.MemorySendChannel] = None, |     to_trio: trio.MemorySendChannel|None = None, | ||||||
|     from_trio: Optional[asyncio.Queue] = None, |     from_trio: asyncio.Queue|None = None, | ||||||
| 
 | 
 | ||||||
| ): | ): | ||||||
|     if to_trio: |     if to_trio: | ||||||
|  | @ -36,7 +45,7 @@ async def sleep_and_err( | ||||||
|     assert 0 |     assert 0 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def sleep_forever(): | async def aio_sleep_forever(): | ||||||
|     await asyncio.sleep(float('inf')) |     await asyncio.sleep(float('inf')) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -44,7 +53,7 @@ async def trio_cancels_single_aio_task(): | ||||||
| 
 | 
 | ||||||
|     # spawn an ``asyncio`` task to run a func and return result |     # spawn an ``asyncio`` task to run a func and return result | ||||||
|     with trio.move_on_after(.2): |     with trio.move_on_after(.2): | ||||||
|         await tractor.to_asyncio.run_task(sleep_forever) |         await tractor.to_asyncio.run_task(aio_sleep_forever) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def test_trio_cancels_aio_on_actor_side(reg_addr): | def test_trio_cancels_aio_on_actor_side(reg_addr): | ||||||
|  | @ -66,14 +75,13 @@ def test_trio_cancels_aio_on_actor_side(reg_addr): | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def asyncio_actor( | async def asyncio_actor( | ||||||
| 
 |  | ||||||
|     target: str, |     target: str, | ||||||
|     expect_err: Exception|None = None |     expect_err: Exception|None = None | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
| 
 | 
 | ||||||
|     assert tractor.current_actor().is_infected_aio() |     assert tractor.current_actor().is_infected_aio() | ||||||
|     target = globals()[target] |     target: Callable = globals()[target] | ||||||
| 
 | 
 | ||||||
|     if '.' in expect_err: |     if '.' in expect_err: | ||||||
|         modpath, _, name = expect_err.rpartition('.') |         modpath, _, name = expect_err.rpartition('.') | ||||||
|  | @ -140,7 +148,7 @@ def test_tractor_cancels_aio(reg_addr): | ||||||
|         async with tractor.open_nursery() as n: |         async with tractor.open_nursery() as n: | ||||||
|             portal = await n.run_in_actor( |             portal = await n.run_in_actor( | ||||||
|                 asyncio_actor, |                 asyncio_actor, | ||||||
|                 target='sleep_forever', |                 target='aio_sleep_forever', | ||||||
|                 expect_err='trio.Cancelled', |                 expect_err='trio.Cancelled', | ||||||
|                 infect_asyncio=True, |                 infect_asyncio=True, | ||||||
|             ) |             ) | ||||||
|  | @ -164,7 +172,7 @@ def test_trio_cancels_aio(reg_addr): | ||||||
|             async with tractor.open_nursery() as n: |             async with tractor.open_nursery() as n: | ||||||
|                 await n.run_in_actor( |                 await n.run_in_actor( | ||||||
|                     asyncio_actor, |                     asyncio_actor, | ||||||
|                     target='sleep_forever', |                     target='aio_sleep_forever', | ||||||
|                     expect_err='trio.Cancelled', |                     expect_err='trio.Cancelled', | ||||||
|                     infect_asyncio=True, |                     infect_asyncio=True, | ||||||
|                 ) |                 ) | ||||||
|  | @ -195,7 +203,7 @@ async def trio_ctx( | ||||||
|             # spawn another asyncio task for the cuck of it. |             # spawn another asyncio task for the cuck of it. | ||||||
|             n.start_soon( |             n.start_soon( | ||||||
|                 tractor.to_asyncio.run_task, |                 tractor.to_asyncio.run_task, | ||||||
|                 sleep_forever, |                 aio_sleep_forever, | ||||||
|             ) |             ) | ||||||
|             await trio.sleep_forever() |             await trio.sleep_forever() | ||||||
| 
 | 
 | ||||||
|  | @ -285,7 +293,7 @@ async def aio_cancel(): | ||||||
| 
 | 
 | ||||||
|     # cancel and enter sleep |     # cancel and enter sleep | ||||||
|     task.cancel() |     task.cancel() | ||||||
|     await sleep_forever() |     await aio_sleep_forever() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): | def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): | ||||||
|  | @ -355,7 +363,6 @@ async def push_from_aio_task( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def stream_from_aio( | async def stream_from_aio( | ||||||
| 
 |  | ||||||
|     exit_early: bool = False, |     exit_early: bool = False, | ||||||
|     raise_err: bool = False, |     raise_err: bool = False, | ||||||
|     aio_raise_err: bool = False, |     aio_raise_err: bool = False, | ||||||
|  | @ -618,6 +625,200 @@ def test_echoserver_detailed_mechanics( | ||||||
|         trio.run(main) |         trio.run(main) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|  | @tractor.context | ||||||
|  | async def manage_file( | ||||||
|  |     ctx: tractor.Context, | ||||||
|  |     tmp_path_str: str, | ||||||
|  |     bg_aio_task: bool = False, | ||||||
|  | ): | ||||||
|  |     ''' | ||||||
|  |     Start an `asyncio` task that just sleeps after registering a context | ||||||
|  |     with `Actor.lifetime_stack`. Trigger a SIGINT to kill the actor tree | ||||||
|  |     and ensure the stack is closed in the infected mode child. | ||||||
|  | 
 | ||||||
|  |     To verify the teardown state just write a tmpfile to the `testdir` | ||||||
|  |     and delete it on actor close. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  | 
 | ||||||
|  |     tmp_path: Path = Path(tmp_path_str) | ||||||
|  |     tmp_file: Path = tmp_path / f'{" ".join(ctx._actor.uid)}.file' | ||||||
|  | 
 | ||||||
|  |     # create a the tmp file and tell the parent where it's at | ||||||
|  |     assert not tmp_file.is_file() | ||||||
|  |     tmp_file.touch() | ||||||
|  | 
 | ||||||
|  |     stack: ExitStack = current_actor().lifetime_stack | ||||||
|  |     stack.callback(tmp_file.unlink) | ||||||
|  | 
 | ||||||
|  |     await ctx.started(( | ||||||
|  |         str(tmp_file), | ||||||
|  |         os.getpid(), | ||||||
|  |     )) | ||||||
|  | 
 | ||||||
|  |     # expect to be cancelled from here! | ||||||
|  |     try: | ||||||
|  | 
 | ||||||
|  |         # NOTE: turns out you don't even need to sched an aio task | ||||||
|  |         # since the original issue, even though seemingly was due to | ||||||
|  |         # the guest-run being abandoned + a `._debug.pause()` inside | ||||||
|  |         # `._runtime._async_main()` (which was originally trying to | ||||||
|  |         # debug the `.lifetime_stack` not closing), IS NOT actually | ||||||
|  |         # the core issue? | ||||||
|  |         # | ||||||
|  |         # further notes: | ||||||
|  |         # | ||||||
|  |         # - `trio` only issues the " RuntimeWarning: Trio guest run | ||||||
|  |         #   got abandoned without properly finishing... weird stuff | ||||||
|  |         #   might happen" IFF you DO run a asyncio task here, BUT | ||||||
|  |         # - the original issue of the `.lifetime_stack` not closing | ||||||
|  |         #   will still happen even if you don't run an `asyncio` task | ||||||
|  |         #   here even though the "abandon" messgage won't be shown.. | ||||||
|  |         # | ||||||
|  |         # => ????? honestly i'm lost but it seems to be some issue | ||||||
|  |         #   with `asyncio` and SIGINT.. | ||||||
|  |         # | ||||||
|  |         # XXX NOTE XXX SO, if this LINE IS UNCOMMENTED and | ||||||
|  |         # `run_as_asyncio_guest()` is written WITHOUT THE | ||||||
|  |         # `.cancel_soon()` soln, both of these tests will pass ?? | ||||||
|  |         # so maybe it has something to do with `asyncio` loop init | ||||||
|  |         # state?!? | ||||||
|  |         # honestly, this REALLY reminds me why i haven't used | ||||||
|  |         # `asyncio` by choice in years.. XD | ||||||
|  |         # | ||||||
|  |         # await tractor.to_asyncio.run_task(aio_sleep_forever) | ||||||
|  |         if bg_aio_task: | ||||||
|  |             async with trio.open_nursery() as tn: | ||||||
|  |                 tn.start_soon( | ||||||
|  |                     tractor.to_asyncio.run_task, | ||||||
|  |                     aio_sleep_forever, | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|  |         await trio.sleep_forever() | ||||||
|  | 
 | ||||||
|  |     # signalled manually at the OS level (aka KBI) by the parent actor. | ||||||
|  |     except KeyboardInterrupt: | ||||||
|  |         print('child raised KBI..') | ||||||
|  |         assert tmp_file.exists() | ||||||
|  |         raise | ||||||
|  |     else: | ||||||
|  |         raise RuntimeError('shoulda received a KBI?') | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @pytest.mark.parametrize( | ||||||
|  |     'bg_aio_task', | ||||||
|  |     [ | ||||||
|  |         False, | ||||||
|  | 
 | ||||||
|  |         # NOTE: (and see notes in `manage_file()` above as well) if | ||||||
|  |         # we FOR SURE SPAWN AN AIO TASK in the child it seems the | ||||||
|  |         # "silent-abandon" case (as is described in detail in | ||||||
|  |         # `to_asyncio.run_as_asyncio_guest()`) does not happen and | ||||||
|  |         # `asyncio`'s loop will at least abandon the `trio` side | ||||||
|  |         # loudly? .. prolly the state-spot to start looking for | ||||||
|  |         # a soln that results in NO ABANDONMENT.. XD | ||||||
|  |         True, | ||||||
|  |     ], | ||||||
|  |     ids=[ | ||||||
|  |         'bg_aio_task', | ||||||
|  |         'just_trio_slee', | ||||||
|  |     ], | ||||||
|  | ) | ||||||
|  | @pytest.mark.parametrize( | ||||||
|  |     'wait_for_ctx', | ||||||
|  |     [ | ||||||
|  |         False, | ||||||
|  |         True, | ||||||
|  |     ], | ||||||
|  |     ids=[ | ||||||
|  |         'raise_KBI_in_rent', | ||||||
|  |         'wait_for_ctx', | ||||||
|  |     ], | ||||||
|  | ) | ||||||
|  | def test_sigint_closes_lifetime_stack( | ||||||
|  |     tmp_path: Path, | ||||||
|  |     wait_for_ctx: bool, | ||||||
|  |     bg_aio_task: bool, | ||||||
|  | ): | ||||||
|  |     ''' | ||||||
|  |     Ensure that an infected child can use the `Actor.lifetime_stack` | ||||||
|  |     to make a file on boot and it's automatically cleaned up by the | ||||||
|  |     actor-lifetime-linked exit stack closure. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     async def main(): | ||||||
|  |         try: | ||||||
|  |             async with tractor.open_nursery() as n: | ||||||
|  |                 p = await n.start_actor( | ||||||
|  |                     'file_mngr', | ||||||
|  |                     enable_modules=[__name__], | ||||||
|  |                     infect_asyncio=True, | ||||||
|  |                 ) | ||||||
|  |                 async with p.open_context( | ||||||
|  |                     manage_file, | ||||||
|  |                     tmp_path_str=str(tmp_path), | ||||||
|  |                     bg_aio_task=bg_aio_task, | ||||||
|  |                 ) as (ctx, first): | ||||||
|  | 
 | ||||||
|  |                     path_str, cpid = first | ||||||
|  |                     tmp_file: Path = Path(path_str) | ||||||
|  |                     assert tmp_file.exists() | ||||||
|  | 
 | ||||||
|  |                     # XXX originally to simulate what (hopefully) | ||||||
|  |                     # the below now triggers.. had to manually | ||||||
|  |                     # trigger a SIGINT from a ctl-c in the root. | ||||||
|  |                     # await trio.sleep_forever() | ||||||
|  | 
 | ||||||
|  |                     # XXX NOTE XXX signal infected-`asyncio` child to | ||||||
|  |                     # OS-cancel with SIGINT; this should trigger the | ||||||
|  |                     # bad `asyncio` cancel behaviour that can cause | ||||||
|  |                     # a guest-run abandon as was seen causing | ||||||
|  |                     # shm-buffer leaks in `piker`'s live quote stream | ||||||
|  |                     # susbys! | ||||||
|  |                     # | ||||||
|  |                     # await trio.sleep(.5) | ||||||
|  |                     await trio.sleep(.2) | ||||||
|  |                     os.kill( | ||||||
|  |                         cpid, | ||||||
|  |                         signal.SIGINT, | ||||||
|  |                     ) | ||||||
|  | 
 | ||||||
|  |                     # XXX CASE 1: without the bug fixed, in | ||||||
|  |                     # the non-KBI-raised-in-parent case, this | ||||||
|  |                     # timeout should trigger! | ||||||
|  |                     if wait_for_ctx: | ||||||
|  |                         print('waiting for ctx outcome in parent..') | ||||||
|  |                         try: | ||||||
|  |                             with trio.fail_after(.7): | ||||||
|  |                                 await ctx.wait_for_result() | ||||||
|  |                         except tractor.ContextCancelled as ctxc: | ||||||
|  |                             assert ctxc.canceller == ctx.chan.uid | ||||||
|  |                             raise | ||||||
|  | 
 | ||||||
|  |                     # XXX CASE 2: this seems to be the source of the | ||||||
|  |                     # original issue which exhibited BEFORE we put | ||||||
|  |                     # a `Actor.cancel_soon()` inside | ||||||
|  |                     # `run_as_asyncio_guest()`.. | ||||||
|  |                     else: | ||||||
|  |                         raise KeyboardInterrupt | ||||||
|  | 
 | ||||||
|  |                 pytest.fail('should have raised some kinda error?!?') | ||||||
|  | 
 | ||||||
|  |         except ( | ||||||
|  |             KeyboardInterrupt, | ||||||
|  |             ContextCancelled, | ||||||
|  |         ): | ||||||
|  |             # XXX CASE 2: without the bug fixed, in the | ||||||
|  |             # KBI-raised-in-parent case, the actor teardown should | ||||||
|  |             # never get run (silently abaondoned by `asyncio`..) and | ||||||
|  |             # thus the file should leak! | ||||||
|  |             assert not tmp_file.exists() | ||||||
|  |             assert ctx.maybe_error | ||||||
|  | 
 | ||||||
|  |     trio.run(main) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| # TODO: debug_mode tests once we get support for `asyncio`! | # TODO: debug_mode tests once we get support for `asyncio`! | ||||||
| # | # | ||||||
| # -[ ] need tests to wrap both scripts: | # -[ ] need tests to wrap both scripts: | ||||||
|  |  | ||||||
|  | @ -922,15 +922,6 @@ class NoRuntime(RuntimeError): | ||||||
|     "The root actor has not been initialized yet" |     "The root actor has not been initialized yet" | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| 
 |  | ||||||
| class AsyncioCancelled(Exception): |  | ||||||
|     ''' |  | ||||||
|     Asyncio cancelled translation (non-base) error |  | ||||||
|     for use with the ``to_asyncio`` module |  | ||||||
|     to be raised in the ``trio`` side task |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
| 
 |  | ||||||
| class MessagingError(Exception): | class MessagingError(Exception): | ||||||
|     ''' |     ''' | ||||||
|     IPC related msg (typing), transaction (ordering) or dialog |     IPC related msg (typing), transaction (ordering) or dialog | ||||||
|  | @ -1324,7 +1315,9 @@ def _mk_recv_mte( | ||||||
|         any_pld: Any = msgpack.decode(msg.pld) |         any_pld: Any = msgpack.decode(msg.pld) | ||||||
|         message: str = ( |         message: str = ( | ||||||
|             f'invalid `{msg_type.__qualname__}` msg payload\n\n' |             f'invalid `{msg_type.__qualname__}` msg payload\n\n' | ||||||
|             f'value: `{any_pld!r}` does not match type-spec: ' |             f'{any_pld!r}\n\n' | ||||||
|  |             f'has type {type(any_pld)!r}\n\n' | ||||||
|  |             f'and does not match type-spec ' | ||||||
|             f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`' |             f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`' | ||||||
|         ) |         ) | ||||||
|         bad_msg = msg |         bad_msg = msg | ||||||
|  |  | ||||||
|  | @ -18,11 +18,13 @@ | ||||||
| Infection apis for ``asyncio`` loops running ``trio`` using guest mode. | Infection apis for ``asyncio`` loops running ``trio`` using guest mode. | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
|  | from __future__ import annotations | ||||||
| import asyncio | import asyncio | ||||||
| from asyncio.exceptions import CancelledError | from asyncio.exceptions import CancelledError | ||||||
| from contextlib import asynccontextmanager as acm | from contextlib import asynccontextmanager as acm | ||||||
| from dataclasses import dataclass | from dataclasses import dataclass | ||||||
| import inspect | import inspect | ||||||
|  | import traceback | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     Callable, |     Callable, | ||||||
|  | @ -30,20 +32,21 @@ from typing import ( | ||||||
|     Awaitable, |     Awaitable, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| import trio | import tractor | ||||||
| from outcome import Error |  | ||||||
| 
 |  | ||||||
| from tractor.log import get_logger |  | ||||||
| from tractor._state import ( | from tractor._state import ( | ||||||
|     current_actor, |  | ||||||
|     debug_mode, |     debug_mode, | ||||||
| ) | ) | ||||||
|  | from tractor.log import get_logger | ||||||
| from tractor.devx import _debug | from tractor.devx import _debug | ||||||
| from tractor._exceptions import AsyncioCancelled |  | ||||||
| from tractor.trionics._broadcast import ( | from tractor.trionics._broadcast import ( | ||||||
|     broadcast_receiver, |     broadcast_receiver, | ||||||
|     BroadcastReceiver, |     BroadcastReceiver, | ||||||
| ) | ) | ||||||
|  | import trio | ||||||
|  | from outcome import ( | ||||||
|  |     Error, | ||||||
|  |     Outcome, | ||||||
|  | ) | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
|  | @ -161,7 +164,7 @@ def _run_asyncio_task( | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     __tracebackhide__ = True |     __tracebackhide__ = True | ||||||
|     if not current_actor().is_infected_aio(): |     if not tractor.current_actor().is_infected_aio(): | ||||||
|         raise RuntimeError( |         raise RuntimeError( | ||||||
|             "`infect_asyncio` mode is not enabled!?" |             "`infect_asyncio` mode is not enabled!?" | ||||||
|         ) |         ) | ||||||
|  | @ -172,7 +175,6 @@ def _run_asyncio_task( | ||||||
|     to_trio, from_aio = trio.open_memory_channel(qsize)  # type: ignore |     to_trio, from_aio = trio.open_memory_channel(qsize)  # type: ignore | ||||||
| 
 | 
 | ||||||
|     args = tuple(inspect.getfullargspec(func).args) |     args = tuple(inspect.getfullargspec(func).args) | ||||||
| 
 |  | ||||||
|     if getattr(func, '_tractor_steam_function', None): |     if getattr(func, '_tractor_steam_function', None): | ||||||
|         # the assumption is that the target async routine accepts the |         # the assumption is that the target async routine accepts the | ||||||
|         # send channel then it intends to yield more then one return |         # send channel then it intends to yield more then one return | ||||||
|  | @ -346,13 +348,22 @@ def _run_asyncio_task( | ||||||
|             # on a checkpoint. |             # on a checkpoint. | ||||||
|             cancel_scope.cancel() |             cancel_scope.cancel() | ||||||
| 
 | 
 | ||||||
|             # raise any ``asyncio`` side error. |             # raise any `asyncio` side error. | ||||||
|             raise aio_err |             raise aio_err | ||||||
| 
 | 
 | ||||||
|     task.add_done_callback(cancel_trio) |     task.add_done_callback(cancel_trio) | ||||||
|     return chan |     return chan | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | class AsyncioCancelled(CancelledError): | ||||||
|  |     ''' | ||||||
|  |     Asyncio cancelled translation (non-base) error | ||||||
|  |     for use with the ``to_asyncio`` module | ||||||
|  |     to be raised in the ``trio`` side task | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| @acm | @acm | ||||||
| async def translate_aio_errors( | async def translate_aio_errors( | ||||||
| 
 | 
 | ||||||
|  | @ -516,7 +527,6 @@ async def open_channel_from( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def run_as_asyncio_guest( | def run_as_asyncio_guest( | ||||||
| 
 |  | ||||||
|     trio_main: Callable, |     trio_main: Callable, | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
|  | @ -548,6 +558,11 @@ def run_as_asyncio_guest( | ||||||
| 
 | 
 | ||||||
|         loop = asyncio.get_running_loop() |         loop = asyncio.get_running_loop() | ||||||
|         trio_done_fut = asyncio.Future() |         trio_done_fut = asyncio.Future() | ||||||
|  |         startup_msg: str = ( | ||||||
|  |             'Starting `asyncio` guest-loop-run\n' | ||||||
|  |             '-> got running loop\n' | ||||||
|  |             '-> built a `trio`-done future\n' | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|         if debug_mode(): |         if debug_mode(): | ||||||
|             # XXX make it obvi we know this isn't supported yet! |             # XXX make it obvi we know this isn't supported yet! | ||||||
|  | @ -562,34 +577,120 @@ def run_as_asyncio_guest( | ||||||
|         def trio_done_callback(main_outcome): |         def trio_done_callback(main_outcome): | ||||||
| 
 | 
 | ||||||
|             if isinstance(main_outcome, Error): |             if isinstance(main_outcome, Error): | ||||||
|                 error = main_outcome.error |                 error: BaseException = main_outcome.error | ||||||
|  | 
 | ||||||
|  |                 # show an dedicated `asyncio`-side tb from the error | ||||||
|  |                 tb_str: str = ''.join(traceback.format_exception(error)) | ||||||
|  |                 log.exception( | ||||||
|  |                     'Guest-run errored!?\n\n' | ||||||
|  |                     f'{main_outcome}\n' | ||||||
|  |                     f'{error}\n\n' | ||||||
|  |                     f'{tb_str}\n' | ||||||
|  |                 ) | ||||||
|                 trio_done_fut.set_exception(error) |                 trio_done_fut.set_exception(error) | ||||||
| 
 | 
 | ||||||
|                 # TODO: explicit asyncio tb? |                 # raise inline | ||||||
|                 # traceback.print_exception(error) |  | ||||||
| 
 |  | ||||||
|                 # XXX: do we need this? |  | ||||||
|                 # actor.cancel_soon() |  | ||||||
| 
 |  | ||||||
|                 main_outcome.unwrap() |                 main_outcome.unwrap() | ||||||
|  | 
 | ||||||
|             else: |             else: | ||||||
|                 trio_done_fut.set_result(main_outcome) |                 trio_done_fut.set_result(main_outcome) | ||||||
|                 log.runtime(f"trio_main finished: {main_outcome!r}") |                 log.runtime(f'trio_main finished: {main_outcome!r}') | ||||||
|  | 
 | ||||||
|  |         startup_msg += ( | ||||||
|  |             f'-> created {trio_done_callback!r}\n' | ||||||
|  |             f'-> scheduling `trio_main`: {trio_main!r}\n' | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|         # start the infection: run trio on the asyncio loop in "guest mode" |         # start the infection: run trio on the asyncio loop in "guest mode" | ||||||
|         log.runtime( |         log.runtime( | ||||||
|             'Infecting `asyncio`-process with a `trio` guest-run of\n\n' |             f'{startup_msg}\n\n' | ||||||
|             f'{trio_main!r}\n\n' |             + | ||||||
| 
 |             'Infecting `asyncio`-process with a `trio` guest-run!\n' | ||||||
|             f'{trio_done_callback}\n' |  | ||||||
|         ) |         ) | ||||||
|  | 
 | ||||||
|         trio.lowlevel.start_guest_run( |         trio.lowlevel.start_guest_run( | ||||||
|             trio_main, |             trio_main, | ||||||
|             run_sync_soon_threadsafe=loop.call_soon_threadsafe, |             run_sync_soon_threadsafe=loop.call_soon_threadsafe, | ||||||
|             done_callback=trio_done_callback, |             done_callback=trio_done_callback, | ||||||
|         ) |         ) | ||||||
|         # NOTE `.unwrap()` will raise on error |         try: | ||||||
|         return (await trio_done_fut).unwrap() |             # TODO: better SIGINT handling since shielding seems to | ||||||
|  |             # make NO DIFFERENCE XD | ||||||
|  |             # -[ ] maybe this is due to 3.11's recent SIGINT handling | ||||||
|  |             #  changes and we can better work with/around it? | ||||||
|  |             # https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption | ||||||
|  |             out: Outcome = await asyncio.shield(trio_done_fut) | ||||||
|  |             # NOTE `Error.unwrap()` will raise | ||||||
|  |             return out.unwrap() | ||||||
|  | 
 | ||||||
|  |         except asyncio.CancelledError: | ||||||
|  |             actor: tractor.Actor = tractor.current_actor() | ||||||
|  |             log.exception( | ||||||
|  |                 '`asyncio`-side main task was cancelled!\n' | ||||||
|  |                 'Cancelling actor-runtime..\n' | ||||||
|  |                 f'c)>\n' | ||||||
|  |                 f'  |_{actor}.cancel_soon()\n' | ||||||
|  | 
 | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             # XXX NOTE XXX the next LOC is super important!!! | ||||||
|  |             #  => without it, we can get a guest-run abandonment case | ||||||
|  |             #  where asyncio will not trigger `trio` in a final event | ||||||
|  |             #  loop cycle! | ||||||
|  |             # | ||||||
|  |             #  our test, | ||||||
|  |             #  `test_infected_asyncio.test_sigint_closes_lifetime_stack()` | ||||||
|  |             #  demonstrates how if when we raise a SIGINT-signal in an infected | ||||||
|  |             #  child we get a variable race condition outcome where | ||||||
|  |             #  either of the following can indeterminately happen, | ||||||
|  |             # | ||||||
|  |             # - "silent-abandon": `asyncio` abandons the `trio` | ||||||
|  |             #   guest-run task silently and no `trio`-guest-run or | ||||||
|  |             #   `tractor`-actor-runtime teardown happens whatsoever.. | ||||||
|  |             #   this is the WORST (race) case outcome. | ||||||
|  |             # | ||||||
|  |             # - OR, "loud-abandon": the guest run get's abaondoned "loudly" with | ||||||
|  |             #   `trio` reporting a console traceback and further tbs of all | ||||||
|  |             #   the failed shutdown routines also show on console.. | ||||||
|  |             # | ||||||
|  |             # our test can thus fail and (has been parametrized for) | ||||||
|  |             # the 2 cases: | ||||||
|  |             # | ||||||
|  |             # - when the parent raises a KBI just after | ||||||
|  |             #   signalling the child, | ||||||
|  |             #  |_silent-abandon => the `Actor.lifetime_stack` will | ||||||
|  |             #    never be closed thus leaking a resource! | ||||||
|  |             #   -> FAIL! | ||||||
|  |             #  |_loud-abandon => despite the abandonment at least the | ||||||
|  |             #    stack will be closed out.. | ||||||
|  |             #   -> PASS | ||||||
|  |             # | ||||||
|  |             # - when the parent instead simply waits on `ctx.wait_for_result()` | ||||||
|  |             #   (i.e. DOES not raise a KBI itself), | ||||||
|  |             #  |_silent-abandon => test will just hang and thus the ctx | ||||||
|  |             #    and actor will never be closed/cancelled/shutdown | ||||||
|  |             #    resulting in leaking a (file) resource since the | ||||||
|  |             #    `trio`/`tractor` runtime never relays a ctxc back to | ||||||
|  |             #    the parent; the test's timeout will trigger.. | ||||||
|  |             #   -> FAIL! | ||||||
|  |             #  |_loud-abandon => this case seems to never happen?? | ||||||
|  |             # | ||||||
|  |             # XXX FIRST PART XXX, SO, this is a fix to the | ||||||
|  |             # "silent-abandon" case, NOT the `trio`-guest-run | ||||||
|  |             # abandonment issue in general, for which the NEXT LOC | ||||||
|  |             # is apparently a working fix! | ||||||
|  |             actor.cancel_soon() | ||||||
|  | 
 | ||||||
|  |             # XXX NOTE XXX PUMP the asyncio event loop to allow `trio`-side to | ||||||
|  |             # `trio`-guest-run to complete and teardown !! | ||||||
|  |             # | ||||||
|  |             # XXX WITHOUT THIS the guest-run gets race-conditionally | ||||||
|  |             # abandoned by `asyncio`!! | ||||||
|  |             # XD XD XD | ||||||
|  |             await asyncio.shield( | ||||||
|  |                 asyncio.sleep(.1)  # NOPE! it can't be 0 either XD | ||||||
|  |             ) | ||||||
|  |             raise | ||||||
| 
 | 
 | ||||||
|     # might as well if it's installed. |     # might as well if it's installed. | ||||||
|     try: |     try: | ||||||
|  | @ -599,4 +700,6 @@ def run_as_asyncio_guest( | ||||||
|     except ImportError: |     except ImportError: | ||||||
|         pass |         pass | ||||||
| 
 | 
 | ||||||
|     return asyncio.run(aio_main(trio_main)) |     return asyncio.run( | ||||||
|  |         aio_main(trio_main), | ||||||
|  |     ) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue