From ef053eb070d2d5ece554e66068d59e2471a217be Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Mon, 27 Jul 2020 21:05:00 -0300 Subject: [PATCH] Added named arguments to child init, and now passing less of them. --- tractor/_actor.py | 23 +++++++++++-------- tractor/_child.py | 57 ++++++++++++++++++++++------------------------- tractor/_spawn.py | 44 +++++++++++++++++++++--------------- 3 files changed, 67 insertions(+), 57 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index e4a6296..3869f6e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -171,11 +171,6 @@ class Actor: _root_nursery: trio.Nursery _server_nursery: trio.Nursery - # marked by the process spawning backend at startup - # will be None for the parent most process started manually - # by the user (currently called the "arbiter") - _spawn_method: Optional[str] = None - # Information about `__main__` from parent _parent_main_data: Dict[str, str] @@ -187,6 +182,7 @@ class Actor: uid: str = None, loglevel: str = None, arbiter_addr: Optional[Tuple[str, int]] = None, + spawn_method: Optional[str] = None ) -> None: """This constructor is called in the parent actor **before** the spawning phase (aka before a new process is executed). @@ -212,6 +208,11 @@ class Actor: self.loglevel = loglevel self._arb_addr = arbiter_addr + # marked by the process spawning backend at startup + # will be None for the parent most process started manually + # by the user (currently called the "arbiter") + self._spawn_method = spawn_method + self._peers: defaultdict = defaultdict(list) self._peer_connected: dict = {} self._no_more_peers = trio.Event() @@ -552,8 +553,8 @@ class Actor: A "root-most" (or "top-level") nursery for this actor is opened here and when cancelled effectively cancels the actor. """ - arbiter_addr = arbiter_addr or self._arb_addr registered_with_arbiter = False + arbiter_addr = arbiter_addr or self._arb_addr try: async with trio.open_nursery() as nursery: self._root_nursery = nursery @@ -578,9 +579,13 @@ class Actor: if self._spawn_method == "trio": # recieve additional init params - self._parent_main_data = await chan.recv() - self.rpc_module_paths = await chan.recv() - self.statespace = await chan.recv() + parent_data = await chan.recv() + for attr, value in parent_data.items(): + setattr(self, attr, value) + + # update local arbiter_addr var + if "_arb_addr" in parent_data: + arbiter_addr = self._arb_addr except OSError: # failed to connect log.warning( diff --git a/tractor/_child.py b/tractor/_child.py index 64c3054..cee8015 100644 --- a/tractor/_child.py +++ b/tractor/_child.py @@ -2,52 +2,49 @@ import sys import trio import argparse +from ast import literal_eval + from ._actor import Actor from ._entry import _trio_main -"""This is the "bootloader" for actors started using the native trio backend -added in #128 +"""This is the "bootloader" for actors started using the native trio backend. """ +def parse_uid(arg): + uid = literal_eval(arg) + assert len(uid) == 2 + assert isinstance(uid[0], str) + assert isinstance(uid[1], str) + return uid + +def parse_ipaddr(arg): + addr = literal_eval(arg) + assert len(addr) == 2 + assert isinstance(addr[0], str) + assert isinstance(addr[1], int) + return addr + if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("name") - parser.add_argument("uid") - parser.add_argument("loglevel") - parser.add_argument("bind_addr") - parser.add_argument("parent_addr") - parser.add_argument("arbiter_addr") + parser.add_argument("--uid", type=parse_uid) + parser.add_argument("--loglevel", type=str) + parser.add_argument("--parent_addr", type=parse_ipaddr) args = parser.parse_args() - bind_addr = args.bind_addr.split(":") - bind_addr = (bind_addr[0], int(bind_addr[1])) - - parent_addr = args.parent_addr.split(":") - parent_addr = (parent_addr[0], int(parent_addr[1])) - - arbiter_addr = args.arbiter_addr.split(":") - arbiter_addr = (arbiter_addr[0], int(arbiter_addr[1])) - - if args.loglevel == "None": - loglevel = None - else: - loglevel = args.loglevel - subactor = Actor( - args.name, - uid=args.uid, - loglevel=loglevel, - arbiter_addr=arbiter_addr - ) - subactor._spawn_method = "trio" + args.uid[0], + uid=args.uid[1], + loglevel=args.loglevel, + spawn_method="trio" + ) _trio_main( subactor, - bind_addr, - parent_addr=parent_addr + ("127.0.0.1", 0), + parent_addr=args.parent_addr ) \ No newline at end of file diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 49baed6..a85f442 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -160,23 +160,28 @@ async def cancel_on_completion( async def spawn_subactor( subactor: 'Actor', accept_addr: Tuple[str, int], - parent_addr: Tuple[str, int] = None + parent_addr: Optional[Tuple[str, int]] = None ): - async with await trio.open_process( - [ - sys.executable, - "-m", - # Hardcode this (instead of using ``_child.__name__`` to avoid a - # double import warning: https://stackoverflow.com/a/45070583 - "tractor._child", - subactor.name, - subactor.uid[1], - subactor.loglevel or "None", - f"{accept_addr[0]}:{accept_addr[1]}", - f"{parent_addr[0]}:{parent_addr[1]}", - f"{subactor._arb_addr[0]}:{subactor._arb_addr[1]}" + + spawn_cmd = [ + sys.executable, + "-m", + # Hardcode this (instead of using ``_child.__name__`` to avoid a + # double import warning: https://stackoverflow.com/a/45070583 + "tractor._child", + "--uid", + str(subactor.uid), + "--parent_addr", + str(parent_addr) + ] + + if subactor.loglevel: + spawn_cmd += [ + "--loglevel", + subactor.loglevel ] - ) as proc: + + async with await trio.open_process(spawn_cmd) as proc: yield proc @@ -218,9 +223,12 @@ async def new_proc( subactor, proc, portal) # send additional init params - await chan.send(subactor._parent_main_data) - await chan.send(subactor.rpc_module_paths) - await chan.send(subactor.statespace) + await chan.send({ + "_parent_main_data": subactor._parent_main_data, + "rpc_module_paths": subactor.rpc_module_paths, + "statespace": subactor.statespace, + "_arb_addr": subactor._arb_addr + }) task_status.started(portal)