WIP redo asyncio async gen streaming
							parent
							
								
									e8431bffd0
								
							
						
					
					
						commit
						de87cb510a
					
				|  | @ -238,7 +238,7 @@ def run( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def run_daemon( | def run_daemon( | ||||||
|     rpc_module_paths: List[str], |     enable_modules: List[str], | ||||||
|     **kwargs |     **kwargs | ||||||
| ) -> None: | ) -> None: | ||||||
|     """Spawn daemon actor which will respond to RPC. |     """Spawn daemon actor which will respond to RPC. | ||||||
|  | @ -247,9 +247,9 @@ def run_daemon( | ||||||
|     ``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned |     ``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned | ||||||
|     is meant to run forever responding to RPC requests. |     is meant to run forever responding to RPC requests. | ||||||
|     """ |     """ | ||||||
|     kwargs['rpc_module_paths'] = list(rpc_module_paths) |     kwargs['enable_modules'] = list(enable_modules) | ||||||
| 
 | 
 | ||||||
|     for path in rpc_module_paths: |     for path in enable_modules: | ||||||
|         importlib.import_module(path) |         importlib.import_module(path) | ||||||
| 
 | 
 | ||||||
|     return run(partial(trio.sleep, float('inf')), **kwargs) |     return run(partial(trio.sleep, float('inf')), **kwargs) | ||||||
|  |  | ||||||
|  | @ -43,15 +43,16 @@ async def consume_asyncgen( | ||||||
|         to_trio.send_nowait(item) |         to_trio.send_nowait(item) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def run_task( | def _run_asyncio_task( | ||||||
|     func: Callable, |     func: Callable, | ||||||
|     *, |     *, | ||||||
|     qsize: int = 2**10, |     qsize: int = 1, | ||||||
|     _treat_as_stream: bool = False, |     _treat_as_stream: bool = False, | ||||||
|     **kwargs, |     **kwargs, | ||||||
| ) -> Any: | ) -> Any: | ||||||
|     """Run an ``asyncio`` async function or generator in a task, return |     """Run an ``asyncio`` async function or generator in a task, return | ||||||
|     or stream the result back to ``trio``. |     or stream the result back to ``trio``. | ||||||
|  | 
 | ||||||
|     """ |     """ | ||||||
|     assert current_actor().is_infected_aio() |     assert current_actor().is_infected_aio() | ||||||
| 
 | 
 | ||||||
|  | @ -59,29 +60,38 @@ async def run_task( | ||||||
|     from_trio = asyncio.Queue(qsize)  # type: ignore |     from_trio = asyncio.Queue(qsize)  # type: ignore | ||||||
|     to_trio, from_aio = trio.open_memory_channel(qsize)  # type: ignore |     to_trio, from_aio = trio.open_memory_channel(qsize)  # type: ignore | ||||||
| 
 | 
 | ||||||
|  |     from_aio._err = None | ||||||
|  | 
 | ||||||
|     args = tuple(inspect.getfullargspec(func).args) |     args = tuple(inspect.getfullargspec(func).args) | ||||||
| 
 | 
 | ||||||
|     if getattr(func, '_tractor_steam_function', None): |     if getattr(func, '_tractor_steam_function', None): | ||||||
|         # the assumption is that the target async routine accepts the |         # the assumption is that the target async routine accepts the | ||||||
|         # send channel then it intends to yield more then one return |         # send channel then it intends to yield more then one return | ||||||
|         # value otherwise it would just return ;P |         # value otherwise it would just return ;P | ||||||
|         _treat_as_stream = True |         # _treat_as_stream = True | ||||||
|  |         assert qsize > 1 | ||||||
| 
 | 
 | ||||||
|     # allow target func to accept/stream results manually by name |     # allow target func to accept/stream results manually by name | ||||||
|     if 'to_trio' in args: |     if 'to_trio' in args: | ||||||
|         kwargs['to_trio'] = to_trio |         kwargs['to_trio'] = to_trio | ||||||
|  | 
 | ||||||
|     if 'from_trio' in args: |     if 'from_trio' in args: | ||||||
|         kwargs['from_trio'] = from_trio |         kwargs['from_trio'] = from_trio | ||||||
| 
 | 
 | ||||||
|  |     # if 'from_aio' in args: | ||||||
|  |     #     kwargs['from_aio'] = from_aio | ||||||
|  | 
 | ||||||
|     coro = func(**kwargs) |     coro = func(**kwargs) | ||||||
| 
 | 
 | ||||||
|     cancel_scope = trio.CancelScope() |     # cancel_scope = trio.CancelScope() | ||||||
| 
 | 
 | ||||||
|     # start the asyncio task we submitted from trio |     # start the asyncio task we submitted from trio | ||||||
|     if inspect.isawaitable(coro): |     if inspect.isawaitable(coro): | ||||||
|         task = asyncio.create_task(run_coro(to_trio, coro)) |         task = asyncio.create_task(run_coro(to_trio, coro)) | ||||||
|  | 
 | ||||||
|     elif inspect.isasyncgen(coro): |     elif inspect.isasyncgen(coro): | ||||||
|         task = asyncio.create_task(consume_asyncgen(to_trio, coro)) |         task = asyncio.create_task(consume_asyncgen(to_trio, coro)) | ||||||
|  | 
 | ||||||
|     else: |     else: | ||||||
|         raise TypeError(f"No support for invoking {coro}") |         raise TypeError(f"No support for invoking {coro}") | ||||||
| 
 | 
 | ||||||
|  | @ -96,54 +106,149 @@ async def run_task( | ||||||
|         if aio_err: |         if aio_err: | ||||||
|             log.exception(f"asyncio task errorred:\n{aio_err}") |             log.exception(f"asyncio task errorred:\n{aio_err}") | ||||||
| 
 | 
 | ||||||
|         cancel_scope.cancel() |         # cancel_scope.cancel() | ||||||
|  |         from_aio._err = aio_err | ||||||
|  |         to_trio.close() | ||||||
| 
 | 
 | ||||||
|     task.add_done_callback(cancel_trio) |     task.add_done_callback(cancel_trio) | ||||||
| 
 | 
 | ||||||
|  |     return task, from_aio, to_trio | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | async def run_task( | ||||||
|  |     func: Callable, | ||||||
|  |     *, | ||||||
|  |     qsize: int = 2**10, | ||||||
|  |     _treat_as_stream: bool = False, | ||||||
|  |     **kwargs, | ||||||
|  | ) -> Any: | ||||||
|  |     """Run an ``asyncio`` async function or generator in a task, return | ||||||
|  |     or stream the result back to ``trio``. | ||||||
|  | 
 | ||||||
|  |     """ | ||||||
|  |     # assert current_actor().is_infected_aio() | ||||||
|  | 
 | ||||||
|  |     # # ITC (inter task comms) | ||||||
|  |     # from_trio = asyncio.Queue(qsize)  # type: ignore | ||||||
|  |     # to_trio, from_aio = trio.open_memory_channel(qsize)  # type: ignore | ||||||
|  | 
 | ||||||
|  |     # args = tuple(inspect.getfullargspec(func).args) | ||||||
|  | 
 | ||||||
|  |     # if getattr(func, '_tractor_steam_function', None): | ||||||
|  |     #     # the assumption is that the target async routine accepts the | ||||||
|  |     #     # send channel then it intends to yield more then one return | ||||||
|  |     #     # value otherwise it would just return ;P | ||||||
|  |     #     _treat_as_stream = True | ||||||
|  | 
 | ||||||
|  |     # # allow target func to accept/stream results manually by name | ||||||
|  |     # if 'to_trio' in args: | ||||||
|  |     #     kwargs['to_trio'] = to_trio | ||||||
|  |     # if 'from_trio' in args: | ||||||
|  |     #     kwargs['from_trio'] = from_trio | ||||||
|  | 
 | ||||||
|  |     # coro = func(**kwargs) | ||||||
|  | 
 | ||||||
|  |     # cancel_scope = trio.CancelScope() | ||||||
|  | 
 | ||||||
|  |     # # start the asyncio task we submitted from trio | ||||||
|  |     # if inspect.isawaitable(coro): | ||||||
|  |     #     task = asyncio.create_task(run_coro(to_trio, coro)) | ||||||
|  | 
 | ||||||
|  |     # elif inspect.isasyncgen(coro): | ||||||
|  |     #     task = asyncio.create_task(consume_asyncgen(to_trio, coro)) | ||||||
|  | 
 | ||||||
|  |     # else: | ||||||
|  |     #     raise TypeError(f"No support for invoking {coro}") | ||||||
|  | 
 | ||||||
|  |     # aio_err = None | ||||||
|  | 
 | ||||||
|  |     # def cancel_trio(task): | ||||||
|  |     #     """Cancel the calling ``trio`` task on error. | ||||||
|  |     #     """ | ||||||
|  |     #     nonlocal aio_err | ||||||
|  |     #     aio_err = task.exception() | ||||||
|  | 
 | ||||||
|  |     #     if aio_err: | ||||||
|  |     #         log.exception(f"asyncio task errorred:\n{aio_err}") | ||||||
|  | 
 | ||||||
|  |     #     cancel_scope.cancel() | ||||||
|  | 
 | ||||||
|  |     # task.add_done_callback(cancel_trio) | ||||||
|  | 
 | ||||||
|     # async iterator |     # async iterator | ||||||
|     if inspect.isasyncgen(coro) or _treat_as_stream: |     # if inspect.isasyncgen(coro) or _treat_as_stream: | ||||||
| 
 | 
 | ||||||
|         async def stream_results(): |     # if inspect.isasyncgenfunction(meth) or : | ||||||
|             try: |     if _treat_as_stream: | ||||||
|                 with cancel_scope: |  | ||||||
|                     # stream values upward |  | ||||||
|                     async with from_aio: |  | ||||||
|                         async for item in from_aio: |  | ||||||
|                             yield item |  | ||||||
| 
 | 
 | ||||||
|                 if cancel_scope.cancelled_caught: |         task, from_aio, to_trio = _run_asyncio_task( | ||||||
|                     # always raise from any captured asyncio error |             func, | ||||||
|                     if aio_err: |             qsize=2**8, | ||||||
|                         raise aio_err |             **kwargs, | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|             except BaseException as err: |         return from_aio | ||||||
|                 if aio_err is not None: |  | ||||||
|                     # always raise from any captured asyncio error |  | ||||||
|                     raise err from aio_err |  | ||||||
|                 else: |  | ||||||
|                     raise |  | ||||||
| 
 | 
 | ||||||
|         return stream_results() |         # async def stream_results(): | ||||||
|  |         #     try: | ||||||
|  |         #         with cancel_scope: | ||||||
|  |         #             # stream values upward | ||||||
|  |         #             async with from_aio: | ||||||
|  |         #                 async for item in from_aio: | ||||||
|  |         #                     yield item | ||||||
|  | 
 | ||||||
|  |         #         if cancel_scope.cancelled_caught: | ||||||
|  |         #             # always raise from any captured asyncio error | ||||||
|  |         #             if aio_err: | ||||||
|  |         #                 raise aio_err | ||||||
|  | 
 | ||||||
|  |         #     except BaseException as err: | ||||||
|  |         #         if aio_err is not None: | ||||||
|  |         #             # always raise from any captured asyncio error | ||||||
|  |         #             raise err from aio_err | ||||||
|  |         #         else: | ||||||
|  |         #             raise | ||||||
|  |         #     finally: | ||||||
|  |         #         # breakpoint() | ||||||
|  |         #         task.cancel() | ||||||
|  | 
 | ||||||
|  |         # return stream_results() | ||||||
| 
 | 
 | ||||||
|     # simple async func |     # simple async func | ||||||
|     try: |     try: | ||||||
|         with cancel_scope: |         task, from_aio, to_trio = _run_asyncio_task( | ||||||
|             # return single value |             func, | ||||||
|             return await from_aio.receive() |             qsize=1, | ||||||
|  |             **kwargs, | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|         if cancel_scope.cancelled_caught: |         # with cancel_scope: | ||||||
|             # always raise from any captured asyncio error |         # async with from_aio: | ||||||
|             if aio_err: |             # return single value | ||||||
|                 raise aio_err |         return await from_aio.receive() | ||||||
|  | 
 | ||||||
|  |         # if cancel_scope.cancelled_caught: | ||||||
|  |         #     # always raise from any captured asyncio error | ||||||
|  |         #     if aio_err: | ||||||
|  |         #         raise aio_err | ||||||
| 
 | 
 | ||||||
|     # Do we need this? |     # Do we need this? | ||||||
|     except BaseException as err: |     except BaseException as err: | ||||||
|  |         # await tractor.breakpoint() | ||||||
|  |         aio_err = from_aio._err | ||||||
|         if aio_err is not None: |         if aio_err is not None: | ||||||
|             # always raise from any captured asyncio error |             # always raise from any captured asyncio error | ||||||
|             raise err from aio_err |             raise err from aio_err | ||||||
|         else: |         else: | ||||||
|             raise |             raise | ||||||
| 
 | 
 | ||||||
|  |     finally: | ||||||
|  |         task.cancel() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # async def stream_from_task | ||||||
|  | #     pass | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| def run_as_asyncio_guest( | def run_as_asyncio_guest( | ||||||
|     trio_main: Callable, |     trio_main: Callable, | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue