forked from goodboy/tractor
				
			Support asyncio actors with the trio spawner backend
							parent
							
								
									963dd54573
								
							
						
					
					
						commit
						bf0d758662
					
				|  | @ -19,12 +19,15 @@ def parse_ipaddr(arg): | ||||||
|     return (str(host), int(port)) |     return (str(host), int(port)) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | from ._entry import _trio_main | ||||||
|  | 
 | ||||||
| if __name__ == "__main__": | if __name__ == "__main__": | ||||||
| 
 | 
 | ||||||
|     parser = argparse.ArgumentParser() |     parser = argparse.ArgumentParser() | ||||||
|     parser.add_argument("--uid", type=parse_uid) |     parser.add_argument("--uid", type=parse_uid) | ||||||
|     parser.add_argument("--loglevel", type=str) |     parser.add_argument("--loglevel", type=str) | ||||||
|     parser.add_argument("--parent_addr", type=parse_ipaddr) |     parser.add_argument("--parent_addr", type=parse_ipaddr) | ||||||
|  |     parser.add_argument("--asyncio", action='store_true') | ||||||
|     args = parser.parse_args() |     args = parser.parse_args() | ||||||
| 
 | 
 | ||||||
|     subactor = Actor( |     subactor = Actor( | ||||||
|  | @ -36,5 +39,6 @@ if __name__ == "__main__": | ||||||
| 
 | 
 | ||||||
|     _trio_main( |     _trio_main( | ||||||
|         subactor, |         subactor, | ||||||
|         parent_addr=args.parent_addr |         parent_addr=args.parent_addr, | ||||||
|     ) |         infect_asyncio=args.asyncio, | ||||||
|  |     ) | ||||||
|  |  | ||||||
|  | @ -20,6 +20,7 @@ def _mp_main( | ||||||
|     forkserver_info: Tuple[Any, Any, Any, Any, Any], |     forkserver_info: Tuple[Any, Any, Any, Any, Any], | ||||||
|     start_method: str, |     start_method: str, | ||||||
|     parent_addr: Tuple[str, int] = None, |     parent_addr: Tuple[str, int] = None, | ||||||
|  |     infect_asyncio: bool = False, | ||||||
| ) -> None: | ) -> None: | ||||||
|     """The routine called *after fork* which invokes a fresh ``trio.run`` |     """The routine called *after fork* which invokes a fresh ``trio.run`` | ||||||
|     """ |     """ | ||||||
|  | @ -61,12 +62,15 @@ def _trio_main( | ||||||
|     actor: 'Actor',  # type: ignore # noqa |     actor: 'Actor',  # type: ignore # noqa | ||||||
|     *, |     *, | ||||||
|     parent_addr: Tuple[str, int] = None, |     parent_addr: Tuple[str, int] = None, | ||||||
|  |     infect_asyncio: bool = False, | ||||||
| ) -> None: | ) -> None: | ||||||
|     """Entry point for a `trio_run_in_process` subactor. |     """Entry point for a `trio_run_in_process` subactor. | ||||||
| 
 | 
 | ||||||
|     """ |     """ | ||||||
|     log.info(f"Started new trio process for {actor.uid}") |     log.info(f"Started new trio process for {actor.uid}") | ||||||
| 
 | 
 | ||||||
|  |     log.info(f"Started new trio process for {actor.uid}") | ||||||
|  | 
 | ||||||
|     if actor.loglevel is not None: |     if actor.loglevel is not None: | ||||||
|         log.info( |         log.info( | ||||||
|             f"Setting loglevel for {actor.uid} to {actor.loglevel}") |             f"Setting loglevel for {actor.uid} to {actor.loglevel}") | ||||||
|  | @ -84,7 +88,11 @@ def _trio_main( | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     try: |     try: | ||||||
|         trio.run(trio_main) |         if infect_asyncio: | ||||||
|  |             actor._infected_aio = True | ||||||
|  |             run_as_asyncio_guest(trio_main) | ||||||
|  |         else: | ||||||
|  |             trio.run(trio_main) | ||||||
|     except KeyboardInterrupt: |     except KeyboardInterrupt: | ||||||
|         log.warning(f"Actor {actor.uid} received KBI") |         log.warning(f"Actor {actor.uid} received KBI") | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -181,6 +181,7 @@ async def do_hard_kill( | ||||||
| async def spawn_subactor( | async def spawn_subactor( | ||||||
|     subactor: 'Actor', |     subactor: 'Actor', | ||||||
|     parent_addr: Tuple[str, int], |     parent_addr: Tuple[str, int], | ||||||
|  |     infect_asyncio: bool, | ||||||
| ): | ): | ||||||
|     spawn_cmd = [ |     spawn_cmd = [ | ||||||
|         sys.executable, |         sys.executable, | ||||||
|  | @ -205,6 +206,10 @@ async def spawn_subactor( | ||||||
|             subactor.loglevel |             subactor.loglevel | ||||||
|         ] |         ] | ||||||
| 
 | 
 | ||||||
|  |     # Tell child to run in guest mode on top of ``asyncio`` loop | ||||||
|  |     if infect_asyncio: | ||||||
|  |         spawn_cmd.append("--asyncio") | ||||||
|  | 
 | ||||||
|     proc = await trio.open_process(spawn_cmd) |     proc = await trio.open_process(spawn_cmd) | ||||||
|     try: |     try: | ||||||
|         yield proc |         yield proc | ||||||
|  | @ -262,6 +267,7 @@ async def new_proc( | ||||||
|     parent_addr: Tuple[str, int], |     parent_addr: Tuple[str, int], | ||||||
|     _runtime_vars: Dict[str, Any],  # serialized and sent to _child |     _runtime_vars: Dict[str, Any],  # serialized and sent to _child | ||||||
|     *, |     *, | ||||||
|  |     infect_asyncio: bool = False, | ||||||
|     task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED |     task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED | ||||||
| ) -> None: | ) -> None: | ||||||
|     """Create a new ``multiprocessing.Process`` using the |     """Create a new ``multiprocessing.Process`` using the | ||||||
|  | @ -277,6 +283,7 @@ async def new_proc( | ||||||
|             async with spawn_subactor( |             async with spawn_subactor( | ||||||
|                 subactor, |                 subactor, | ||||||
|                 parent_addr, |                 parent_addr, | ||||||
|  |                 infect_asyncio=infect_asyncio | ||||||
|             ) as proc: |             ) as proc: | ||||||
|                 log.runtime(f"Started {proc}") |                 log.runtime(f"Started {proc}") | ||||||
| 
 | 
 | ||||||
|  | @ -419,6 +426,7 @@ async def mp_new_proc( | ||||||
|                 fs_info, |                 fs_info, | ||||||
|                 start_method, |                 start_method, | ||||||
|                 parent_addr, |                 parent_addr, | ||||||
|  |                 infect_asyncio, | ||||||
|             ), |             ), | ||||||
|             # daemon=True, |             # daemon=True, | ||||||
|             name=name, |             name=name, | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue