From fb026e37473f679777a52e883e9a5ef932242f8e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 7 Oct 2021 23:14:34 -0400 Subject: [PATCH] First draft: `.to_asyncio.open_channel_from()` --- tractor/to_asyncio.py | 181 ++++++++++++++++++++++++++---------------- 1 file changed, 113 insertions(+), 68 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index aa8c6fc..936ab3f 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -3,17 +3,19 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode. ''' import asyncio +from contextlib import asynccontextmanager as acm import inspect from typing import ( Any, Callable, AsyncIterator, Awaitable, + Optional, ) import trio -from .log import get_logger +from .log import get_logger, get_console_log from ._state import current_actor log = get_logger(__name__) @@ -22,36 +24,30 @@ log = get_logger(__name__) __all__ = ['run_task', 'run_as_asyncio_guest'] -async def run_coro( - to_trio: trio.MemorySendChannel, - coro: Awaitable, -) -> None: - """Await ``coro`` and relay result back to ``trio``. - """ - to_trio.send_nowait(await coro) +# async def consume_asyncgen( +# to_trio: trio.MemorySendChannel, +# coro: AsyncIterator, +# ) -> None: +# """Stream async generator results back to ``trio``. - -async def consume_asyncgen( - to_trio: trio.MemorySendChannel, - coro: AsyncIterator, -) -> None: - """Stream async generator results back to ``trio``. - - ``from_trio`` might eventually be used here for - bidirectional streaming. - """ - async for item in coro: - to_trio.send_nowait(item) +# ``from_trio`` might eventually be used here for +# bidirectional streaming. +# """ +# async for item in coro: +# to_trio.send_nowait(item) def _run_asyncio_task( func: Callable, *, qsize: int = 1, - _treat_as_stream: bool = False, + # _treat_as_stream: bool = False, + provide_channels: bool = False, **kwargs, + ) -> Any: - """Run an ``asyncio`` async function or generator in a task, return + """ + Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. """ @@ -73,6 +69,9 @@ def _run_asyncio_task( # _treat_as_stream = True assert qsize > 1 + if provide_channels: + assert 'to_trio' in args + # allow target func to accept/stream results manually by name if 'to_trio' in args: kwargs['to_trio'] = to_trio @@ -80,25 +79,46 @@ def _run_asyncio_task( if 'from_trio' in args: kwargs['from_trio'] = from_trio - # if 'from_aio' in args: - # kwargs['from_aio'] = from_aio - coro = func(**kwargs) cancel_scope = trio.CancelScope() + aio_task_complete = trio.Event() + aio_err: Optional[BaseException] = None + + async def wait_on_coro_final_result( + to_trio: trio.MemorySendChannel, + coro: Awaitable, + aio_task_complete: trio.Event, + + ) -> None: + """ + Await ``coro`` and relay result back to ``trio``. + + """ + nonlocal aio_err + orig = result = id(coro) + try: + result = await coro + except BaseException as err: + aio_err = err + from_aio._err = aio_err + finally: + aio_task_complete.set() + if result != orig and aio_err is None: + to_trio.send_nowait(result) # start the asyncio task we submitted from trio if inspect.isawaitable(coro): - task = asyncio.create_task(run_coro(to_trio, coro)) + task = asyncio.create_task( + wait_on_coro_final_result(to_trio, coro, aio_task_complete) + ) - elif inspect.isasyncgen(coro): - task = asyncio.create_task(consume_asyncgen(to_trio, coro)) + # elif inspect.isasyncgen(coro): + # task = asyncio.create_task(consume_asyncgen(to_trio, coro)) else: raise TypeError(f"No support for invoking {coro}") - aio_err = None - def cancel_trio(task): """Cancel the calling ``trio`` task on error. """ @@ -110,23 +130,21 @@ def _run_asyncio_task( if aio_err: log.exception(f"asyncio task errorred:\n{aio_err}") - - cancel_scope.cancel() - from_aio._err = aio_err - from_aio.close() + from_aio._err = aio_err + cancel_scope.cancel() + from_aio.close() task.add_done_callback(cancel_trio) - return task, from_aio, to_trio, cancel_scope + return task, from_aio, to_trio, cancel_scope, aio_task_complete async def run_task( - func: Callable, *, qsize: int = 2**10, - _treat_as_stream: bool = False, + # _treat_as_stream: bool = False, **kwargs, ) -> Any: @@ -134,22 +152,9 @@ async def run_task( or stream the result back to ``trio``. """ - # streaming ``asyncio`` task - if _treat_as_stream: - - task, from_aio, to_trio, cs = _run_asyncio_task( - func, - qsize=2**8, - **kwargs, - ) - - # naively expect the mem chan api to do the job - # of handling cross-framework cancellations / errors - return from_aio - # simple async func try: - task, from_aio, to_trio, cs = _run_asyncio_task( + task, from_aio, to_trio, cs, _ = _run_asyncio_task( func, qsize=1, **kwargs, @@ -157,6 +162,8 @@ async def run_task( # return single value with cs: + # naively expect the mem chan api to do the job + # of handling cross-framework cancellations / errors return await from_aio.receive() if cs.cancelled_caught: @@ -165,7 +172,7 @@ async def run_task( raise from_aio._err # Do we need this? - except Exception as err: + except BaseException as err: aio_err = from_aio._err @@ -178,8 +185,8 @@ async def run_task( # except trio.Cancelled: # raise finally: - # if not task.done(): - task.cancel() + if not task.done(): + task.cancel() # TODO: explicit api for the streaming case where @@ -187,21 +194,54 @@ async def run_task( # This ends up looking more like our ``Portal.open_stream_from()`` # NB: code below is untested. -# @asynccontextmanager -# async def stream_from_task( +# async def _start_and_sync_aio_task( +# from_trio, +# to_trio, +# from_aio, -# target: Callable[Any], -# **kwargs, -# ) -> AsyncIterator[Any]: +@acm +async def open_channel_from( -# from_aoi = await run_task(target, _treat_as_stream=True, **kwargs) + target: Callable[[Any, ...], Any], + **kwargs, -# with cancel_scope: -# # stream values upward -# async with from_aio: -# async for item in from_aio: -# yield item +) -> AsyncIterator[Any]: + + try: + task, from_aio, to_trio, cs, aio_task_complete = _run_asyncio_task( + target, + qsize=2**8, + provide_channels=True, + **kwargs, + ) + + with cs: + # sync to "started()" call. + first = await from_aio.receive() + # stream values upward + async with from_aio: + yield first, from_aio + # await aio_task_complete.wait() + + except BaseException as err: + + aio_err = from_aio._err + + if aio_err is not None: + # always raise from any captured asyncio error + raise err from aio_err + else: + raise + + finally: + if cs.cancelled_caught: + # always raise from any captured asyncio error + if from_aio._err: + raise from_aio._err + + if not task.done(): + task.cancel() def run_as_asyncio_guest( @@ -221,16 +261,22 @@ def run_as_asyncio_guest( it. :) + """ + # Disable sigint handling in children? + # import signal + # signal.signal(signal.SIGINT, signal.SIG_IGN) + + get_console_log('runtime') + async def aio_main(trio_main): loop = asyncio.get_running_loop() - trio_done_fut = asyncio.Future() def trio_done_callback(main_outcome): - log.info(f"trio_main finished: {main_outcome!r}") + print(f"trio_main finished: {main_outcome!r}") trio_done_fut.set_result(main_outcome) # start the infection: run trio on the asyncio loop in "guest mode" @@ -241,7 +287,6 @@ def run_as_asyncio_guest( run_sync_soon_threadsafe=loop.call_soon_threadsafe, done_callback=trio_done_callback, ) - (await trio_done_fut).unwrap() # might as well if it's installed.