Organize process spawning into lookup table

Instead of the logic branching create a table `._spawn._methods`
which is used to lookup the desired backend framework (in this case
still only one of `multiprocessing` or `trio`) and make the top level
`.new_proc()` do the lookup and any common logic. Use a `typing.Literal`
to define the lookup table's key set.

Repair and ignore a bunch of type-annot related stuff todo with `mypy`
updates and backend-specific process typing.
spawn_backend_table
Tyler Goodlet 2022-10-09 16:05:40 -04:00
parent 15047341bd
commit 90f4912580
4 changed files with 216 additions and 166 deletions

View File

@ -18,15 +18,28 @@
Sub-process entry points. Sub-process entry points.
""" """
from __future__ import annotations
from functools import partial from functools import partial
from typing import Any from typing import (
Any,
TYPE_CHECKING,
)
import trio # type: ignore import trio # type: ignore
from .log import get_console_log, get_logger from .log import (
get_console_log,
get_logger,
)
from . import _state from . import _state
from .to_asyncio import run_as_asyncio_guest from .to_asyncio import run_as_asyncio_guest
from ._runtime import async_main, Actor from ._runtime import (
async_main,
Actor,
)
if TYPE_CHECKING:
from ._spawn import SpawnMethodKey
log = get_logger(__name__) log = get_logger(__name__)
@ -37,7 +50,7 @@ def _mp_main(
actor: 'Actor', # type: ignore actor: 'Actor', # type: ignore
accept_addr: tuple[str, int], accept_addr: tuple[str, int],
forkserver_info: tuple[Any, Any, Any, Any, Any], forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: str, start_method: SpawnMethodKey,
parent_addr: tuple[str, int] = None, parent_addr: tuple[str, int] = None,
infect_asyncio: bool = False, infect_asyncio: bool = False,

View File

@ -62,7 +62,7 @@ async def open_root_actor(
# either the `multiprocessing` start method: # either the `multiprocessing` start method:
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# OR `trio` (the new default). # OR `trio` (the new default).
start_method: Optional[str] = None, start_method: Optional[_spawn.SpawnMethodKey] = None,
# enables the multi-process debugger support # enables the multi-process debugger support
debug_mode: bool = False, debug_mode: bool = False,

View File

@ -23,6 +23,7 @@ import sys
import platform import platform
from typing import ( from typing import (
Any, Any,
Literal,
Optional, Optional,
Callable, Callable,
TypeVar, TypeVar,
@ -60,7 +61,12 @@ log = get_logger('tractor')
# placeholder for an mp start context if so using that backend # placeholder for an mp start context if so using that backend
_ctx: Optional[mp.context.BaseContext] = None _ctx: Optional[mp.context.BaseContext] = None
_spawn_method: str = "trio" SpawnMethodKey = Literal[
'trio',
'spawn',
'forkserver',
]
_spawn_method: SpawnMethodKey = 'trio'
if platform.system() == 'Windows': if platform.system() == 'Windows':
@ -77,7 +83,10 @@ else:
await trio.lowlevel.wait_readable(proc.sentinel) await trio.lowlevel.wait_readable(proc.sentinel)
def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: def try_set_start_method(
name: SpawnMethodKey
) -> Optional[mp.context.BaseContext]:
''' '''
Attempt to set the method for process starting, aka the "actor Attempt to set the method for process starting, aka the "actor
spawning backend". spawning backend".
@ -108,6 +117,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
from . import _forkserver_override from . import _forkserver_override
_forkserver_override.override_stdlib() _forkserver_override.override_stdlib()
_ctx = mp.get_context(name) _ctx = mp.get_context(name)
elif name == 'trio': elif name == 'trio':
_ctx = None _ctx = None
else: else:
@ -252,6 +262,43 @@ async def soft_wait(
async def new_proc( async def new_proc(
name: str,
actor_nursery: ActorNursery,
subactor: Actor,
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addr: tuple[str, int],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
# lookup backend spawning target
target = _methods[_spawn_method]
# mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method
await target(
name,
actor_nursery,
subactor,
errors,
bind_addr,
parent_addr,
_runtime_vars, # run time vars
infect_asyncio=infect_asyncio,
task_status=task_status,
)
async def trio_proc(
name: str, name: str,
actor_nursery: ActorNursery, actor_nursery: ActorNursery,
@ -277,179 +324,157 @@ async def new_proc(
here is to be considered the core supervision strategy. here is to be considered the core supervision strategy.
''' '''
# mark the new actor with the global spawn method spawn_cmd = [
subactor._spawn_method = _spawn_method sys.executable,
uid = subactor.uid "-m",
# Hardcode this (instead of using ``_child.__name__`` to avoid a
# double import warning: https://stackoverflow.com/a/45070583
"tractor._child",
# We provide the child's unique identifier on this exec/spawn
# line for debugging purposes when viewing the process tree from
# the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy).
"--uid",
str(subactor.uid),
# Address the child must connect to on startup
"--parent_addr",
str(parent_addr)
]
if _spawn_method == 'trio': if subactor.loglevel:
spawn_cmd = [ spawn_cmd += [
sys.executable, "--loglevel",
"-m", subactor.loglevel
# Hardcode this (instead of using ``_child.__name__`` to avoid a
# double import warning: https://stackoverflow.com/a/45070583
"tractor._child",
# We provide the child's unique identifier on this exec/spawn
# line for debugging purposes when viewing the process tree from
# the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy).
"--uid",
str(subactor.uid),
# Address the child must connect to on startup
"--parent_addr",
str(parent_addr)
] ]
# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")
if subactor.loglevel: cancelled_during_spawn: bool = False
spawn_cmd += [ proc: Optional[trio.Process] = None
"--loglevel", try:
subactor.loglevel
]
# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")
cancelled_during_spawn: bool = False
proc: Optional[trio.Process] = None
try: try:
try: # TODO: needs ``trio_typing`` patch?
# TODO: needs ``trio_typing`` patch? proc = await trio.lowlevel.open_process( # type: ignore
proc = await trio.lowlevel.open_process( # type: ignore spawn_cmd)
spawn_cmd)
log.runtime(f"Started {proc}") log.runtime(f"Started {proc}")
# wait for actor to spawn and connect back to us # wait for actor to spawn and connect back to us
# channel should have handshake completed by the # channel should have handshake completed by the
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer( event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid) subactor.uid)
except trio.Cancelled: except trio.Cancelled:
cancelled_during_spawn = True cancelled_during_spawn = True
# we may cancel before the child connects back in which # we may cancel before the child connects back in which
# case avoid clobbering the pdb tty. # case avoid clobbering the pdb tty.
if debug_mode(): if debug_mode():
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if is_root_process():
await maybe_wait_for_debugger()
elif proc is not None:
async with acquire_debug_lock(uid):
# soft wait on the proc to terminate
with trio.move_on_after(0.5):
await proc.wait()
raise
# a sub-proc ref **must** exist now
assert proc
portal = Portal(chan)
actor_nursery._children[subactor.uid] = (
subactor,
proc,
portal,
)
# send additional init params
await chan.send({
"_parent_main_data": subactor._parent_main_data,
"enable_modules": subactor.enable_modules,
"_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0],
"bind_port": bind_addr[1],
"_runtime_vars": _runtime_vars,
})
# track subactor in current nursery
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
# resume caller at next checkpoint now that child is up
task_status.started(portal)
# wait for ActorNursery.wait() to be called
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
async with trio.open_nursery() as nursery:
if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon(
cancel_on_completion,
portal,
subactor,
errors
)
# This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled``
# condition.
await soft_wait(
proc,
trio.Process.wait,
portal
)
# cancel result waiter that may have been spawned in
# tandem if not done already
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
nursery.cancel_scope.cancel()
finally:
# The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid
# killing the process too early.
if proc:
log.cancel(f'Hard reap sequence starting for {uid}')
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb # don't clobber an ongoing pdb
if cancelled_during_spawn: if is_root_process():
# Try again to avoid TTY clobbering. await maybe_wait_for_debugger()
async with acquire_debug_lock(uid):
elif proc is not None:
async with acquire_debug_lock(subactor.uid):
# soft wait on the proc to terminate
with trio.move_on_after(0.5): with trio.move_on_after(0.5):
await proc.wait() await proc.wait()
raise
if is_root_process(): # a sub-proc ref **must** exist now
await maybe_wait_for_debugger( assert proc
child_in_debug=_runtime_vars.get(
'_debug_mode', False),
)
if proc.poll() is None: portal = Portal(chan)
log.cancel(f"Attempting to hard kill {proc}") actor_nursery._children[subactor.uid] = (
await do_hard_kill(proc) subactor,
proc,
log.debug(f"Joined {proc}") portal,
else:
log.warning('Nursery cancelled before sub-proc started')
if not cancelled_during_spawn:
# pop child entry to indicate we no longer managing this
# subactor
actor_nursery._children.pop(subactor.uid)
else:
# `multiprocessing`
# async with trio.open_nursery() as nursery:
await mp_new_proc(
name=name,
actor_nursery=actor_nursery,
subactor=subactor,
errors=errors,
# passed through to actor main
bind_addr=bind_addr,
parent_addr=parent_addr,
_runtime_vars=_runtime_vars,
infect_asyncio=infect_asyncio,
task_status=task_status,
) )
# send additional init params
await chan.send({
"_parent_main_data": subactor._parent_main_data,
"enable_modules": subactor.enable_modules,
"_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0],
"bind_port": bind_addr[1],
"_runtime_vars": _runtime_vars,
})
async def mp_new_proc( # track subactor in current nursery
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
# resume caller at next checkpoint now that child is up
task_status.started(portal)
# wait for ActorNursery.wait() to be called
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
async with trio.open_nursery() as nursery:
if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon(
cancel_on_completion,
portal,
subactor,
errors
)
# This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled``
# condition.
await soft_wait(
proc,
trio.Process.wait,
portal
)
# cancel result waiter that may have been spawned in
# tandem if not done already
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
nursery.cancel_scope.cancel()
finally:
# The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid
# killing the process too early.
if proc:
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if cancelled_during_spawn:
# Try again to avoid TTY clobbering.
async with acquire_debug_lock(subactor.uid):
with trio.move_on_after(0.5):
await proc.wait()
if is_root_process():
await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False),
)
if proc.poll() is None:
log.cancel(f"Attempting to hard kill {proc}")
await do_hard_kill(proc)
log.debug(f"Joined {proc}")
else:
log.warning('Nursery cancelled before sub-proc started')
if not cancelled_during_spawn:
# pop child entry to indicate we no longer managing this
# subactor
actor_nursery._children.pop(subactor.uid)
async def mp_proc(
name: str, name: str,
actor_nursery: ActorNursery, # type: ignore # noqa actor_nursery: ActorNursery, # type: ignore # noqa
@ -608,4 +633,16 @@ async def mp_new_proc(
log.debug(f"Joined {proc}") log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing subactor # pop child entry to indicate we are no longer managing subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid) actor_nursery._children.pop(subactor.uid)
# TODO: prolly report to ``mypy`` how this causes all sorts of
# false errors..
# subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
# proc spawning backend target map
_methods: dict[str, Callable] = {
'trio': trio_proc,
'spawn': mp_proc,
'forkserver': mp_proc,
}

View File

@ -92,7 +92,7 @@ class ActorNursery:
tuple[str, str], tuple[str, str],
tuple[ tuple[
Actor, Actor,
mp.context.Process | trio.Process, trio.Process | mp.Process,
Optional[Portal], Optional[Portal],
] ]
] = {} ] = {}