"""
Machinery for actor process spawning using multiple backends.
"""
import sys
import inspect
import subprocess
import multiprocessing as mp
import platform
from typing import Any, Dict, Optional, Tuple
from functools import partial

import trio
from trio_typing import TaskStatus
from async_generator import aclosing, asynccontextmanager

try:
    from multiprocessing import semaphore_tracker  # type: ignore
    resource_tracker = semaphore_tracker
    resource_tracker._resource_tracker = resource_tracker._semaphore_tracker
except ImportError:
    # 3.8 introduces a more general version that also tracks shared mems
    from multiprocessing import resource_tracker  # type: ignore

from multiprocessing import forkserver  # type: ignore

from . import _forkserver_override
from ._state import current_actor
from .log import get_logger
from ._portal import Portal
from ._actor import Actor, ActorFailure
from ._entry import _mp_main, _trio_main


log = get_logger('tractor')

# placeholder for an mp start context if the user chooses that backend
_ctx: Optional[mp.context.BaseContext] = None
_spawn_method: str = "spawn"

if platform.system() == 'Windows':
    _spawn_method = "spawn"
    _ctx = mp.get_context("spawn")

    async def proc_waiter(proc: mp.Process) -> None:
        # on Windows the process "sentinel" is a waitable handle
        await trio.lowlevel.WaitForSingleObject(proc.sentinel)

else:
    # *NIX systems use ``trio`` primitives as our default
    _spawn_method = "trio"

    async def proc_waiter(proc: mp.Process) -> None:
        # on *NIX the process "sentinel" is a readable fd
        await trio.lowlevel.wait_readable(proc.sentinel)


def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
    """Attempt to set the method for process starting, aka the "actor
    spawning backend".

    If the desired method is not supported this function will error.
    On Windows only the ``multiprocessing`` "spawn" method is offered
    besides the default ``trio`` which uses async wrapping around
    ``subprocess.Popen``.
    """
    global _ctx
    global _spawn_method

    methods = mp.get_all_start_methods()
    if 'fork' in methods:
        # forking is incompatible with ``trio``'s global task tree
        methods.remove('fork')

    # supported on all platforms
    methods += ['trio']

    if name not in methods:
        raise ValueError(
            f"Spawn method `{name}` is invalid; please choose one of {methods}"
        )
    elif name == 'forkserver':
        _forkserver_override.override_stdlib()
        _ctx = mp.get_context(name)
    elif name == 'trio':
        _ctx = None
    else:
        _ctx = mp.get_context(name)

    _spawn_method = name
    return _ctx
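

# A minimal usage sketch (hypothetical caller code, not part of this
# module): prefer the stdlib "forkserver" backend where available and
# fall back to the default ``trio`` backend otherwise. On Windows the
# 'forkserver' name is absent from the supported methods list, so the
# first call raises ``ValueError`` and the fallback runs.
#
#     try:
#         try_set_start_method('forkserver')
#     except ValueError:
#         try_set_start_method('trio')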


def is_main_process() -> bool:
    """Bool determining if this actor is running in the top-most process.
    """
    return mp.current_process().name == 'MainProcess'


async def exhaust_portal(
    portal: Portal,
    actor: Actor
) -> Any:
    """Pull the final result from the portal (assuming it has one).

    If the main task is an async generator do our best to consume
    what's left of it.
    """
    try:
        log.debug(f"Waiting on final result from {actor.uid}")
        final = res = await portal.result()

        # if it's an async-gen then alert that we're cancelling it
        if inspect.isasyncgen(res):
            final = []
            log.warning(
                f"Blindly consuming asyncgen for {actor.uid}")
            with trio.fail_after(1):
                async with aclosing(res) as agen:
                    async for item in agen:
                        log.debug(f"Consuming item {item}")
                        final.append(item)

    except (Exception, trio.MultiError) as err:
        # return the error instead of raising so the caller can
        # collect it and reraise in the parent task via a
        # ``trio.MultiError``
        return err

    else:
        log.debug(f"Returning final result: {final}")
        return final


async def cancel_on_completion(
    portal: Portal,
    actor: Actor,
    errors: Dict[Tuple[str, str], Exception],
    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
    """Cancel the actor gracefully once its "main" portal's
    result arrives.

    Should only be called for actors spawned with `run_in_actor()`.
    """
    with trio.CancelScope() as cs:
        task_status.started(cs)

        # if this call errors we store the exception for later
        # in ``errors`` which will be reraised inside
        # a MultiError and we still send out a cancel request
        result = await exhaust_portal(portal, actor)
        if isinstance(result, Exception):
            errors[actor.uid] = result
            log.warning(
                f"Cancelling {portal.channel.uid} after error {result}"
            )
        else:
            log.info(
                f"Cancelling {portal.channel.uid} gracefully "
                f"after result {result}")

        # cancel the process now that we have a final result
        await portal.cancel_actor()
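

# A short sketch of the intended launch pattern (mirroring the real
# calls in ``new_proc()`` below): ``nursery.start()`` blocks until
# ``task_status.started(cs)`` fires and hands back the ``CancelScope``,
# so the spawner can cancel this waiter task later if needed.
#
#     cancel_scope = await nursery.start(
#         cancel_on_completion, portal, subactor, errors)
#     ...
#     if cancel_scope:
#         cancel_scope.cancel()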


@asynccontextmanager
async def spawn_subactor(
    subactor: 'Actor',
    accept_addr: Tuple[str, int],
    parent_addr: Optional[Tuple[str, int]] = None
):
    spawn_cmd = [
        sys.executable,
        "-m",
        # Hardcode this (instead of using ``_child.__name__``) to avoid
        # a double import warning: https://stackoverflow.com/a/45070583
        "tractor._child",
        "--uid",
        str(subactor.uid),
        "--parent_addr",
        str(parent_addr)
    ]

    if subactor.loglevel:
        spawn_cmd += [
            "--loglevel",
            subactor.loglevel
        ]

    async with await trio.open_process(spawn_cmd) as proc:
        yield proc
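

# For illustration only: the child invocation assembled above looks
# roughly like the following (the uid and parent_addr values here are
# made-up examples):
#
#     python -m tractor._child \
#         --uid "('worker', 'a1b2c3')" \
#         --parent_addr "('127.0.0.1', 1616)" \
#         --loglevel info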


async def new_proc(
    name: str,
    actor_nursery: 'ActorNursery',  # type: ignore
    subactor: Actor,
    errors: Dict[Tuple[str, str], Exception],
    # passed through to actor main
    bind_addr: Tuple[str, int],
    parent_addr: Tuple[str, int],
    use_trio_run_in_process: bool = False,
    task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
    """Create a new subactor process using the spawn backend configured
    via ``try_set_start_method()``: either a ``trio``-managed subprocess
    (the default) or a ``multiprocessing.Process``.
    """
    cancel_scope = None

    # mark the new actor with the global spawn method
    subactor._spawn_method = _spawn_method

    async with trio.open_nursery() as nursery:
        if use_trio_run_in_process or _spawn_method == 'trio':
            async with spawn_subactor(
                subactor,
                bind_addr,
                parent_addr,
            ) as proc:
                log.info(f"Started {proc}")

                # wait for actor to spawn and connect back to us
                # channel should have handshake completed by the
                # local actor by the time we get a ref to it
                event, chan = await actor_nursery._actor.wait_for_peer(
                    subactor.uid)
                portal = Portal(chan)
                actor_nursery._children[subactor.uid] = (
                    subactor, proc, portal)

                # send additional init params
                await chan.send({
                    "_parent_main_data": subactor._parent_main_data,
                    "rpc_module_paths": subactor.rpc_module_paths,
                    "statespace": subactor.statespace,
                    "_arb_addr": subactor._arb_addr
                })

                task_status.started(portal)

                # wait for ActorNursery.wait() to be called
                await actor_nursery._join_procs.wait()

                if portal in actor_nursery._cancel_after_result_on_exit:
                    cancel_scope = await nursery.start(
                        cancel_on_completion, portal, subactor, errors)

                # Wait for proc termination but **don't** yet call
                # ``trio.Process.__aexit__()`` (it tears down stdio
                # which will kill any waiting remote pdb trace).
                await proc.wait()

        else:
            # `multiprocessing`
            assert _ctx
            start_method = _ctx.get_start_method()
            if start_method == 'forkserver':
                # XXX do our hackery on the stdlib to avoid multiple
                # forkservers (one at each subproc layer).
                fs = forkserver._forkserver
                curr_actor = current_actor()
                if is_main_process() and not curr_actor._forkserver_info:
                    # if we're the "main" process start the forkserver
                    # only once and pass its ipc info to downstream
                    # children
                    # forkserver.set_forkserver_preload(rpc_module_paths)
                    forkserver.ensure_running()
                    fs_info = (
                        fs._forkserver_address,
                        fs._forkserver_alive_fd,
                        getattr(fs, '_forkserver_pid', None),
                        getattr(
                            resource_tracker._resource_tracker, '_pid', None),
                        resource_tracker._resource_tracker._fd,
                    )
                else:
                    assert curr_actor._forkserver_info
                    fs_info = (
                        fs._forkserver_address,
                        fs._forkserver_alive_fd,
                        fs._forkserver_pid,
                        resource_tracker._resource_tracker._pid,
                        resource_tracker._resource_tracker._fd,
                    ) = curr_actor._forkserver_info
            else:
                fs_info = (None, None, None, None, None)

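            # ``fs_info`` is a 5-tuple: (forkserver_address,
            # forkserver_alive_fd, forkserver_pid, resource_tracker_pid,
            # resource_tracker_fd); it's forwarded to every child so all
            # subproc layers share one forkserver and resource tracker.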
            proc = _ctx.Process(  # type: ignore
                target=_mp_main,
                args=(
                    subactor,
                    bind_addr,
                    fs_info,
                    start_method,
                    parent_addr,
                ),
                # daemon=True,
                name=name,
            )
            # `multiprocessing` only (since no async interface):
            # register the process before start in case we get a cancel
            # request before the actor has fully spawned - then we can wait
            # for it to fully come up before sending a cancel request
            actor_nursery._children[subactor.uid] = (subactor, proc, None)

            proc.start()
            if not proc.is_alive():
                raise ActorFailure("Couldn't start sub-actor?")

            log.info(f"Started {proc}")

            # wait for actor to spawn and connect back to us
            # channel should have handshake completed by the
            # local actor by the time we get a ref to it
            event, chan = await actor_nursery._actor.wait_for_peer(
                subactor.uid)
            portal = Portal(chan)
            actor_nursery._children[subactor.uid] = (subactor, proc, portal)

            # unblock parent task
            task_status.started(portal)

            # wait for the ``ActorNursery`` block to signal that
            # subprocesses can be waited upon.
            # This is required to ensure synchronization
            # with user code that may want to manually await results
            # from nursery spawned sub-actors. We don't want the
            # containing nurseries here to collect results or error
            # while user code is still doing its thing. Only after the
            # nursery block closes do we allow subactor results to be
            # awaited and reported upwards to the supervisor.
            await actor_nursery._join_procs.wait()

            if portal in actor_nursery._cancel_after_result_on_exit:
                cancel_scope = await nursery.start(
                    cancel_on_completion, portal, subactor, errors)

            # TODO: timeout block here?
            if proc.is_alive():
                await proc_waiter(proc)
            proc.join()

        log.debug(f"Joined {proc}")

        # pop child entry to indicate we are no longer managing this subactor
        subactor, proc, portal = actor_nursery._children.pop(subactor.uid)

        # cancel result waiter that may have been spawned in
        # tandem if not done already
        if cancel_scope:
            log.warning(
                f"Cancelling existing result waiter task for {subactor.uid}")
            cancel_scope.cancel()
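

# A rough usage sketch (hypothetical; the actual ``ActorNursery``
# internals may differ): the nursery runs ``new_proc()`` as a task and
# receives the child's ``Portal`` once the IPC handshake completes,
# i.e. once ``task_status.started(portal)`` fires above.
#
#     portal = await nursery.start(
#         new_proc,
#         name,
#         actor_nursery,
#         subactor,
#         errors,
#         bind_addr,
#         parent_addr,
#     )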