Support TRIP for process launching

This took a ton of tinkering and a rework of the actor nursery
teardown logic. The main changes include:

- each subprocess is now spawned from inside a trio task launched in
one of two containing nurseries created in the body of
`tractor.open_nursery()`: one for `run_in_actor()` processes and one
for `start_actor()` "daemons". This addresses the fact that
`trio_run_in_process.open_in_process()` opens an internal nursery
which must be closed from the same task that opened it. Using this
same approach for `multiprocessing` seems to work well. The nurseries
are waited on in order (`run_in_actor()` processes, then daemon
actors) during teardown, which avoids the recursive re-entry of
`ActorNursery.wait()` handled previously.

- pull out all the nested functions / closures that were in
`ActorNursery.wait()` and move them into the `_spawn` module so that
process shutdown logic takes place in each containing task's code
path. This vastly simplifies `.wait()` to just an event trigger which
initiates process waiting / result collection. `.wait()` should likely
be removed entirely since it can no longer be used to synchronously
wait on the actor nursery.
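A sketch of what remains of `.wait()` under this scheme, mirroring the
`_join_procs` event introduced in the diff below (class body trimmed to
just the relevant bits):

```python
import trio


class ActorNursery:
    def __init__(self) -> None:
        # every per-process spawn task parks on this event after startup
        self._join_procs = trio.Event()

    async def wait(self) -> None:
        # formerly a pile of nested closures; now just a trigger that
        # lets each spawn task proceed to process waiting / result
        # collection in its own code path
        self._join_procs.set()
```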

- drop `ActorNursery.__aenter__()` / `.__aexit__()` and move this
"supervisor" teardown logic into the closing block of
`open_nursery()`. This not only makes the code more comprehensible, it
also makes our nursery implementation look more like the one in
`trio`.
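The resulting supervisor shape is roughly the following. This is a
simplified sketch: error collection into a `trio.MultiError` and the
cancellation paths are trimmed, and `SupervisorScope` /
`open_supervisor` are stand-ins for the real `ActorNursery` /
`open_nursery()`:

```python
from typing import AsyncGenerator, Dict

import trio
from async_generator import asynccontextmanager


class SupervisorScope:
    """Stand-in carrying just what this sketch needs."""
    def __init__(
        self,
        ria_nursery: trio.Nursery,
        da_nursery: trio.Nursery,
        errors: Dict[str, Exception],
    ) -> None:
        self._ria_nursery = ria_nursery  # run_in_actor() spawn tasks
        self._da_nursery = da_nursery    # start_actor() daemon spawn tasks
        self.errors = errors
        self._join_procs = trio.Event()


@asynccontextmanager
async def open_supervisor() -> AsyncGenerator[SupervisorScope, None]:
    errors: Dict[str, Exception] = {}
    # outer nursery holds the long-lived daemon spawn tasks
    async with trio.open_nursery() as da_nursery:
        # inner nursery holds run_in_actor() spawn tasks; its block is
        # exited (and waited on) first, then the daemon nursery unwinds
        async with trio.open_nursery() as ria_nursery:
            scope = SupervisorScope(ria_nursery, da_nursery, errors)
            try:
                yield scope
            finally:
                # begin the join phase for all parked spawn tasks
                scope._join_procs.set()
        # all run_in_actor() spawn tasks have completed at this point
    # daemon spawn tasks are waited on (or cancelled) here, last
```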

Resolves #93
try_trip^2
Tyler Goodlet 2020-01-20 11:10:51 -05:00
parent 91c3716968
commit c074aea030
2 changed files with 301 additions and 306 deletions

@@ -3,12 +3,15 @@ Process spawning.
Mostly just wrapping around ``multiprocessing``.
"""
import inspect
import multiprocessing as mp
# from . import log
import platform
from typing import Any, List, Dict
import trio
import trio_run_in_process
from trio_typing import TaskStatus
from async_generator import aclosing
try:
from multiprocessing import semaphore_tracker # type: ignore
@@ -23,12 +26,24 @@ from typing import Tuple
from . import _forkserver_override
from ._state import current_actor
from ._actor import Actor
from .log import get_logger
from ._portal import Portal
from ._actor import Actor, ActorFailure
log = get_logger('tractor')
_ctx: mp.context.BaseContext = mp.get_context("spawn") # type: ignore
if platform.system() == 'Windows':
async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.WaitForSingleObject(proc.sentinel)
else:
async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.wait_readable(proc.sentinel)
def try_set_start_method(name: str) -> mp.context.BaseContext:
"""Attempt to set the start method for ``multiprocess.Process`` spawning.
@@ -60,73 +75,203 @@ def is_main_process() -> bool:
return mp.current_process().name == 'MainProcess'
async def exhaust_portal(
portal: Portal,
actor: Actor
) -> Any:
"""Pull final result from portal (assuming it has one).
If the main task is an async generator do our best to consume
what's left of it.
"""
try:
log.debug(f"Waiting on final result from {actor.uid}")
final = res = await portal.result()
# if it's an async-gen then alert that we're cancelling it
if inspect.isasyncgen(res):
final = []
log.warning(
f"Blindly consuming asyncgen for {actor.uid}")
with trio.fail_after(1):
async with aclosing(res) as agen:
async for item in agen:
log.debug(f"Consuming item {item}")
final.append(item)
except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError``
return err
else:
return final
async def cancel_on_completion(
portal: Portal,
actor: Actor,
errors: List[Exception],
task_status=trio.TASK_STATUS_IGNORED,
) -> None:
"""Cancel actor gracefully once it's "main" portal's
result arrives.
Should only be called for actors spawned with `run_in_actor()`.
"""
with trio.CancelScope() as cs:
task_status.started(cs)
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# a MultiError and we still send out a cancel request
result = await exhaust_portal(portal, actor)
if isinstance(result, Exception):
errors[actor.uid] = result
log.warning(
f"Cancelling {portal.channel.uid} after error {result}"
)
else:
log.info(f"Cancelling {portal.channel.uid} gracefully")
# cancel the process now that we have a final result
await portal.cancel_actor()
# XXX: lol, this will never get run without a shield above..
# if cs.cancelled_caught:
# log.warning(
# "Result waiter was cancelled, process may have died")
async def new_proc(
name: str,
actor: Actor,
actor_nursery: 'ActorNursery',
subactor: Actor,
errors: Dict[str, Exception],
# passed through to actor main
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
nursery: trio.Nursery = None,
begin_wait_phase: trio.Event,
use_trip: bool = True,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> mp.Process:
"""Create a new ``multiprocessing.Process`` using the
spawn method as configured using ``try_set_start_method()``.
"""
if use_trip: # trio_run_in_process
mng = trio_run_in_process.open_in_process(
actor._trip_main,
bind_addr,
parent_addr,
nursery=nursery,
)
# XXX playing with trip logging
# l = log.get_console_log(level='debug', name=None, _root_name='trio-run-in-process')
# import logging
# logger = logging.getLogger("trio-run-in-process")
# logger.setLevel('DEBUG')
proc = await mng.__aenter__()
proc.mng = mng
return proc
else:
# use multiprocessing
start_method = _ctx.get_start_method()
if start_method == 'forkserver':
# XXX do our hackery on the stdlib to avoid multiple
# forkservers (one at each subproc layer).
fs = forkserver._forkserver
curr_actor = current_actor()
if is_main_process() and not curr_actor._forkserver_info:
# if we're the "main" process start the forkserver only once
# and pass its ipc info to downstream children
# forkserver.set_forkserver_preload(rpc_module_paths)
forkserver.ensure_running()
fs_info = (
fs._forkserver_address,
fs._forkserver_alive_fd,
getattr(fs, '_forkserver_pid', None),
getattr(resource_tracker._resource_tracker, '_pid', None),
resource_tracker._resource_tracker._fd,
)
else:
assert curr_actor._forkserver_info
fs_info = (
fs._forkserver_address,
fs._forkserver_alive_fd,
fs._forkserver_pid,
resource_tracker._resource_tracker._pid,
resource_tracker._resource_tracker._fd,
) = curr_actor._forkserver_info
else:
fs_info = (None, None, None, None, None)
cancel_scope = None
return _ctx.Process(
target=actor._mp_main,
args=(
async with trio.open_nursery() as nursery:
if use_trip:
# trio_run_in_process
async with trio_run_in_process.open_in_process(
subactor._trip_main,
bind_addr,
fs_info,
start_method,
parent_addr
),
# daemon=True,
name=name,
)
parent_addr,
) as proc:
log.info(f"Started {proc}")
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid)
portal = Portal(chan)
actor_nursery._children[subactor.uid] = (
subactor, proc, portal)
task_status.started(portal)
# wait for ActorNursery.wait() to be called
await actor_nursery._join_procs.wait()
if portal in actor_nursery._cancel_after_result_on_exit:
cancel_scope = await nursery.start(
cancel_on_completion, portal, subactor, errors)
# TRIP blocks here until process is complete
else:
# `multiprocessing`
start_method = _ctx.get_start_method()
if start_method == 'forkserver':
# XXX do our hackery on the stdlib to avoid multiple
# forkservers (one at each subproc layer).
fs = forkserver._forkserver
curr_actor = current_actor()
if is_main_process() and not curr_actor._forkserver_info:
# if we're the "main" process start the forkserver
# only once and pass its ipc info to downstream
# children
# forkserver.set_forkserver_preload(rpc_module_paths)
forkserver.ensure_running()
fs_info = (
fs._forkserver_address,
fs._forkserver_alive_fd,
getattr(fs, '_forkserver_pid', None),
getattr(
resource_tracker._resource_tracker, '_pid', None),
resource_tracker._resource_tracker._fd,
)
else:
assert curr_actor._forkserver_info
fs_info = (
fs._forkserver_address,
fs._forkserver_alive_fd,
fs._forkserver_pid,
resource_tracker._resource_tracker._pid,
resource_tracker._resource_tracker._fd,
) = curr_actor._forkserver_info
else:
fs_info = (None, None, None, None, None)
proc = _ctx.Process(
target=subactor._mp_main,
args=(
bind_addr,
fs_info,
start_method,
parent_addr
),
# daemon=True,
name=name,
)
# `multiprocessing` only (since no async interface):
# register the process before start in case we get a cancel
# request before the actor has fully spawned - then we can wait
# for it to fully come up before sending a cancel request
actor_nursery._children[subactor.uid] = (subactor, proc, None)
proc.start()
if not proc.is_alive():
raise ActorFailure("Couldn't start sub-actor?")
log.info(f"Started {proc}")
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid)
portal = Portal(chan)
actor_nursery._children[subactor.uid] = (subactor, proc, portal)
# unblock parent task
task_status.started(portal)
# wait for ActorNursery.wait() to be called
# this is required to ensure synchronization
# with startup and registration of this actor in
# ActorNursery.run_in_actor()
await actor_nursery._join_procs.wait()
if portal in actor_nursery._cancel_after_result_on_exit:
cancel_scope = await nursery.start(
cancel_on_completion, portal, subactor, errors)
# TODO: timeout block here?
if proc.is_alive():
await proc_waiter(proc)
proc.join()
log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing this subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
# cancel result waiter that may have been spawned in
# tandem if not done already
if cancel_scope:
log.warning(
f"Cancelling existing result waiter task for {subactor.uid}")
cancel_scope.cancel()

@@ -1,42 +1,37 @@
"""
``trio`` inspired apis and helpers
"""
import inspect
import importlib
import platform
import multiprocessing as mp
from typing import Tuple, List, Dict, Optional, Any
import typing
import trio
from async_generator import asynccontextmanager, aclosing
import trio_run_in_process
from async_generator import asynccontextmanager
from ._state import current_actor
from .log import get_logger, get_loglevel
from ._actor import Actor, ActorFailure
from ._actor import Actor # , ActorFailure
from ._portal import Portal
from . import _spawn
if platform.system() == 'Windows':
async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.WaitForSingleObject(proc.sentinel)
else:
async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.wait_readable(proc.sentinel)
log = get_logger('tractor')
class ActorNursery:
"""Spawn scoped subprocess actors.
"""
def __init__(self, actor: Actor, nursery: trio.Nursery) -> None:
def __init__(
self,
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: Dict[str, Exception],
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
self._nursery = nursery
self._ria_nursery = ria_nursery
self._da_nursery = da_nursery
self._children: Dict[
Tuple[str, str],
Tuple[Actor, mp.Process, Optional[Portal]]
@@ -45,10 +40,8 @@ class ActorNursery:
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
self.cancelled: bool = False
# self._aexitstack = contextlib.AsyncExitStack()
async def __aenter__(self):
return self
self._join_procs = trio.Event()
self.errors = errors
async def start_actor(
self,
@@ -57,51 +50,34 @@ class ActorNursery:
statespace: Optional[Dict[str, Any]] = None,
rpc_module_paths: List[str] = None,
loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None,
) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel()
mods = {}
for path in rpc_module_paths or ():
mod = importlib.import_module(path)
mods[path] = mod.__file__
actor = Actor(
subactor = Actor(
name,
# modules allowed to invoked funcs from
rpc_module_paths=mods,
rpc_module_paths=rpc_module_paths,
statespace=statespace, # global proc state vars
loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr,
)
parent_addr = self._actor.accept_addr
assert parent_addr
proc = await _spawn.new_proc(
# start a task to spawn a process
# blocks until process has been started and a portal setup
nursery = nursery or self._da_nursery
return await nursery.start(
_spawn.new_proc,
name,
actor,
self,
subactor,
self.errors,
bind_addr,
parent_addr,
self._nursery,
nursery,
)
# `multiprocessing` only (since no async interface):
# register the process before start in case we get a cancel
# request before the actor has fully spawned - then we can wait
# for it to fully come up before sending a cancel request
self._children[actor.uid] = (actor, proc, None)
if not isinstance(proc, trio_run_in_process.process.Process):
proc.start()
if not proc.is_alive():
raise ActorFailure("Couldn't start sub-actor?")
log.info(f"Started {proc}")
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await self._actor.wait_for_peer(actor.uid)
portal = Portal(chan)
self._children[actor.uid] = (actor, proc, portal)
return portal
async def run_in_actor(
self,
@@ -127,6 +103,8 @@ class ActorNursery:
bind_addr=bind_addr,
statespace=statespace,
loglevel=loglevel,
# use the run_in_actor nursery
nursery=self._ria_nursery,
)
# this marks the actor to be cancelled after its portal result
is retrieved, see ``wait()`` below.
@@ -140,153 +118,9 @@ class ActorNursery:
async def wait(self) -> None:
"""Wait for all subactors to complete.
This is probably the most complicated (and confusing, sorry)
function that does all the clever crap to deal with cancellation,
error propagation, and graceful subprocess tear down.
"""
async def exhaust_portal(portal, actor):
"""Pull final result from portal (assuming it has one).
If the main task is an async generator do our best to consume
what's left of it.
"""
try:
log.debug(f"Waiting on final result from {actor.uid}")
final = res = await portal.result()
# if it's an async-gen then alert that we're cancelling it
if inspect.isasyncgen(res):
final = []
log.warning(
f"Blindly consuming asyncgen for {actor.uid}")
with trio.fail_after(1):
async with aclosing(res) as agen:
async for item in agen:
log.debug(f"Consuming item {item}")
final.append(item)
except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError``
return err
else:
return final
async def cancel_on_completion(
portal: Portal,
actor: Actor,
task_status=trio.TASK_STATUS_IGNORED,
) -> None:
"""Cancel actor gracefully once it's "main" portal's
result arrives.
Should only be called for actors spawned with `run_in_actor()`.
"""
with trio.CancelScope() as cs:
task_status.started(cs)
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# a MultiError and we still send out a cancel request
result = await exhaust_portal(portal, actor)
if isinstance(result, Exception):
errors.append(result)
log.warning(
f"Cancelling {portal.channel.uid} after error {result}"
)
else:
log.info(f"Cancelling {portal.channel.uid} gracefully")
# cancel the process now that we have a final result
await portal.cancel_actor()
# XXX: lol, this will never get run without a shield above..
# if cs.cancelled_caught:
# log.warning(
# "Result waiter was cancelled, process may have died")
async def wait_for_proc(
proc: mp.Process,
actor: Actor,
portal: Portal,
cancel_scope: Optional[trio._core._run.CancelScope] = None,
) -> None:
# please god don't hang
if not isinstance(proc, trio_run_in_process.process.Process):
# TODO: timeout block here?
if proc.is_alive():
await proc_waiter(proc)
proc.join()
else:
# trio_run_in_process blocking wait
if errors:
multierror = trio.MultiError(errors)
# import pdb; pdb.set_trace()
# try:
# with trio.CancelScope(shield=True):
# await proc.mng.__aexit__(
# type(multierror),
# multierror,
# multierror.__traceback__,
# )
# except BaseException as err:
# import pdb; pdb.set_trace()
# pass
# else:
await proc.mng.__aexit__(None, None, None)
# proc.nursery.cancel_scope.cancel()
log.debug(f"Joined {proc}")
# indicate we are no longer managing this subactor
self._children.pop(actor.uid)
# proc terminated, cancel result waiter that may have
# been spawned in tandem if not done already
if cancel_scope: # and not portal._cancelled:
log.warning(
f"Cancelling existing result waiter task for {actor.uid}")
cancel_scope.cancel()
log.debug(f"Waiting on all subactors to complete")
# since we pop each child subactor on termination,
# iterate a copy
children = self._children.copy()
errors: List[Exception] = []
# wait on run_in_actor() tasks, unblocks when all complete
async with trio.open_nursery() as nursery:
# async with self._nursery as nursery:
for subactor, proc, portal in children.values():
cs = None
# portal from ``run_in_actor()``
if portal in self._cancel_after_result_on_exit:
cs = await nursery.start(
cancel_on_completion, portal, subactor)
# TODO: how do we handle remote host spawned actors?
nursery.start_soon(
wait_for_proc, proc, subactor, portal, cs)
if errors:
multierror = trio.MultiError(errors)
if not self.cancelled:
# bubble up error(s) here and expect to be called again
# once the nursery has been cancelled externally (ex.
# from within __aexit__() if an error is caught around
# ``self.wait()`` then, ``self.cancel()`` is called
# immediately, in the default supervisor strat, after
# which in turn ``self.wait()`` is called again.)
raise trio.MultiError(errors)
# wait on all `start_actor()` subactors to complete
# if errors were captured above and we have not been cancelled
# then these ``start_actor()`` spawned actors will block until
# cancelled externally
children = self._children.copy()
async with trio.open_nursery() as nursery:
for subactor, proc, portal in children.values():
# TODO: how do we handle remote host spawned actors?
nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
log.debug(f"All subactors for {self} have terminated")
if errors:
# always raise any error if we're also cancelled
raise trio.MultiError(errors)
self._join_procs.set()
async def cancel(self, hard_kill: bool = False) -> None:
"""Cancel this nursery by instructing each subactor to cancel
@@ -304,7 +138,7 @@ class ActorNursery:
log.debug(f"Cancelling nursery")
with trio.move_on_after(3) as cs:
async with trio.open_nursery() as n:
async with trio.open_nursery() as nursery:
for subactor, proc, portal in self._children.values():
if hard_kill:
do_hard_kill(proc)
@@ -331,59 +165,20 @@ class ActorNursery:
# spawn cancel tasks for each sub-actor
assert portal
n.start_soon(portal.cancel_actor)
nursery.start_soon(portal.cancel_actor)
# if we cancelled the cancel (we hung cancelling remote actors)
# then hard kill all sub-processes
if cs.cancelled_caught:
log.error(f"Failed to gracefully cancel {self}, hard killing!")
async with trio.open_nursery() as n:
async with trio.open_nursery():
for subactor, proc, portal in self._children.values():
n.start_soon(do_hard_kill, proc)
nursery.start_soon(do_hard_kill, proc)
# mark ourselves as having (tried to have) cancelled all subactors
self.cancelled = True
await self.wait()
async def __aexit__(self, etype, value, tb):
"""Wait on all subactor's main routines to complete.
"""
# XXX: this is effectively the (for now) lone
# cancellation/supervisor strategy (one-cancels-all)
# which exactly mimicks trio's behaviour
if etype is not None:
try:
# XXX: hypothetically an error could be raised and then
# a cancel signal shows up slightly after in which case
# the `else:` block here might not complete?
# For now, shield both.
with trio.CancelScope(shield=True):
if etype in (trio.Cancelled, KeyboardInterrupt):
log.warning(
f"Nursery for {current_actor().uid} was "
f"cancelled with {etype}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {etype}, ")
await self.cancel()
except trio.MultiError as merr:
if value not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [value])
raise
else:
log.debug(f"Waiting on subactors {self._children} to complete")
try:
await self.wait()
except (Exception, trio.MultiError) as err:
log.warning(f"Nursery cancelling due to {err}")
if self._children:
with trio.CancelScope(shield=True):
await self.cancel()
raise
log.debug(f"Nursery teardown complete")
@asynccontextmanager
async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
@@ -395,12 +190,67 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
# XXX we need this nursery because TRIP is doing all its stuff with
# XXX we use these nurseries because TRIP is doing all its stuff with
# an `@asynccontextmanager` which has an internal nursery *and* the
# task that opens a nursery must also close it - so we need a path
# in TRIP to make this all kinda work as well. Note I'm basically
# giving up for now - it's probably equivalent amounts of work to
# make TRIP vs. `multiprocessing` work here.
async with trio.open_nursery() as nursery:
async with ActorNursery(actor, nursery) as anursery:
yield anursery
# in TRIP to make this all kinda work as well.
errors: Dict[str, Exception] = {}
async with trio.open_nursery() as da_nursery:
try:
async with trio.open_nursery() as ria_nursery:
anursery = ActorNursery(
actor, ria_nursery, da_nursery, errors
)
try:
# spawning of actors happens in this scope after
# we yield to the caller.
yield anursery
log.debug(
f"Waiting on subactors {anursery._children}"
"to complete"
)
# anursery.wait()
# except (trio.Cancelled, KeyboardInterrupt) as err:
except (BaseException, Exception) as err:
anursery._join_procs.set()
try:
# XXX: hypothetically an error could be raised and then
# a cancel signal shows up slightly after in which case
# the `else:` block here might not complete?
# For now, shield both.
with trio.CancelScope(shield=True):
if err in (trio.Cancelled, KeyboardInterrupt):
log.warning(
f"Nursery for {current_actor().uid} was "
f"cancelled with {err}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {err}, ")
await anursery.cancel()
except trio.MultiError as merr:
if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err])
else:
raise
# last bit before first nursery block end
log.debug(f"Waiting on all subactors to complete")
anursery._join_procs.set()
# ria_nursery scope
except (Exception, trio.MultiError) as err:
log.warning(f"Nursery cancelling due to {err}")
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
raise
finally:
if errors:
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
if len(errors) > 1:
raise trio.MultiError(errors.values())
else:
raise list(errors.values())[0]
log.debug(f"Nursery teardown complete")