forked from goodboy/tractor
				
			Wow, fix all the broken async func invoking code..
Clearly this wasn't developed against a task that spawned just an async func in `asyncio`.. Fix all that and remove a bunch of unnecessary func layers. Add provisional support for the target receiving the `to_trio` and `from_trio` channels and for the @tractor.stream marker.pre_bad_close
							parent
							
								
									19dec873ad
								
							
						
					
					
						commit
						eb0cff4769
					
				|  | @ -4,7 +4,6 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode. | ||||||
| import asyncio | import asyncio | ||||||
| import inspect | import inspect | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |  | ||||||
|     Callable, |     Callable, | ||||||
|     AsyncGenerator, |     AsyncGenerator, | ||||||
|     Awaitable, |     Awaitable, | ||||||
|  | @ -19,46 +18,54 @@ from ._state import current_actor | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| __all__ = ['run_task'] | __all__ = ['run_task', 'run_as_asyncio_guest'] | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def _invoke( | async def _invoke( | ||||||
|     from_trio: trio.abc.ReceiveChannel, |     from_trio: trio.abc.ReceiveChannel, | ||||||
|     to_trio: asyncio.Queue, |     to_trio: asyncio.Queue, | ||||||
|     coro: Awaitable, |     coro: Awaitable, | ||||||
| ) -> Union[AsyncGenerator, Awaitable]: | ) -> None: | ||||||
|     """Await or stream awaiable object based on type into |     """Await or stream awaiable object based on ``coro`` type into | ||||||
|     ``trio`` memory channel. |     ``trio`` memory channel. | ||||||
|  | 
 | ||||||
|  |     ``from_trio`` might eventually be used here for bidirectional streaming. | ||||||
|     """ |     """ | ||||||
|     async def stream_from_gen(c): |  | ||||||
|         async for item in c: |  | ||||||
|             to_trio.send_nowait(item) |  | ||||||
| 
 |  | ||||||
|     async def just_return(c): |  | ||||||
|         to_trio.send_nowait(await c) |  | ||||||
| 
 |  | ||||||
|     if inspect.isasyncgen(coro): |     if inspect.isasyncgen(coro): | ||||||
|         return await stream_from_gen(coro) |         async for item in coro: | ||||||
|  |             to_trio.send_nowait(item) | ||||||
|     elif inspect.iscoroutine(coro): |     elif inspect.iscoroutine(coro): | ||||||
|         return await coro |         to_trio.send_nowait(await coro) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def run_task( | async def run_task( | ||||||
|     func: Callable, |     func: Callable, | ||||||
|  |     *, | ||||||
|     qsize: int = 2**10, |     qsize: int = 2**10, | ||||||
|  |     _treat_as_stream: bool = False, | ||||||
|     **kwargs, |     **kwargs, | ||||||
| ) -> Any: | ) -> Union[AsyncGenerator, Awaitable]: | ||||||
|     """Run an ``asyncio`` async function or generator in a task, return |     """Run an ``asyncio`` async function or generator in a task, return | ||||||
|     or stream the result back to ``trio``. |     or stream the result back to ``trio``. | ||||||
|     """ |     """ | ||||||
|     assert current_actor()._infected_aio |     assert current_actor().is_infected_aio() | ||||||
| 
 | 
 | ||||||
|     # ITC (inter task comms) |     # ITC (inter task comms) | ||||||
|     from_trio = asyncio.Queue(qsize) |     from_trio = asyncio.Queue(qsize) | ||||||
|     to_trio, from_aio = trio.open_memory_channel(qsize) |     to_trio, from_aio = trio.open_memory_channel(qsize) | ||||||
| 
 | 
 | ||||||
|     # allow target func to accept/stream results manually |     args = tuple(inspect.getfullargspec(func).args) | ||||||
|  | 
 | ||||||
|  |     if getattr(func, '_tractor_steam_function', None): | ||||||
|  |         # the assumption is that the target async routine accepts the | ||||||
|  |         # send channel then it intends to yield more then one return | ||||||
|  |         # value otherwise it would just return ;P | ||||||
|  |         _treat_as_stream = True | ||||||
|  | 
 | ||||||
|  |     # allow target func to accept/stream results manually by name | ||||||
|  |     if 'to_trio' in args: | ||||||
|         kwargs['to_trio'] = to_trio |         kwargs['to_trio'] = to_trio | ||||||
|  |     if 'from_trio' in args: | ||||||
|         kwargs['from_trio'] = to_trio |         kwargs['from_trio'] = to_trio | ||||||
| 
 | 
 | ||||||
|     coro = func(**kwargs) |     coro = func(**kwargs) | ||||||
|  | @ -70,7 +77,6 @@ async def run_task( | ||||||
|     task = asyncio.create_task(_invoke(from_trio, to_trio, coro)) |     task = asyncio.create_task(_invoke(from_trio, to_trio, coro)) | ||||||
|     err = None |     err = None | ||||||
| 
 | 
 | ||||||
|     # XXX: I'm not sure this actually does anything... |  | ||||||
|     def cancel_trio(task): |     def cancel_trio(task): | ||||||
|         """Cancel the calling ``trio`` task on error. |         """Cancel the calling ``trio`` task on error. | ||||||
|         """ |         """ | ||||||
|  | @ -80,17 +86,8 @@ async def run_task( | ||||||
| 
 | 
 | ||||||
|     task.add_done_callback(cancel_trio) |     task.add_done_callback(cancel_trio) | ||||||
| 
 | 
 | ||||||
|     # determine return type async func vs. gen |  | ||||||
|     if inspect.isasyncgen(coro): |  | ||||||
|         # simple async func |  | ||||||
|         async def result(): |  | ||||||
|             with cancel_scope: |  | ||||||
|                 return await from_aio.get() |  | ||||||
|             if cancel_scope.cancelled_caught and err: |  | ||||||
|                 raise err |  | ||||||
| 
 |  | ||||||
|     elif inspect.iscoroutine(coro): |  | ||||||
|     # asycn gen |     # asycn gen | ||||||
|  |     if inspect.isasyncgen(coro) or _treat_as_stream: | ||||||
|         async def result(): |         async def result(): | ||||||
|             with cancel_scope: |             with cancel_scope: | ||||||
|                 async with from_aio: |                 async with from_aio: | ||||||
|  | @ -101,6 +98,14 @@ async def run_task( | ||||||
| 
 | 
 | ||||||
|         return result() |         return result() | ||||||
| 
 | 
 | ||||||
|  |     # simple async func | ||||||
|  |     elif inspect.iscoroutine(coro): | ||||||
|  |         with cancel_scope: | ||||||
|  |             result = await from_aio.receive() | ||||||
|  |             return result | ||||||
|  |         if cancel_scope.cancelled_caught and err: | ||||||
|  |             raise err | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| def run_as_asyncio_guest( | def run_as_asyncio_guest( | ||||||
|     trio_main: Awaitable, |     trio_main: Awaitable, | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue