forked from goodboy/tractor
1
0
Fork 0
tractor/tractor/_supervise.py

511 lines
19 KiB
Python
Raw Normal View History

Re-license code base for distribution under AGPL This commit obviously denotes a re-license of all applicable parts of the code base. Acknowledgement of this change was completed in #274 by the majority of the current set of contributors. From here henceforth all changes will be AGPL licensed and distributed. This is purely an effort to maintain the same copy-left policy whilst closing the (perceived) SaaS loophole the GPL allows for. It is merely for this loophole: to avoid code hiding by any potential "network providers" who are attempting to use the project to make a profit without either compensating the authors or re-distributing their changes. I thought quite a bit about this change and can't see a reason not to close the SaaS loophole in our current license. We still are (hard) copy-left and I plan to keep the code base this way for a couple reasons: - The code base produces income/profit through parent projects and is demonstrably of high value. - I believe firms should not get free lunch for the sake of "contributions from their employees" or "usage as a service" which I have found to be a dubious argument at best. - If a firm who intends to profit from the code base wants to use it they can propose a secondary commercial license to purchase with the proceeds going to the project's authors under some form of well defined contract. - Many successful projects like Qt use this model; I see no reason it can't work in this case until such a time as the authors feel it should be loosened. There has been detailed discussion in #103 on licensing alternatives. The main point of this AGPL change is to protect the code base for the time being from exploitation while it grows and as we move into the next phase of development which will include extension into the multi-host distributed software space.
2021-12-13 18:08:32 +00:00
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
2018-07-14 20:09:05 +00:00
"""
``trio`` inspired apis and helpers
Re-license code base for distribution under AGPL This commit obviously denotes a re-license of all applicable parts of the code base. Acknowledgement of this change was completed in #274 by the majority of the current set of contributors. From here henceforth all changes will be AGPL licensed and distributed. This is purely an effort to maintain the same copy-left policy whilst closing the (perceived) SaaS loophole the GPL allows for. It is merely for this loophole: to avoid code hiding by any potential "network providers" who are attempting to use the project to make a profit without either compensating the authors or re-distributing their changes. I thought quite a bit about this change and can't see a reason not to close the SaaS loophole in our current license. We still are (hard) copy-left and I plan to keep the code base this way for a couple reasons: - The code base produces income/profit through parent projects and is demonstrably of high value. - I believe firms should not get free lunch for the sake of "contributions from their employees" or "usage as a service" which I have found to be a dubious argument at best. - If a firm who intends to profit from the code base wants to use it they can propose a secondary commercial license to purchase with the proceeds going to the project's authors under some form of well defined contract. - Many successful projects like Qt use this model; I see no reason it can't work in this case until such a time as the authors feel it should be loosened. There has been detailed discussion in #103 on licensing alternatives. The main point of this AGPL change is to protect the code base for the time being from exploitation while it grows and as we move into the next phase of development which will include extension into the multi-host distributed software space.
2021-12-13 18:08:32 +00:00
2018-07-14 20:09:05 +00:00
"""
from functools import partial
import inspect
from typing import (
Optional,
TYPE_CHECKING,
)
import typing
2021-01-05 13:28:06 +00:00
import warnings
2018-07-14 20:09:05 +00:00
import trio
2020-01-20 16:10:51 +00:00
from async_generator import asynccontextmanager
2018-07-14 20:09:05 +00:00
from ._debug import maybe_wait_for_debugger
2021-03-11 15:07:39 +00:00
from ._state import current_actor, is_main_process
2018-07-14 20:09:05 +00:00
from .log import get_logger, get_loglevel
2022-08-03 18:46:53 +00:00
from ._runtime import Actor
2018-07-14 20:09:05 +00:00
from ._portal import Portal
from ._exceptions import is_multi_cancelled
from ._root import open_root_actor
from . import _state
from . import _spawn
2018-07-14 20:09:05 +00:00
if TYPE_CHECKING:
import multiprocessing as mp
log = get_logger(__name__)
2018-07-14 20:09:05 +00:00
_default_bind_addr: tuple[str, int] = ('127.0.0.1', 0)
2020-08-09 00:58:04 +00:00
2018-07-14 20:09:05 +00:00
class ActorNursery:
2021-11-07 21:45:00 +00:00
'''
The fundamental actor supervision construct: spawn and manage
explicit lifetime and capability restricted, bootstrapped,
``trio.run()`` scheduled sub-processes.
Though the concept of a "process nursery" is different in complexity
and slightly different in semantics then a tradtional single
threaded task nursery, much of the interface is the same. New
processes each require a top level "parent" or "root" task which is
itself no different then any task started by a tradtional
``trio.Nursery``. The main difference is that each "actor" (a
process + ``trio.run()``) contains a full, paralell executing
``trio``-task-tree. The following super powers ensue:
- starting tasks in a child actor are completely independent of
tasks started in the current process. They execute in *parallel*
relative to tasks in the current process and are scheduled by their
own actor's ``trio`` run loop.
- tasks scheduled in a remote process still maintain an SC protocol
across memory boundaries using a so called "structured concurrency
dialogue protocol" which ensures task-hierarchy-lifetimes are linked.
- remote tasks (in another actor) can fail and relay failure back to
the caller task (in some other actor) via a seralized
``RemoteActorError`` which means no zombie process or RPC
initiated task can ever go off on its own.
'''
2020-01-20 16:10:51 +00:00
def __init__(
self,
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: dict[tuple[str, str], Exception],
2020-01-20 16:10:51 +00:00
) -> None:
# self.supervisor = supervisor # TODO
2018-08-31 21:16:24 +00:00
self._actor: Actor = actor
2020-01-20 16:10:51 +00:00
self._ria_nursery = ria_nursery
self._da_nursery = da_nursery
self._children: dict[
tuple[str, str],
tuple[
Actor,
trio.Process | mp.Process,
Optional[Portal],
]
2018-08-31 21:16:24 +00:00
] = {}
# portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives
2018-08-31 21:16:24 +00:00
self._cancel_after_result_on_exit: set = set()
self.cancelled: bool = False
2020-01-20 16:10:51 +00:00
self._join_procs = trio.Event()
self._at_least_one_child_in_debug: bool = False
2020-01-20 16:10:51 +00:00
self.errors = errors
self.exited = trio.Event()
2018-07-14 20:09:05 +00:00
async def start_actor(
self,
name: str,
*,
bind_addr: tuple[str, int] = _default_bind_addr,
rpc_module_paths: list[str] = None,
enable_modules: list[str] = None,
loglevel: str = None, # set log level per subactor
2020-01-20 16:10:51 +00:00
nursery: trio.Nursery = None,
2021-03-11 15:07:39 +00:00
debug_mode: Optional[bool] = None,
infect_asyncio: bool = False,
) -> Portal:
'''
Start a (daemon) actor: an process that has no designated
"main task" besides the runtime.
'''
2018-07-14 20:09:05 +00:00
loglevel = loglevel or self._actor.loglevel or get_loglevel()
# configure and pass runtime state
_rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False
2021-03-11 15:07:39 +00:00
# allow setting debug policy per actor
if debug_mode is not None:
_rtv['_debug_mode'] = debug_mode
self._at_least_one_child_in_debug = True
2021-03-11 15:07:39 +00:00
2021-01-05 13:28:06 +00:00
enable_modules = enable_modules or []
if rpc_module_paths:
warnings.warn(
"`rpc_module_paths` is now deprecated, use "
" `enable_modules` instead.",
DeprecationWarning,
stacklevel=2,
)
enable_modules.extend(rpc_module_paths)
2020-01-20 16:10:51 +00:00
subactor = Actor(
2018-07-14 20:09:05 +00:00
name,
# modules allowed to invoked funcs from
2021-01-05 13:28:06 +00:00
enable_modules=enable_modules,
2018-07-14 20:09:05 +00:00
loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr,
)
parent_addr = self._actor.accept_addr
2019-11-26 14:23:37 +00:00
assert parent_addr
2020-01-20 16:10:51 +00:00
# start a task to spawn a process
# blocks until process has been started and a portal setup
nursery = nursery or self._da_nursery
2020-01-21 02:06:49 +00:00
# XXX: the type ignore is actually due to a `mypy` bug
return await nursery.start( # type: ignore
partial(
_spawn.new_proc,
name,
self,
subactor,
self.errors,
bind_addr,
parent_addr,
_rtv, # run time vars
infect_asyncio=infect_asyncio,
)
2018-07-14 20:09:05 +00:00
)
2018-08-01 19:15:18 +00:00
async def run_in_actor(
self,
2021-11-07 21:45:00 +00:00
fn: typing.Callable,
*,
2021-11-07 21:45:00 +00:00
name: Optional[str] = None,
bind_addr: tuple[str, int] = _default_bind_addr,
rpc_module_paths: Optional[list[str]] = None,
enable_modules: list[str] = None,
loglevel: str = None, # set log level per subactor
infect_asyncio: bool = False,
2021-11-07 21:45:00 +00:00
2018-08-01 19:15:18 +00:00
**kwargs, # explicit args to ``fn``
2021-11-07 21:45:00 +00:00
) -> Portal:
2018-08-01 19:15:18 +00:00
"""Spawn a new actor, run a lone task, then terminate the actor and
return its result.
Actors spawned using this method are kept alive at nursery teardown
until the task spawned by executing ``fn`` completes at which point
the actor is terminated.
"""
mod_path = fn.__module__
if name is None:
# use the explicit function name if not provided
name = fn.__name__
2018-08-01 19:15:18 +00:00
portal = await self.start_actor(
name,
2021-04-29 02:10:59 +00:00
enable_modules=[mod_path] + (
enable_modules or rpc_module_paths or []
),
2018-08-01 19:15:18 +00:00
bind_addr=bind_addr,
loglevel=loglevel,
2020-01-20 16:10:51 +00:00
# use the run_in_actor nursery
nursery=self._ria_nursery,
infect_asyncio=infect_asyncio,
2018-08-01 19:15:18 +00:00
)
# XXX: don't allow stream funcs
if not (
inspect.iscoroutinefunction(fn) and
not getattr(fn, '_tractor_stream_function', False)
):
raise TypeError(f'{fn} must be an async function!')
# this marks the actor to be cancelled after its portal result
# is retreived, see logic in `open_nursery()` below.
self._cancel_after_result_on_exit.add(portal)
2018-08-01 19:15:18 +00:00
await portal._submit_for_result(
mod_path,
fn.__name__,
**kwargs
)
2018-07-14 20:09:05 +00:00
return portal
async def cancel(self, hard_kill: bool = False) -> None:
2018-07-14 20:09:05 +00:00
"""Cancel this nursery by instructing each subactor to cancel
itself and wait for all subactors to terminate.
2018-07-14 20:09:05 +00:00
If ``hard_killl`` is set to ``True`` then kill the processes
directly without any far end graceful ``trio`` cancellation.
"""
self.cancelled = True
log.cancel(f"Cancelling nursery in {self._actor.uid}")
with trio.move_on_after(3) as cs:
2020-01-20 16:10:51 +00:00
async with trio.open_nursery() as nursery:
2018-08-01 19:15:18 +00:00
for subactor, proc, portal in self._children.values():
# TODO: are we ever even going to use this or
# is the spawning backend responsible for such
# things? I'm thinking latter.
2018-08-01 19:15:18 +00:00
if hard_kill:
proc.terminate()
2018-08-01 19:15:18 +00:00
else:
if portal is None: # actor hasn't fully spawned yet
event = self._actor._peer_connected[subactor.uid]
2018-09-10 19:19:49 +00:00
log.warning(
2018-08-01 19:15:18 +00:00
f"{subactor.uid} wasn't finished spawning?")
2018-08-01 19:15:18 +00:00
await event.wait()
2018-08-01 19:15:18 +00:00
# channel/portal should now be up
_, _, portal = self._children[subactor.uid]
# XXX should be impossible to get here
# unless method was called from within
# shielded cancel scope.
2018-08-01 19:15:18 +00:00
if portal is None:
# cancelled while waiting on the event
# to arrive
2018-08-01 19:15:18 +00:00
chan = self._actor._peers[subactor.uid][-1]
if chan:
portal = Portal(chan)
else: # there's no other choice left
proc.terminate()
2018-08-01 19:15:18 +00:00
# spawn cancel tasks for each sub-actor
2018-08-31 21:16:24 +00:00
assert portal
if portal.channel.connected():
nursery.start_soon(portal.cancel_actor)
2018-07-14 20:09:05 +00:00
# if we cancelled the cancel (we hung cancelling remote actors)
# then hard kill all sub-processes
if cs.cancelled_caught:
log.error(
f"Failed to cancel {self}\nHard killing process tree!")
for subactor, proc, portal in self._children.values():
log.warning(f"Hard killing process {proc}")
proc.terminate()
# mark ourselves as having (tried to have) cancelled all subactors
self._join_procs.set()
2018-07-14 20:09:05 +00:00
2021-02-24 17:59:43 +00:00
@asynccontextmanager
async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor,
) -> typing.AsyncGenerator[ActorNursery, None]:
# the collection of errors retreived from spawned sub-actors
errors: dict[tuple[str, str], Exception] = {}
2021-02-24 17:59:43 +00:00
# This is the outermost level "deamon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for
# handling errors that are generated by the inner nursery in
# a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``).
# errors from this daemon actor nursery bubble up to caller
async with trio.open_nursery() as da_nursery:
try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
# ``ActorNusery.run_in_actor()``) are expected to only
# return a single result and then complete (i.e. be canclled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with trio.open_nursery() as ria_nursery:
2021-02-24 17:59:43 +00:00
anursery = ActorNursery(
actor,
ria_nursery,
da_nursery,
errors
)
try:
# spawning of actors happens in the caller's scope
# after we yield upwards
yield anursery
log.runtime(
2021-02-24 17:59:43 +00:00
f"Waiting on subactors {anursery._children} "
"to complete"
)
# Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
# signal all process monitor tasks to conduct
# hard join phase.
anursery._join_procs.set()
2021-02-24 17:59:43 +00:00
except BaseException as err:
2021-06-10 18:02:12 +00:00
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
await maybe_wait_for_debugger(
child_in_debug=anursery._at_least_one_child_in_debug
)
2021-06-10 18:02:12 +00:00
2021-02-24 17:59:43 +00:00
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
anursery._join_procs.set()
2021-02-24 17:59:43 +00:00
try:
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype = type(err)
if etype in (
trio.Cancelled,
KeyboardInterrupt
) or (
is_multi_cancelled(err)
):
log.cancel(
2021-02-24 17:59:43 +00:00
f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {err}, ")
# cancel all subactors
await anursery.cancel()
except trio.MultiError as merr:
# If we receive additional errors while waiting on
# remaining subactors that were cancelled,
# aggregate those errors with the original error
# that triggered this teardown.
if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err])
else:
raise
# ria_nursery scope end
# XXX: do we need a `trio.Cancelled` catch here as well?
# this is the catch around the ``.run_in_actor()`` nursery
except (
Exception,
trio.MultiError,
trio.Cancelled
) as err:
# XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug.
await maybe_wait_for_debugger(
child_in_debug=anursery._at_least_one_child_in_debug
)
2021-02-24 17:59:43 +00:00
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
log.cancel(f"Nursery cancelling due to {err}")
2021-02-24 17:59:43 +00:00
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
# use `MultiError` as needed
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))
else:
raise list(errors.values())[0]
# ria_nursery scope end - nursery checkpoint
# after nursery exit
2018-07-14 20:09:05 +00:00
@asynccontextmanager
async def open_nursery(
**kwargs,
2021-11-07 21:45:00 +00:00
) -> typing.AsyncGenerator[ActorNursery, None]:
2021-11-07 21:45:00 +00:00
'''
Create and yield a new ``ActorNursery`` to be used for spawning
structured concurrent subactors.
When an actor is spawned a new trio task is started which
invokes one of the process spawning backends to create and start
a new subprocess. These tasks are started by one of two nurseries
detailed below. The reason for spawning processes from within
a new task is because ``trio_run_in_process`` itself creates a new
internal nursery and the same task that opens a nursery **must**
close it. It turns out this approach is probably more correct
anyway since it is more clear from the following nested nurseries
which cancellation scopes correspond to each spawned subactor set.
2021-11-07 21:45:00 +00:00
'''
implicit_runtime = False
actor = current_actor(err_on_no_runtime=False)
try:
if actor is None and is_main_process():
2021-05-10 11:23:39 +00:00
# if we are the parent process start the
# actor runtime implicitly
log.info("Starting actor runtime!")
# mark us for teardown on exit
implicit_runtime = True
async with open_root_actor(**kwargs) as actor:
assert actor is current_actor()
try:
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
yield anursery
finally:
anursery.exited.set()
else: # sub-nursery case
try:
2021-06-10 18:02:12 +00:00
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
yield anursery
finally:
anursery.exited.set()
finally:
log.debug("Nursery teardown complete")
# shutdown runtime if it was started
if implicit_runtime:
log.info("Shutting down actor tree")