From 0a5691e0a8f091165c143d53ba1d24ce3f222bb6 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Tue, 28 Jul 2020 11:55:11 -0300 Subject: [PATCH] Removed arbiter_addr local, and bind_addr is now passed through channel, in early child actor init. --- tractor/_actor.py | 31 +++++++++++++------------------ tractor/_child.py | 19 ++++++------------- tractor/_entry.py | 2 -- tractor/_spawn.py | 4 +++- 4 files changed, 22 insertions(+), 34 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 3869f6e..2c15706 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -542,8 +542,7 @@ class Actor: async def _async_main( self, - accept_addr: Tuple[str, int], - arbiter_addr: Optional[Tuple[str, int]] = None, + accept_addr: Optional[Tuple[str, int]] = None, parent_addr: Optional[Tuple[str, int]] = None, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -554,17 +553,10 @@ class Actor: and when cancelled effectively cancels the actor. """ registered_with_arbiter = False - arbiter_addr = arbiter_addr or self._arb_addr try: async with trio.open_nursery() as nursery: self._root_nursery = nursery - # Startup up channel server - host, port = accept_addr - await nursery.start(partial( - self._serve_forever, accept_host=host, accept_port=port) - ) - if parent_addr is not None: try: # Connect back to the parent actor and conduct initial @@ -583,9 +575,7 @@ class Actor: for attr, value in parent_data.items(): setattr(self, attr, value) - # update local arbiter_addr var - if "_arb_addr" in parent_data: - arbiter_addr = self._arb_addr + accept_addr = self.bind_host, self.bind_port except OSError: # failed to connect log.warning( @@ -600,6 +590,12 @@ class Actor: nursery.start_soon( self._process_messages, self._parent_chan) + # Startup up channel server + host, port = accept_addr + await nursery.start(partial( + self._serve_forever, accept_host=host, accept_port=port) + ) + # load exposed/allowed RPC modules # XXX: do this **after** establishing connection to parent # so that import errors are properly propagated upwards @@ -607,8 +603,8 @@ class Actor: # register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") - assert isinstance(arbiter_addr, tuple) - async with get_arbiter(*arbiter_addr) as arb_portal: + assert isinstance(self._arb_addr, tuple) + async with get_arbiter(*self._arb_addr) as arb_portal: await arb_portal.run( 'self', 'register_actor', uid=self.uid, sockaddr=self.accept_addr) @@ -626,7 +622,7 @@ class Actor: # once we have that all working with std streams locking? log.exception( f"Actor errored and failed to register with arbiter " - f"@ {arbiter_addr}?") + f"@ {self._arb_addr}?") log.error( "\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n" "\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n" @@ -655,7 +651,7 @@ class Actor: finally: if registered_with_arbiter: - await self._do_unreg(arbiter_addr) + await self._do_unreg(self._arb_addr) # terminate actor once all it's peers (actors that connected # to it as clients) have disappeared if not self._no_more_peers.is_set(): @@ -906,8 +902,7 @@ async def _start_actor( partial( actor._async_main, accept_addr=(host, port), - parent_addr=None, - arbiter_addr=arbiter_addr, + parent_addr=None ) ) result = await main() diff --git a/tractor/_child.py b/tractor/_child.py index cee8015..431027d 100644 --- a/tractor/_child.py +++ b/tractor/_child.py @@ -12,18 +12,12 @@ from ._entry import _trio_main """ def parse_uid(arg): - uid = literal_eval(arg) - assert len(uid) == 2 - assert isinstance(uid[0], str) - assert isinstance(uid[1], str) - return uid + name, uuid = literal_eval(arg) # ensure 2 elements + return str(name), str(uuid) # ensures str encoding def parse_ipaddr(arg): - addr = literal_eval(arg) - assert len(addr) == 2 - assert isinstance(addr[0], str) - assert isinstance(addr[1], int) - return addr + host, port = literal_eval(arg) + return (str(host), int(port)) if __name__ == "__main__": @@ -40,11 +34,10 @@ if __name__ == "__main__": args.uid[0], uid=args.uid[1], loglevel=args.loglevel, - spawn_method="trio" - ) + spawn_method="trio" + ) _trio_main( subactor, - ("127.0.0.1", 0), parent_addr=args.parent_addr ) \ No newline at end of file diff --git a/tractor/_entry.py b/tractor/_entry.py index 81b28d7..1ccc0b5 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -53,7 +53,6 @@ def _mp_main( def _trio_main( actor: 'Actor', - accept_addr: Tuple[str, int], parent_addr: Tuple[str, int] = None ) -> None: """Entry point for a `trio_run_in_process` subactor. @@ -71,7 +70,6 @@ def _trio_main( log.debug(f"parent_addr is {parent_addr}") trio_main = partial( actor._async_main, - accept_addr, parent_addr=parent_addr ) try: diff --git a/tractor/_spawn.py b/tractor/_spawn.py index a85f442..5136aa3 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -227,7 +227,9 @@ async def new_proc( "_parent_main_data": subactor._parent_main_data, "rpc_module_paths": subactor.rpc_module_paths, "statespace": subactor.statespace, - "_arb_addr": subactor._arb_addr + "_arb_addr": subactor._arb_addr, + "bind_host": bind_addr[0], + "bind_port": bind_addr[1] }) task_status.started(portal)