Now passing additional initialization parameters through channel early after handshake.

start_up_sequence_trickery
Guillermo Rodriguez 2020-07-27 14:55:37 -03:00
parent 2cc4d7ce04
commit 2a407be532
No known key found for this signature in database
GPG Key ID: 3F61096EC7DF75A8
4 changed files with 87 additions and 24 deletions

View File

@ -575,6 +575,11 @@ class Actor:
await chan.connect() await chan.connect()
# initial handshake, report who we are, who they are # initial handshake, report who we are, who they are
await self._do_handshake(chan) await self._do_handshake(chan)
self._parent_main_data = await chan.recv()
self.rpc_module_paths = await chan.recv()
self.statespace = await chan.recv()
except OSError: # failed to connect except OSError: # failed to connect
log.warning( log.warning(
f"Failed to connect to parent @ {parent_addr}," f"Failed to connect to parent @ {parent_addr},"

View File

@ -1,6 +1,53 @@
import sys import sys
import trio import trio
import cloudpickle import argparse
from ._actor import Actor
from ._entry import _trio_main
"""This is the "bootloader" for actors started using the native trio backend
added in #128
"""
if __name__ == "__main__": if __name__ == "__main__":
trio.run(cloudpickle.load(sys.stdin.buffer))
parser = argparse.ArgumentParser()
parser.add_argument("name")
parser.add_argument("uid")
parser.add_argument("loglevel")
parser.add_argument("bind_addr")
parser.add_argument("parent_addr")
parser.add_argument("arbiter_addr")
args = parser.parse_args()
bind_addr = args.bind_addr.split(":")
bind_addr = (bind_addr[0], int(bind_addr[1]))
parent_addr = args.parent_addr.split(":")
parent_addr = (parent_addr[0], int(parent_addr[1]))
arbiter_addr = args.arbiter_addr.split(":")
arbiter_addr = (arbiter_addr[0], int(arbiter_addr[1]))
if args.loglevel == "None":
loglevel = None
else:
loglevel = args.loglevel
subactor = Actor(
args.name,
uid=args.uid,
loglevel=loglevel,
arbiter_addr=arbiter_addr
)
subactor._spawn_method = "trio"
_trio_main(
subactor,
bind_addr,
parent_addr=parent_addr
)

View File

@ -51,24 +51,31 @@ def _mp_main(
log.info(f"Actor {actor.uid} terminated") log.info(f"Actor {actor.uid} terminated")
async def _trio_main( def _trio_main(
actor: 'Actor', actor: 'Actor',
accept_addr: Tuple[str, int], accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int] = None parent_addr: Tuple[str, int] = None
) -> None: ) -> None:
"""Entry point for a `trio_run_in_process` subactor. """Entry point for a `trio_run_in_process` subactor.
Here we don't need to call `trio.run()` since trip does that as
part of its subprocess startup sequence.
""" """
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}") f"Setting loglevel for {actor.uid} to {actor.loglevel}")
get_console_log(actor.loglevel) get_console_log(actor.loglevel)
log.info(f"Started new trio process for {actor.uid}") log.info(
f"Started {actor.uid}")
_state._current_actor = actor _state._current_actor = actor
await actor._async_main(accept_addr, parent_addr=parent_addr) log.debug(f"parent_addr is {parent_addr}")
log.info(f"Actor {actor.uid} terminated") trio_main = partial(
actor._async_main,
accept_addr,
parent_addr=parent_addr
)
try:
trio.run(trio_main)
except KeyboardInterrupt:
pass # handle it the same way trio does?
log.info(f"Actor {actor.uid} terminated")

View File

@ -10,7 +10,6 @@ from typing import Any, Dict, Optional
from functools import partial from functools import partial
import trio import trio
import cloudpickle
from trio_typing import TaskStatus from trio_typing import TaskStatus
from async_generator import aclosing, asynccontextmanager from async_generator import aclosing, asynccontextmanager
@ -158,9 +157,11 @@ async def cancel_on_completion(
@asynccontextmanager @asynccontextmanager
async def run_in_process(subactor, async_fn, *args, **kwargs): async def spawn_subactor(
encoded_job = cloudpickle.dumps(partial(async_fn, *args, **kwargs)) subactor: 'Actor',
accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int] = None
):
async with await trio.open_process( async with await trio.open_process(
[ [
sys.executable, sys.executable,
@ -168,15 +169,14 @@ async def run_in_process(subactor, async_fn, *args, **kwargs):
# Hardcode this (instead of using ``_child.__name__`` to avoid a # Hardcode this (instead of using ``_child.__name__`` to avoid a
# double import warning: https://stackoverflow.com/a/45070583 # double import warning: https://stackoverflow.com/a/45070583
"tractor._child", "tractor._child",
# This is merely an identifier for debugging purposes when subactor.name,
# viewing the process tree from the OS subactor.uid[1],
str(subactor.uid), subactor.loglevel or "None",
], f"{accept_addr[0]}:{accept_addr[1]}",
stdin=subprocess.PIPE, f"{parent_addr[0]}:{parent_addr[1]}",
f"{subactor._arb_addr[0]}:{subactor._arb_addr[1]}"
]
) as proc: ) as proc:
# send func object to call in child
await proc.stdin.send_all(encoded_job)
yield proc yield proc
@ -201,9 +201,7 @@ async def new_proc(
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
if use_trio_run_in_process or _spawn_method == 'trio': if use_trio_run_in_process or _spawn_method == 'trio':
async with run_in_process( async with spawn_subactor(
subactor,
_trio_main,
subactor, subactor,
bind_addr, bind_addr,
parent_addr, parent_addr,
@ -218,6 +216,12 @@ async def new_proc(
portal = Portal(chan) portal = Portal(chan)
actor_nursery._children[subactor.uid] = ( actor_nursery._children[subactor.uid] = (
subactor, proc, portal) subactor, proc, portal)
# send additional init params
await chan.send(subactor._parent_main_data)
await chan.send(subactor.rpc_module_paths)
await chan.send(subactor.statespace)
task_status.started(portal) task_status.started(portal)
# wait for ActorNursery.wait() to be called # wait for ActorNursery.wait() to be called