From d04a754655fb8e63ccb658e9a8c811a33c9bce59 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Oct 2021 17:33:56 -0400 Subject: [PATCH] Drop old (and deluded) "streaming" cruft --- tractor/to_asyncio.py | 45 +++++++++---------------------------------- 1 file changed, 9 insertions(+), 36 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index d27edc9..7b2287f 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -24,33 +24,19 @@ log = get_logger(__name__) __all__ = ['run_task', 'run_as_asyncio_guest'] -# async def consume_asyncgen( -# to_trio: trio.MemorySendChannel, -# coro: AsyncIterator, -# ) -> None: -# """Stream async generator results back to ``trio``. - -# ``from_trio`` might eventually be used here for -# bidirectional streaming. -# """ -# async for item in coro: -# to_trio.send_nowait(item) - - def _run_asyncio_task( func: Callable, *, qsize: int = 1, - # _treat_as_stream: bool = False, provide_channels: bool = False, **kwargs, ) -> Any: - """ + ''' Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. - """ + ''' if not current_actor().is_infected_aio(): raise RuntimeError("`infect_asyncio` mode is not enabled!?") @@ -66,7 +52,6 @@ def _run_asyncio_task( # the assumption is that the target async routine accepts the # send channel then it intends to yield more then one return # value otherwise it would just return ;P - # _treat_as_stream = True assert qsize > 1 if provide_channels: @@ -91,10 +76,10 @@ def _run_asyncio_task( aio_task_complete: trio.Event, ) -> None: - """ + ''' Await ``coro`` and relay result back to ``trio``. - """ + ''' nonlocal aio_err orig = result = id(coro) try: @@ -114,9 +99,6 @@ def _run_asyncio_task( wait_on_coro_final_result(to_trio, coro, aio_task_complete) ) - # elif inspect.isasyncgen(coro): - # task = asyncio.create_task(consume_asyncgen(to_trio, coro)) - else: raise TypeError(f"No support for invoking {coro}") @@ -147,7 +129,6 @@ async def run_task( *, qsize: int = 2**10, - # _treat_as_stream: bool = False, **kwargs, ) -> Any: @@ -184,28 +165,19 @@ async def run_task( raise err from aio_err else: raise - - # except trio.Cancelled: - # raise finally: if not task.done(): task.cancel() -# TODO: explicit api for the streaming case where +# TODO: explicitly api for the streaming case where # we pull from the mem chan in an async generator? # This ends up looking more like our ``Portal.open_stream_from()`` # NB: code below is untested. -# async def _start_and_sync_aio_task( -# from_trio, -# to_trio, -# from_aio, - @acm async def open_channel_from( - target: Callable[[Any, ...], Any], **kwargs, @@ -250,7 +222,8 @@ async def open_channel_from( def run_as_asyncio_guest( trio_main: Callable, ) -> None: - """Entry for an "infected ``asyncio`` actor". + ''' + Entry for an "infected ``asyncio`` actor". Uh, oh. :o @@ -265,8 +238,8 @@ def run_as_asyncio_guest( :) - """ - # Disable sigint handling in children? + ''' + # Disable sigint handling in children? (nawp) # import signal # signal.signal(signal.SIGINT, signal.SIG_IGN)