From 90f4912580b186b056597a288c699fb99dba59b9 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 9 Oct 2022 16:05:40 -0400
Subject: [PATCH] Organize process spawning into lookup table

Instead of the logic branching, create a table `._spawn._methods` which
is used to look up the desired backend framework (in this case still
only one of `multiprocessing` or `trio`) and make the top level
`.new_proc()` do the lookup and any common logic. Use a
`typing.Literal` to define the lookup table's key set.

Repair and ignore a bunch of type-annotation related issues to do with
`mypy` updates and backend-specific process typing.
---
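
NOTE (placed after the '---' separator so it is not part of the commit
message): a minimal, self-contained sketch of the `Literal`-keyed
lookup-table dispatch this patch introduces. The backend coroutines and
their signatures below are illustrative stand-ins, not the real
`tractor` spawning targets (the actual `trio_proc()`/`mp_proc()` in the
diff take many more arguments).

    from typing import Awaitable, Callable, Literal

    import trio

    SpawnMethodKey = Literal['trio', 'spawn', 'forkserver']

    async def trio_backend(name: str) -> None:
        # stand-in for the real ``trio_proc()`` target
        print(f'spawning {name} via trio')

    async def mp_backend(name: str) -> None:
        # stand-in for the real ``mp_proc()`` target which covers both
        # the 'spawn' and 'forkserver' start methods
        print(f'spawning {name} via multiprocessing')

    # the lookup table: one entry per spawn-method key
    _methods: dict[SpawnMethodKey, Callable[[str], Awaitable[None]]] = {
        'trio': trio_backend,
        'spawn': mp_backend,
        'forkserver': mp_backend,
    }

    async def new_proc(name: str, method: SpawnMethodKey = 'trio') -> None:
        # common logic lives here; only the backend-specific work is
        # looked up and delegated
        target = _methods[method]
        await target(name)

    if __name__ == '__main__':
        trio.run(new_proc, 'example-actor')
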
 tractor/_entry.py     |  21 ++-
 tractor/_root.py      |   2 +-
 tractor/_spawn.py     | 357 +++++++++++++++++++++++-------------
 tractor/_supervise.py |   2 +-
 4 files changed, 216 insertions(+), 166 deletions(-)

diff --git a/tractor/_entry.py b/tractor/_entry.py
index 931b2e2..35a9abf 100644
--- a/tractor/_entry.py
+++ b/tractor/_entry.py
@@ -18,15 +18,28 @@
 Sub-process entry points.

 """
+from __future__ import annotations
 from functools import partial
-from typing import Any
+from typing import (
+    Any,
+    TYPE_CHECKING,
+)

 import trio  # type: ignore

-from .log import get_console_log, get_logger
+from .log import (
+    get_console_log,
+    get_logger,
+)
 from . import _state
 from .to_asyncio import run_as_asyncio_guest
-from ._runtime import async_main, Actor
+from ._runtime import (
+    async_main,
+    Actor,
+)
+
+if TYPE_CHECKING:
+    from ._spawn import SpawnMethodKey


 log = get_logger(__name__)
@@ -37,7 +50,7 @@ def _mp_main(
     actor: 'Actor',  # type: ignore
     accept_addr: tuple[str, int],
     forkserver_info: tuple[Any, Any, Any, Any, Any],
-    start_method: str,
+    start_method: SpawnMethodKey,
     parent_addr: tuple[str, int] = None,
     infect_asyncio: bool = False,

diff --git a/tractor/_root.py b/tractor/_root.py
index 1def614..0e5b2aa 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -62,7 +62,7 @@ async def open_root_actor(
     # either the `multiprocessing` start method:
     # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
     # OR `trio` (the new default).
-    start_method: Optional[str] = None,
+    start_method: Optional[_spawn.SpawnMethodKey] = None,

     # enables the multi-process debugger support
     debug_mode: bool = False,
diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index d4fb60b..4defb58 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -23,6 +23,7 @@ import sys
 import platform
 from typing import (
     Any,
+    Literal,
     Optional,
     Callable,
     TypeVar,
@@ -60,7 +61,12 @@ log = get_logger('tractor')

 # placeholder for an mp start context if so using that backend
 _ctx: Optional[mp.context.BaseContext] = None
-_spawn_method: str = "trio"
+SpawnMethodKey = Literal[
+    'trio',
+    'spawn',
+    'forkserver',
+]
+_spawn_method: SpawnMethodKey = 'trio'


 if platform.system() == 'Windows':
@@ -77,7 +83,10 @@ else:
         await trio.lowlevel.wait_readable(proc.sentinel)


-def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
+def try_set_start_method(
+    name: SpawnMethodKey
+
+) -> Optional[mp.context.BaseContext]:
     '''
     Attempt to set the method for process starting, aka the "actor
     spawning backend".
@@ -108,6 +117,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
         from . import _forkserver_override
         _forkserver_override.override_stdlib()
         _ctx = mp.get_context(name)
+
     elif name == 'trio':
         _ctx = None
     else:
@@ -252,6 +262,43 @@ async def soft_wait(


 async def new_proc(
+    name: str,
+    actor_nursery: ActorNursery,
+    subactor: Actor,
+    errors: dict[tuple[str, str], Exception],
+
+    # passed through to actor main
+    bind_addr: tuple[str, int],
+    parent_addr: tuple[str, int],
+    _runtime_vars: dict[str, Any],  # serialized and sent to _child
+
+    *,
+
+    infect_asyncio: bool = False,
+    task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
+
+) -> None:
+
+    # lookup backend spawning target
+    target = _methods[_spawn_method]
+
+    # mark the new actor with the global spawn method
+    subactor._spawn_method = _spawn_method
+
+    await target(
+        name,
+        actor_nursery,
+        subactor,
+        errors,
+        bind_addr,
+        parent_addr,
+        _runtime_vars,  # run time vars
+        infect_asyncio=infect_asyncio,
+        task_status=task_status,
+    )
+
+
+async def trio_proc(
     name: str,
     actor_nursery: ActorNursery,
@@ -277,179 +324,157 @@ async def new_proc(
     here is to be considered the core supervision strategy.

     '''
-    # mark the new actor with the global spawn method
-    subactor._spawn_method = _spawn_method
-    uid = subactor.uid
+    spawn_cmd = [
+        sys.executable,
+        "-m",
+        # Hardcode this (instead of using ``_child.__name__`` to avoid a
+        # double import warning: https://stackoverflow.com/a/45070583
+        "tractor._child",
+        # We provide the child's unique identifier on this exec/spawn
+        # line for debugging purposes when viewing the process tree from
+        # the OS; it otherwise can be passed via the parent channel if
+        # we prefer in the future (for privacy).
+        "--uid",
+        str(subactor.uid),
+        # Address the child must connect to on startup
+        "--parent_addr",
+        str(parent_addr)
+    ]

-    if _spawn_method == 'trio':
-        spawn_cmd = [
-            sys.executable,
-            "-m",
-            # Hardcode this (instead of using ``_child.__name__`` to avoid a
-            # double import warning: https://stackoverflow.com/a/45070583
-            "tractor._child",
-            # We provide the child's unique identifier on this exec/spawn
-            # line for debugging purposes when viewing the process tree from
-            # the OS; it otherwise can be passed via the parent channel if
-            # we prefer in the future (for privacy).
-            "--uid",
-            str(subactor.uid),
-            # Address the child must connect to on startup
-            "--parent_addr",
-            str(parent_addr)
-        ]
+    if subactor.loglevel:
+        spawn_cmd += [
+            "--loglevel",
+            subactor.loglevel
+        ]
+    # Tell child to run in guest mode on top of ``asyncio`` loop
+    if infect_asyncio:
+        spawn_cmd.append("--asyncio")

-        if subactor.loglevel:
-            spawn_cmd += [
-                "--loglevel",
-                subactor.loglevel
-            ]
-        # Tell child to run in guest mode on top of ``asyncio`` loop
-        if infect_asyncio:
-            spawn_cmd.append("--asyncio")
-
-        cancelled_during_spawn: bool = False
-        proc: Optional[trio.Process] = None
+    cancelled_during_spawn: bool = False
+    proc: Optional[trio.Process] = None
+    try:
         try:
-            try:
-                # TODO: needs ``trio_typing`` patch?
-                proc = await trio.lowlevel.open_process(  # type: ignore
-                    spawn_cmd)
+            # TODO: needs ``trio_typing`` patch?
+            proc = await trio.lowlevel.open_process(  # type: ignore
+                spawn_cmd)

-                log.runtime(f"Started {proc}")
+            log.runtime(f"Started {proc}")

-                # wait for actor to spawn and connect back to us
-                # channel should have handshake completed by the
-                # local actor by the time we get a ref to it
-                event, chan = await actor_nursery._actor.wait_for_peer(
-                    subactor.uid)
+            # wait for actor to spawn and connect back to us
+            # channel should have handshake completed by the
+            # local actor by the time we get a ref to it
+            event, chan = await actor_nursery._actor.wait_for_peer(
+                subactor.uid)

-            except trio.Cancelled:
-                cancelled_during_spawn = True
-                # we may cancel before the child connects back in which
-                # case avoid clobbering the pdb tty.
-                if debug_mode():
-                    with trio.CancelScope(shield=True):
-                        # don't clobber an ongoing pdb
-                        if is_root_process():
-                            await maybe_wait_for_debugger()
-
-                        elif proc is not None:
-                            async with acquire_debug_lock(uid):
-                                # soft wait on the proc to terminate
-                                with trio.move_on_after(0.5):
-                                    await proc.wait()
-                raise
-
-            # a sub-proc ref **must** exist now
-            assert proc
-
-            portal = Portal(chan)
-            actor_nursery._children[subactor.uid] = (
-                subactor,
-                proc,
-                portal,
-            )
-
-            # send additional init params
-            await chan.send({
-                "_parent_main_data": subactor._parent_main_data,
-                "enable_modules": subactor.enable_modules,
-                "_arb_addr": subactor._arb_addr,
-                "bind_host": bind_addr[0],
-                "bind_port": bind_addr[1],
-                "_runtime_vars": _runtime_vars,
-            })
-
-            # track subactor in current nursery
-            curr_actor = current_actor()
-            curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
-
-            # resume caller at next checkpoint now that child is up
-            task_status.started(portal)
-
-            # wait for ActorNursery.wait() to be called
-            with trio.CancelScope(shield=True):
-                await actor_nursery._join_procs.wait()
-
-            async with trio.open_nursery() as nursery:
-                if portal in actor_nursery._cancel_after_result_on_exit:
-                    nursery.start_soon(
-                        cancel_on_completion,
-                        portal,
-                        subactor,
-                        errors
-                    )
-
-                # This is a "soft" (cancellable) join/reap which
-                # will remote cancel the actor on a ``trio.Cancelled``
-                # condition.
-                await soft_wait(
-                    proc,
-                    trio.Process.wait,
-                    portal
-                )
-
-                # cancel result waiter that may have been spawned in
-                # tandem if not done already
-                log.warning(
-                    "Cancelling existing result waiter task for "
-                    f"{subactor.uid}")
-                nursery.cancel_scope.cancel()
-
-        finally:
-            # The "hard" reap since no actor zombies are allowed!
-            # XXX: do this **after** cancellation/tearfown to avoid
-            # killing the process too early.
-            if proc:
-                log.cancel(f'Hard reap sequence starting for {uid}')
-                with trio.CancelScope(shield=True):
-
-                    # don't clobber an ongoing pdb
-                    if cancelled_during_spawn:
-                        # Try again to avoid TTY clobbering.
-                        async with acquire_debug_lock(uid):
-                            with trio.move_on_after(0.5):
-                                await proc.wait()
-
-                    if is_root_process():
-                        await maybe_wait_for_debugger(
-                            child_in_debug=_runtime_vars.get(
-                                '_debug_mode', False),
-                        )
-
-                    if proc.poll() is None:
-                        log.cancel(f"Attempting to hard kill {proc}")
-                        await do_hard_kill(proc)
-
-                    log.debug(f"Joined {proc}")
-            else:
-                log.warning('Nursery cancelled before sub-proc started')
-
-            if not cancelled_during_spawn:
-                # pop child entry to indicate we no longer managing this
-                # subactor
-                actor_nursery._children.pop(subactor.uid)
-
-    else:
-        # `multiprocessing`
-        # async with trio.open_nursery() as nursery:
-        await mp_new_proc(
-            name=name,
-            actor_nursery=actor_nursery,
-            subactor=subactor,
-            errors=errors,
-
-            # passed through to actor main
-            bind_addr=bind_addr,
-            parent_addr=parent_addr,
-            _runtime_vars=_runtime_vars,
-            infect_asyncio=infect_asyncio,
-            task_status=task_status,
-        )
-
-
-async def mp_new_proc(
+        except trio.Cancelled:
+            cancelled_during_spawn = True
+            # we may cancel before the child connects back in which
+            # case avoid clobbering the pdb tty.
+            if debug_mode():
+                with trio.CancelScope(shield=True):
+                    # don't clobber an ongoing pdb
+                    if is_root_process():
+                        await maybe_wait_for_debugger()
+
+                    elif proc is not None:
+                        async with acquire_debug_lock(subactor.uid):
+                            # soft wait on the proc to terminate
+                            with trio.move_on_after(0.5):
+                                await proc.wait()
+            raise
+
+        # a sub-proc ref **must** exist now
+        assert proc
+
+        portal = Portal(chan)
+        actor_nursery._children[subactor.uid] = (
+            subactor,
+            proc,
+            portal,
+        )
+
+        # send additional init params
+        await chan.send({
+            "_parent_main_data": subactor._parent_main_data,
+            "enable_modules": subactor.enable_modules,
+            "_arb_addr": subactor._arb_addr,
+            "bind_host": bind_addr[0],
+            "bind_port": bind_addr[1],
+            "_runtime_vars": _runtime_vars,
+        })
+
+        # track subactor in current nursery
+        curr_actor = current_actor()
+        curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
+
+        # resume caller at next checkpoint now that child is up
+        task_status.started(portal)
+
+        # wait for ActorNursery.wait() to be called
+        with trio.CancelScope(shield=True):
+            await actor_nursery._join_procs.wait()
+
+        async with trio.open_nursery() as nursery:
+            if portal in actor_nursery._cancel_after_result_on_exit:
+                nursery.start_soon(
+                    cancel_on_completion,
+                    portal,
+                    subactor,
+                    errors
+                )
+
+            # This is a "soft" (cancellable) join/reap which
+            # will remote cancel the actor on a ``trio.Cancelled``
+            # condition.
+            await soft_wait(
+                proc,
+                trio.Process.wait,
+                portal
+            )
+
+            # cancel result waiter that may have been spawned in
+            # tandem if not done already
+            log.warning(
+                "Cancelling existing result waiter task for "
+                f"{subactor.uid}")
+            nursery.cancel_scope.cancel()
+
+    finally:
+        # The "hard" reap since no actor zombies are allowed!
+        # XXX: do this **after** cancellation/tearfown to avoid
+        # killing the process too early.
+        if proc:
+            log.cancel(f'Hard reap sequence starting for {subactor.uid}')
+            with trio.CancelScope(shield=True):
+
+                # don't clobber an ongoing pdb
+                if cancelled_during_spawn:
+                    # Try again to avoid TTY clobbering.
+                    async with acquire_debug_lock(subactor.uid):
+                        with trio.move_on_after(0.5):
+                            await proc.wait()
+
+                if is_root_process():
+                    await maybe_wait_for_debugger(
+                        child_in_debug=_runtime_vars.get(
+                            '_debug_mode', False),
+                    )
+
+                if proc.poll() is None:
+                    log.cancel(f"Attempting to hard kill {proc}")
+                    await do_hard_kill(proc)
+
+                log.debug(f"Joined {proc}")
+        else:
+            log.warning('Nursery cancelled before sub-proc started')
+
+        if not cancelled_during_spawn:
+            # pop child entry to indicate we no longer managing this
+            # subactor
+            actor_nursery._children.pop(subactor.uid)
+
+
+async def mp_proc(
     name: str,
     actor_nursery: ActorNursery,  # type: ignore  # noqa
@@ -608,4 +633,16 @@ async def mp_new_proc(
     log.debug(f"Joined {proc}")

     # pop child entry to indicate we are no longer managing subactor
-    subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
+    actor_nursery._children.pop(subactor.uid)
+
+    # TODO: prolly report to ``mypy`` how this causes all sorts of
+    # false errors..
+    # subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
+
+
+# proc spawning backend target map
+_methods: dict[str, Callable] = {
+    'trio': trio_proc,
+    'spawn': mp_proc,
+    'forkserver': mp_proc,
+}
diff --git a/tractor/_supervise.py b/tractor/_supervise.py
index 19c7100..4708e1e 100644
--- a/tractor/_supervise.py
+++ b/tractor/_supervise.py
@@ -92,7 +92,7 @@ class ActorNursery:
             tuple[str, str],
             tuple[
                 Actor,
-                mp.context.Process | trio.Process,
+                trio.Process | mp.Process,
                 Optional[Portal],
             ]
         ] = {}