forked from goodboy/tractor
1
0
Fork 0

Repair startup sequence around parent state transfer

In order to have reliable subactor startup we need the following
sequence to take place:
- connect to the parent actor, handshake and receive runtime state
- load exposed modules into memory
- start the channel server up fully using the provided bind address
- finally, start processing new messages from the parent

Add a bunch more comments to clarify all this.
start_up_sequence_trickery
Tyler Goodlet 2020-07-28 22:25:22 -04:00
parent 0a5691e0a8
commit 9a40291d4a
2 changed files with 51 additions and 25 deletions

View File

@ -543,6 +543,13 @@ class Actor:
async def _async_main( async def _async_main(
self, self,
accept_addr: Optional[Tuple[str, int]] = None, accept_addr: Optional[Tuple[str, int]] = None,
# XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to
# the child instead of relaying it over the connect-back
# channel). Once that backend is removed we can likely just
# change this so a simple ``is_subactor: bool`` which will
# be False when running as root actor and True when as
# a subactor.
parent_addr: Optional[Tuple[str, int]] = None, parent_addr: Optional[Tuple[str, int]] = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -557,26 +564,34 @@ class Actor:
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
self._root_nursery = nursery self._root_nursery = nursery
# TODO: just make `parent_addr` a bool system (see above)?
if parent_addr is not None: if parent_addr is not None:
try: try:
# Connect back to the parent actor and conduct initial # Connect back to the parent actor and conduct initial
# handshake (From this point on if we error, ship the # handshake. From this point on if we error, we
# exception back to the parent actor) # attempt to ship the exception back to the parent.
chan = self._parent_chan = Channel( chan = self._parent_chan = Channel(
destaddr=parent_addr, destaddr=parent_addr,
) )
await chan.connect() await chan.connect()
# initial handshake, report who we are, who they are
# Initial handshake: swap names.
await self._do_handshake(chan) await self._do_handshake(chan)
if self._spawn_method == "trio": if self._spawn_method == "trio":
# recieve additional init params # Receive runtime state from our parent
parent_data = await chan.recv() parent_data = await chan.recv()
log.debug(
"Recieved state from parent:\n"
f"{parent_data}"
)
accept_addr = (
parent_data.pop('bind_host'),
parent_data.pop('bind_port'),
)
for attr, value in parent_data.items(): for attr, value in parent_data.items():
setattr(self, attr, value) setattr(self, attr, value)
accept_addr = self.bind_host, self.bind_port
except OSError: # failed to connect except OSError: # failed to connect
log.warning( log.warning(
f"Failed to connect to parent @ {parent_addr}," f"Failed to connect to parent @ {parent_addr},"
@ -584,24 +599,36 @@ class Actor:
await self.cancel() await self.cancel()
self._parent_chan = None self._parent_chan = None
raise raise
else:
# handle new connection back to parent
assert self._parent_chan
nursery.start_soon(
self._process_messages, self._parent_chan)
# Startup up channel server
host, port = accept_addr
await nursery.start(partial(
self._serve_forever, accept_host=host, accept_port=port)
)
# load exposed/allowed RPC modules # load exposed/allowed RPC modules
# XXX: do this **after** establishing connection to parent # XXX: do this **after** establishing a channel to the parent
# so that import errors are properly propagated upwards # but **before** starting the message loop for that channel
# such that import errors are properly propagated upwards
self.load_modules() self.load_modules()
# register with the arbiter if we're told its addr # Startup up channel server with,
# - subactor: the bind address sent to us by our parent
# over our established channel
# - root actor: the ``accept_addr`` passed to this method
assert accept_addr
host, port = accept_addr
await nursery.start(
partial(
self._serve_forever,
accept_host=host,
accept_port=port
)
)
# Begin handling our new connection back to parent.
# This is done here since we don't want to start
# processing parent requests until our server is
# 100% up and running.
if self._parent_chan:
nursery.start_soon(
self._process_messages, self._parent_chan)
# Register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`") log.debug(f"Registering {self} for role `{self.name}`")
assert isinstance(self._arb_addr, tuple) assert isinstance(self._arb_addr, tuple)
async with get_arbiter(*self._arb_addr) as arb_portal: async with get_arbiter(*self._arb_addr) as arb_portal:
@ -613,7 +640,7 @@ class Actor:
task_status.started() task_status.started()
log.debug("Waiting on root nursery to complete") log.debug("Waiting on root nursery to complete")
# blocks here as expected until the channel server is # Blocks here as expected until the channel server is
# killed (i.e. this actor is cancelled or signalled by the parent) # killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as err: except Exception as err:
if not registered_with_arbiter: if not registered_with_arbiter:

View File

@ -3,11 +3,9 @@ Machinery for actor process spawning using multiple backends.
""" """
import sys import sys
import inspect import inspect
import subprocess
import multiprocessing as mp import multiprocessing as mp
import platform import platform
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from functools import partial
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -29,7 +27,7 @@ from ._state import current_actor
from .log import get_logger from .log import get_logger
from ._portal import Portal from ._portal import Portal
from ._actor import Actor, ActorFailure from ._actor import Actor, ActorFailure
from ._entry import _mp_main, _trio_main from ._entry import _mp_main
log = get_logger('tractor') log = get_logger('tractor')
@ -160,7 +158,7 @@ async def cancel_on_completion(
async def spawn_subactor( async def spawn_subactor(
subactor: 'Actor', subactor: 'Actor',
accept_addr: Tuple[str, int], accept_addr: Tuple[str, int],
parent_addr: Optional[Tuple[str, int]] = None parent_addr: Tuple[str, int],
): ):
spawn_cmd = [ spawn_cmd = [
@ -232,6 +230,7 @@ async def new_proc(
"bind_port": bind_addr[1] "bind_port": bind_addr[1]
}) })
# resume caller at next checkpoint now that child is up
task_status.started(portal) task_status.started(portal)
# wait for ActorNursery.wait() to be called # wait for ActorNursery.wait() to be called