diff --git a/tractor/_actor.py b/tractor/_actor.py index 2c15706..6b6cdef 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -543,6 +543,13 @@ class Actor: async def _async_main( self, accept_addr: Optional[Tuple[str, int]] = None, + # XXX: currently ``parent_addr`` is only needed for the + # ``multiprocessing`` backend (which pickles state sent to + # the child instead of relaying it over the connect-back + # channel). Once that backend is removed we can likely just + # change this so a simple ``is_subactor: bool`` which will + # be False when running as root actor and True when as + # a subactor. parent_addr: Optional[Tuple[str, int]] = None, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -557,26 +564,34 @@ class Actor: async with trio.open_nursery() as nursery: self._root_nursery = nursery + # TODO: just make `parent_addr` a bool system (see above)? if parent_addr is not None: try: # Connect back to the parent actor and conduct initial - # handshake (From this point on if we error, ship the - # exception back to the parent actor) + # handshake. From this point on if we error, we + # attempt to ship the exception back to the parent. chan = self._parent_chan = Channel( destaddr=parent_addr, ) await chan.connect() - # initial handshake, report who we are, who they are + + # Initial handshake: swap names. await self._do_handshake(chan) if self._spawn_method == "trio": - # recieve additional init params + # Receive runtime state from our parent parent_data = await chan.recv() + log.debug( + "Recieved state from parent:\n" + f"{parent_data}" + ) + accept_addr = ( + parent_data.pop('bind_host'), + parent_data.pop('bind_port'), + ) for attr, value in parent_data.items(): setattr(self, attr, value) - accept_addr = self.bind_host, self.bind_port - except OSError: # failed to connect log.warning( f"Failed to connect to parent @ {parent_addr}," @@ -584,24 +599,36 @@ class Actor: await self.cancel() self._parent_chan = None raise - else: - # handle new connection back to parent - assert self._parent_chan - nursery.start_soon( - self._process_messages, self._parent_chan) - - # Startup up channel server - host, port = accept_addr - await nursery.start(partial( - self._serve_forever, accept_host=host, accept_port=port) - ) # load exposed/allowed RPC modules - # XXX: do this **after** establishing connection to parent - # so that import errors are properly propagated upwards + # XXX: do this **after** establishing a channel to the parent + # but **before** starting the message loop for that channel + # such that import errors are properly propagated upwards self.load_modules() - # register with the arbiter if we're told its addr + # Startup up channel server with, + # - subactor: the bind address sent to us by our parent + # over our established channel + # - root actor: the ``accept_addr`` passed to this method + assert accept_addr + host, port = accept_addr + await nursery.start( + partial( + self._serve_forever, + accept_host=host, + accept_port=port + ) + ) + + # Begin handling our new connection back to parent. + # This is done here since we don't want to start + # processing parent requests until our server is + # 100% up and running. + if self._parent_chan: + nursery.start_soon( + self._process_messages, self._parent_chan) + + # Register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") assert isinstance(self._arb_addr, tuple) async with get_arbiter(*self._arb_addr) as arb_portal: @@ -613,7 +640,7 @@ class Actor: task_status.started() log.debug("Waiting on root nursery to complete") - # blocks here as expected until the channel server is + # Blocks here as expected until the channel server is # killed (i.e. this actor is cancelled or signalled by the parent) except Exception as err: if not registered_with_arbiter: diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 5136aa3..baed198 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -3,11 +3,9 @@ Machinery for actor process spawning using multiple backends. """ import sys import inspect -import subprocess import multiprocessing as mp import platform from typing import Any, Dict, Optional -from functools import partial import trio from trio_typing import TaskStatus @@ -29,7 +27,7 @@ from ._state import current_actor from .log import get_logger from ._portal import Portal from ._actor import Actor, ActorFailure -from ._entry import _mp_main, _trio_main +from ._entry import _mp_main log = get_logger('tractor') @@ -160,7 +158,7 @@ async def cancel_on_completion( async def spawn_subactor( subactor: 'Actor', accept_addr: Tuple[str, int], - parent_addr: Optional[Tuple[str, int]] = None + parent_addr: Tuple[str, int], ): spawn_cmd = [ @@ -232,6 +230,7 @@ async def new_proc( "bind_port": bind_addr[1] }) + # resume caller at next checkpoint now that child is up task_status.started(portal) # wait for ActorNursery.wait() to be called