Support asyncio actors with the trio spawner backend
							parent
							
								
									c34d4b54fd
								
							
						
					
					
						commit
						1f3de88422
					
				| 
						 | 
				
			
			@ -19,12 +19,15 @@ def parse_ipaddr(arg):
 | 
			
		|||
    return (str(host), int(port))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
from ._entry import _trio_main
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
 | 
			
		||||
    parser = argparse.ArgumentParser()
 | 
			
		||||
    parser.add_argument("--uid", type=parse_uid)
 | 
			
		||||
    parser.add_argument("--loglevel", type=str)
 | 
			
		||||
    parser.add_argument("--parent_addr", type=parse_ipaddr)
 | 
			
		||||
    parser.add_argument("--asyncio", action='store_true')
 | 
			
		||||
    args = parser.parse_args()
 | 
			
		||||
 | 
			
		||||
    subactor = Actor(
 | 
			
		||||
| 
						 | 
				
			
			@ -36,5 +39,6 @@ if __name__ == "__main__":
 | 
			
		|||
 | 
			
		||||
    _trio_main(
 | 
			
		||||
        subactor,
 | 
			
		||||
        parent_addr=args.parent_addr
 | 
			
		||||
    )
 | 
			
		||||
        parent_addr=args.parent_addr,
 | 
			
		||||
        infect_asyncio=args.asyncio,
 | 
			
		||||
    )
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -20,6 +20,7 @@ def _mp_main(
 | 
			
		|||
    forkserver_info: Tuple[Any, Any, Any, Any, Any],
 | 
			
		||||
    start_method: str,
 | 
			
		||||
    parent_addr: Tuple[str, int] = None,
 | 
			
		||||
    infect_asyncio: bool = False,
 | 
			
		||||
) -> None:
 | 
			
		||||
    """The routine called *after fork* which invokes a fresh ``trio.run``
 | 
			
		||||
    """
 | 
			
		||||
| 
						 | 
				
			
			@ -61,12 +62,15 @@ def _trio_main(
 | 
			
		|||
    actor: 'Actor',  # type: ignore # noqa
 | 
			
		||||
    *,
 | 
			
		||||
    parent_addr: Tuple[str, int] = None,
 | 
			
		||||
    infect_asyncio: bool = False,
 | 
			
		||||
) -> None:
 | 
			
		||||
    """Entry point for a `trio_run_in_process` subactor.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    log.info(f"Started new trio process for {actor.uid}")
 | 
			
		||||
 | 
			
		||||
    log.info(f"Started new trio process for {actor.uid}")
 | 
			
		||||
 | 
			
		||||
    if actor.loglevel is not None:
 | 
			
		||||
        log.info(
 | 
			
		||||
            f"Setting loglevel for {actor.uid} to {actor.loglevel}")
 | 
			
		||||
| 
						 | 
				
			
			@ -84,7 +88,11 @@ def _trio_main(
 | 
			
		|||
    )
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        trio.run(trio_main)
 | 
			
		||||
        if infect_asyncio:
 | 
			
		||||
            actor._infected_aio = True
 | 
			
		||||
            run_as_asyncio_guest(trio_main)
 | 
			
		||||
        else:
 | 
			
		||||
            trio.run(trio_main)
 | 
			
		||||
    except KeyboardInterrupt:
 | 
			
		||||
        log.warning(f"Actor {actor.uid} received KBI")
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -159,6 +159,7 @@ async def cancel_on_completion(
 | 
			
		|||
async def spawn_subactor(
 | 
			
		||||
    subactor: 'Actor',
 | 
			
		||||
    parent_addr: Tuple[str, int],
 | 
			
		||||
    infect_asyncio: bool,
 | 
			
		||||
):
 | 
			
		||||
    spawn_cmd = [
 | 
			
		||||
        sys.executable,
 | 
			
		||||
| 
						 | 
				
			
			@ -183,6 +184,10 @@ async def spawn_subactor(
 | 
			
		|||
            subactor.loglevel
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
    # Tell child to run in guest mode on top of ``asyncio`` loop
 | 
			
		||||
    if infect_asyncio:
 | 
			
		||||
        spawn_cmd.append("--asyncio")
 | 
			
		||||
 | 
			
		||||
    proc = await trio.open_process(spawn_cmd)
 | 
			
		||||
    try:
 | 
			
		||||
        yield proc
 | 
			
		||||
| 
						 | 
				
			
			@ -240,6 +245,7 @@ async def new_proc(
 | 
			
		|||
    parent_addr: Tuple[str, int],
 | 
			
		||||
    _runtime_vars: Dict[str, Any],  # serialized and sent to _child
 | 
			
		||||
    *,
 | 
			
		||||
    infect_asyncio: bool = False,
 | 
			
		||||
    task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
 | 
			
		||||
) -> None:
 | 
			
		||||
    """Create a new ``multiprocessing.Process`` using the
 | 
			
		||||
| 
						 | 
				
			
			@ -255,6 +261,7 @@ async def new_proc(
 | 
			
		|||
            async with spawn_subactor(
 | 
			
		||||
                subactor,
 | 
			
		||||
                parent_addr,
 | 
			
		||||
                infect_asyncio=infect_asyncio
 | 
			
		||||
            ) as proc:
 | 
			
		||||
                log.info(f"Started {proc}")
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -392,6 +399,7 @@ async def mp_new_proc(
 | 
			
		|||
                fs_info,
 | 
			
		||||
                start_method,
 | 
			
		||||
                parent_addr,
 | 
			
		||||
                infect_asyncio,
 | 
			
		||||
            ),
 | 
			
		||||
            # daemon=True,
 | 
			
		||||
            name=name,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue