forked from goodboy/tractor
				
			Support and test infected-`asyncio`-mode for root
Such that you can use,
```python
    tractor.to_asyncio.run_as_asyncio_guest(
        trio_main=_trio_main,
    )
```
to boostrap the root actor (and thus main parent process) to embed
the actor-rumtime into an `asyncio` loop. Prove it all works with an
subactor-free version of the aio echo-server test suite B)
			
			
				remotes/1757153874605917753/main
			
			
		
							parent
							
								
									c2bbb7e259
								
							
						
					
					
						commit
						a283d8c05a
					
				|  | @ -5,6 +5,7 @@ The hipster way to force SC onto the stdlib's "async": 'infection mode'. | |||
| import asyncio | ||||
| import builtins | ||||
| from contextlib import ExitStack | ||||
| from functools import partial | ||||
| import itertools | ||||
| import importlib | ||||
| import os | ||||
|  | @ -536,41 +537,40 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr): | |||
|     excinfo.value.boxed_type is Exception | ||||
| 
 | ||||
| 
 | ||||
| async def aio_echo_server( | ||||
|     to_trio: trio.MemorySendChannel, | ||||
|     from_trio: asyncio.Queue, | ||||
| ) -> None: | ||||
| 
 | ||||
|     to_trio.send_nowait('start') | ||||
| 
 | ||||
|     while True: | ||||
|         msg = await from_trio.get() | ||||
| 
 | ||||
|         # echo the msg back | ||||
|         to_trio.send_nowait(msg) | ||||
| 
 | ||||
|         # if we get the terminate sentinel | ||||
|         # break the echo loop | ||||
|         if msg is None: | ||||
|             print('breaking aio echo loop') | ||||
|             break | ||||
| 
 | ||||
|     print('exiting asyncio task') | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def trio_to_aio_echo_server( | ||||
|     ctx: tractor.Context, | ||||
|     ctx: tractor.Context|None, | ||||
| ): | ||||
| 
 | ||||
|     async def aio_echo_server( | ||||
|         to_trio: trio.MemorySendChannel, | ||||
|         from_trio: asyncio.Queue, | ||||
|     ) -> None: | ||||
| 
 | ||||
|         to_trio.send_nowait('start') | ||||
| 
 | ||||
|         while True: | ||||
|             msg = await from_trio.get() | ||||
| 
 | ||||
|             # echo the msg back | ||||
|             to_trio.send_nowait(msg) | ||||
| 
 | ||||
|             # if we get the terminate sentinel | ||||
|             # break the echo loop | ||||
|             if msg is None: | ||||
|                 print('breaking aio echo loop') | ||||
|                 break | ||||
| 
 | ||||
|         print('exiting asyncio task') | ||||
| 
 | ||||
|     async with to_asyncio.open_channel_from( | ||||
|         aio_echo_server, | ||||
|     ) as (first, chan): | ||||
| 
 | ||||
|         assert first == 'start' | ||||
| 
 | ||||
|         await ctx.started(first) | ||||
| 
 | ||||
|         async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|             async for msg in stream: | ||||
|                 print(f'asyncio echoing {msg}') | ||||
|                 await chan.send(msg) | ||||
|  | @ -649,6 +649,68 @@ def test_echoserver_detailed_mechanics( | |||
|         trio.run(main) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'raise_error_mid_stream', | ||||
|     [ | ||||
|         False, | ||||
|         Exception, | ||||
|         KeyboardInterrupt, | ||||
|     ], | ||||
|     ids='raise_error={}'.format, | ||||
| ) | ||||
| def test_infected_root_actor( | ||||
|     raise_error_mid_stream: bool|Exception, | ||||
|     # conftest wide | ||||
|     loglevel: str, | ||||
|     debug_mode: bool, | ||||
| ): | ||||
|     ''' | ||||
|     Verify you can run the `tractor` runtime with `Actor.is_infected_aio() == True` | ||||
|     in the root actor. | ||||
| 
 | ||||
|     ''' | ||||
|     async def _trio_main(): | ||||
| 
 | ||||
|         first: str | ||||
|         chan: to_asyncio.LinkedTaskChannel | ||||
|         async with ( | ||||
|             tractor.open_root_actor( | ||||
|                 debug_mode=debug_mode, | ||||
|                 loglevel=loglevel, | ||||
|             ), | ||||
|             to_asyncio.open_channel_from( | ||||
|                 aio_echo_server, | ||||
|             ) as (first, chan), | ||||
|         ): | ||||
|             assert first == 'start' | ||||
| 
 | ||||
|             for i in range(1000): | ||||
|                 await chan.send(i) | ||||
|                 out = await chan.receive() | ||||
|                 assert out == i | ||||
|                 print(f'asyncio echoing {i}') | ||||
| 
 | ||||
|                 if raise_error_mid_stream and i == 500: | ||||
|                     raise raise_error_mid_stream | ||||
| 
 | ||||
|                 if out is None: | ||||
|                     try: | ||||
|                         out = await chan.receive() | ||||
|                     except trio.EndOfChannel: | ||||
|                         break | ||||
|                     else: | ||||
|                         raise RuntimeError('aio channel never stopped?') | ||||
| 
 | ||||
|     if raise_error_mid_stream: | ||||
|         with pytest.raises(raise_error_mid_stream): | ||||
|             tractor.to_asyncio.run_as_asyncio_guest( | ||||
|                 trio_main=_trio_main, | ||||
|             ) | ||||
|     else: | ||||
|         tractor.to_asyncio.run_as_asyncio_guest( | ||||
|             trio_main=_trio_main, | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def manage_file( | ||||
|  |  | |||
|  | @ -334,6 +334,10 @@ async def open_root_actor( | |||
|             loglevel=loglevel, | ||||
|             enable_modules=enable_modules, | ||||
|         ) | ||||
|         # XXX, in case the root actor runtime was actually run from | ||||
|         # `tractor.to_asyncio.run_as_asyncio_guest()` and NOt | ||||
|         # `.trio.run()`. | ||||
|         actor._infected_aio = _state._runtime_vars['_is_infected_aio'] | ||||
| 
 | ||||
|     # Start up main task set via core actor-runtime nurseries. | ||||
|     try: | ||||
|  |  | |||
|  | @ -36,6 +36,7 @@ import tractor | |||
| from tractor._exceptions import AsyncioCancelled | ||||
| from tractor._state import ( | ||||
|     debug_mode, | ||||
|     _runtime_vars, | ||||
| ) | ||||
| from tractor.devx import _debug | ||||
| from tractor.log import get_logger | ||||
|  | @ -767,12 +768,16 @@ def run_as_asyncio_guest( | |||
|             'Infecting `asyncio`-process with a `trio` guest-run!\n' | ||||
|         ) | ||||
| 
 | ||||
|         # TODO, somehow bootstrap this! | ||||
|         _runtime_vars['_is_infected_aio'] = True | ||||
| 
 | ||||
|         trio.lowlevel.start_guest_run( | ||||
|             trio_main, | ||||
|             run_sync_soon_threadsafe=loop.call_soon_threadsafe, | ||||
|             done_callback=trio_done_callback, | ||||
|         ) | ||||
|         fute_err: BaseException|None = None | ||||
| 
 | ||||
|         try: | ||||
|             out: Outcome = await asyncio.shield(trio_done_fute) | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue