forked from goodboy/tractor
1
0
Fork 0

Merge pull request #185 from goodboy/implicit_runtime

Implicit runtime
deprecate_rpcmodpaths
goodboy 2021-01-08 22:07:43 -05:00 committed by GitHub
commit dfaf1e3631
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 196 additions and 120 deletions

View File

@ -61,10 +61,11 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
str(script_file),
]
# XXX: BE FOREVER WARNED: if you enable lots of tractor logging
# in the subprocess it may cause infinite blocking on the pipes
# due to backpressure!!!
proc = testdir.popen(
cmdargs,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,
)
assert not proc.returncode
@ -101,6 +102,8 @@ def test_example(run_example_in_subproc, example_script):
with run_example_in_subproc(code) as proc:
proc.wait()
err, _ = proc.stderr.read(), proc.stdout.read()
# print(f'STDERR: {err}')
# print(f'STDOUT: {out}')
# if we get some gnarly output let's aggregate and raise
errmsg = err.decode()

View File

@ -35,8 +35,9 @@ async def test_self_is_registered(arb_addr):
"Verify waiting on the arbiter to register itself using the standard api."
actor = tractor.current_actor()
assert actor.is_arbiter
async with tractor.wait_for_actor('arbiter') as portal:
assert portal.channel.uid[0] == 'arbiter'
with trio.fail_after(0.2):
async with tractor.wait_for_actor('root') as portal:
assert portal.channel.uid[0] == 'root'
@tractor_test
@ -46,8 +47,10 @@ async def test_self_is_registered_localportal(arb_addr):
assert actor.is_arbiter
async with tractor.get_arbiter(*arb_addr) as portal:
assert isinstance(portal, tractor._portal.LocalPortal)
sockaddr = await portal.run_from_ns('self', 'wait_for_actor', name='arbiter')
assert sockaddr[0] == arb_addr
with trio.fail_after(0.2):
sockaddr = await portal.run_from_ns('self', 'wait_for_actor', name='root')
assert sockaddr[0] == arb_addr
def test_local_actor_async_func(arb_addr):

View File

@ -31,7 +31,6 @@ log = get_logger(__name__)
__all__ = ['breakpoint', 'post_mortem']
# placeholder for function to set a ``trio.Event`` on debugger exit
_pdb_release_hook: Optional[Callable] = None
@ -120,7 +119,7 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
"""Acquire a actor local FIFO lock meant to mutex entry to a local
debugger entry point to avoid tty clobbering by multiple processes.
"""
task_name = trio.lowlevel.current_task()
task_name = trio.lowlevel.current_task().name
try:
log.debug(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}")
@ -276,7 +275,7 @@ def _mk_pdb():
def _set_trace(actor):
log.critical(f"\nAttaching pdb to actor: {actor.uid}\n")
log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n")
pdb = _mk_pdb()
pdb.set_trace(

View File

@ -30,7 +30,10 @@ logger = log.get_logger('tractor')
async def open_root_actor(
# defaults are above
arbiter_addr: Tuple[str, int],
arbiter_addr: Tuple[str, int] = (
_default_arbiter_host,
_default_arbiter_port,
),
name: Optional[str] = 'root',
@ -42,7 +45,11 @@ async def open_root_actor(
# enables the multi-process debugger support
debug_mode: bool = False,
**kwargs,
# internal logging
loglevel: Optional[str] = None,
rpc_module_paths: Optional[List] = None,
) -> typing.Any:
"""Async entry point for ``tractor``.
@ -50,6 +57,9 @@ async def open_root_actor(
# mark top most level process as root actor
_state._runtime_vars['_is_root'] = True
# caps based rpc list
expose_modules = rpc_module_paths or []
if start_method is not None:
_spawn.try_set_start_method(start_method)
@ -58,7 +68,7 @@ async def open_root_actor(
# expose internal debug module to every actor allowing
# for use of ``await tractor.breakpoint()``
kwargs.setdefault('rpc_module_paths', []).append('tractor._debug')
expose_modules.append('tractor._debug')
elif debug_mode:
raise RuntimeError(
@ -70,7 +80,7 @@ async def open_root_actor(
_default_arbiter_port
)
loglevel = kwargs.get('loglevel', log.get_loglevel())
loglevel = loglevel or log.get_loglevel()
if loglevel is not None:
log._default_loglevel = loglevel
log.get_console_log(loglevel)
@ -94,12 +104,14 @@ async def open_root_actor(
actor = Actor(
name or 'anonymous',
arbiter_addr=arbiter_addr,
**kwargs
loglevel=loglevel,
rpc_module_paths=expose_modules,
)
host, port = (host, 0)
else:
# start this local actor as the arbiter
# start this local actor as the arbiter (aka a regular actor who
# manages the local registry of "mailboxes")
# Note that if the current actor is the arbiter it is desirable
# for it to stay up indefinitely until a re-election process has
@ -108,7 +120,8 @@ async def open_root_actor(
actor = Arbiter(
name or 'arbiter',
arbiter_addr=arbiter_addr,
**kwargs
loglevel=loglevel,
rpc_module_paths=expose_modules,
)
try:

View File

@ -7,7 +7,7 @@ import multiprocessing as mp
import trio
_current_actor: Optional['Actor'] = None # type: ignore
_current_actor: Optional['Actor'] = None # type: ignore # noqa
_runtime_vars: Dict[str, Any] = {
'_debug_mode': False,
'_is_root': False,
@ -15,14 +15,21 @@ _runtime_vars: Dict[str, Any] = {
}
def current_actor() -> 'Actor': # type: ignore
def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore # noqa
"""Get the process-local actor instance.
"""
if _current_actor is None:
if _current_actor is None and err_on_no_runtime:
raise RuntimeError("No local actor has been initialized yet")
return _current_actor
_conc_name_getters = {
'task': trio.lowlevel.current_task,
'actor': current_actor
}
class ActorContextInfo(Mapping):
"Dyanmic lookup for local actor and task names"
_context_keys = ('task', 'actor')
@ -33,12 +40,9 @@ class ActorContextInfo(Mapping):
def __iter__(self):
return iter(self._context_keys)
def __getitem__(self, key: str):
def __getitem__(self, key: str) -> str:
try:
return {
'task': trio.lowlevel.current_task,
'actor': current_actor
}[key]().name
return _conc_name_getters[key]().name # type: ignore
except RuntimeError:
# no local actor/task context initialized yet
return f'no {key} context'

View File

@ -5,15 +5,17 @@ from functools import partial
import multiprocessing as mp
from typing import Tuple, List, Dict, Optional, Any
import typing
from contextlib import AsyncExitStack
import trio
from async_generator import asynccontextmanager
from ._state import current_actor
from ._state import current_actor, is_root_process, is_main_process
from .log import get_logger, get_loglevel
from ._actor import Actor
from ._portal import Portal
from ._exceptions import is_multi_cancelled
from ._root import open_root_actor
from . import _state
from . import _spawn
@ -186,7 +188,9 @@ class ActorNursery:
@asynccontextmanager
async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
async def open_nursery(
**kwargs,
) -> typing.AsyncGenerator[ActorNursery, None]:
"""Create and yield a new ``ActorNursery`` to be used for spawning
structured concurrent subactors.
@ -200,9 +204,23 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
anyway since it is more clear from the following nested nurseries
which cancellation scopes correspond to each spawned subactor set.
"""
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
implicit_runtime = False
actor = current_actor(err_on_no_runtime=False)
if actor is None and is_main_process():
# if we are the parent process start the actor runtime implicitly
log.info("Starting actor runtime!")
root_runtime_stack = AsyncExitStack()
actor = await root_runtime_stack.enter_async_context(
open_root_actor(**kwargs)
)
assert actor is current_actor()
# mark us for teardown on exit
implicit_runtime = True
# the collection of errors retreived from spawned sub-actors
errors: Dict[Tuple[str, str], Exception] = {}
@ -213,100 +231,111 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
# a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``).
async with trio.open_nursery() as da_nursery:
try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
# ``ActorNusery.run_in_actor()``) are expected to only
# return a single result and then complete (i.e. be canclled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with trio.open_nursery() as ria_nursery:
anursery = ActorNursery(
actor, ria_nursery, da_nursery, errors
)
try:
# spawning of actors happens in the caller's scope
# after we yield upwards
yield anursery
log.debug(
f"Waiting on subactors {anursery._children} "
"to complete"
try:
async with trio.open_nursery() as da_nursery:
try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
# ``ActorNusery.run_in_actor()``) are expected to only
# return a single result and then complete (i.e. be canclled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with trio.open_nursery() as ria_nursery:
anursery = ActorNursery(
actor,
ria_nursery,
da_nursery,
errors
)
except BaseException as err:
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
anursery._join_procs.set()
try:
# XXX: hypothetically an error could be raised and then
# a cancel signal shows up slightly after in which case
# the `else:` block here might not complete?
# For now, shield both.
with trio.CancelScope(shield=True):
etype = type(err)
if etype in (trio.Cancelled, KeyboardInterrupt) or (
is_multi_cancelled(err)
):
log.warning(
f"Nursery for {current_actor().uid} was "
f"cancelled with {etype}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {err}, ")
# spawning of actors happens in the caller's scope
# after we yield upwards
yield anursery
log.debug(
f"Waiting on subactors {anursery._children} "
"to complete"
)
except BaseException as err:
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
anursery._join_procs.set()
try:
# XXX: hypothetically an error could be raised and then
# a cancel signal shows up slightly after in which case
# the `else:` block here might not complete?
# For now, shield both.
with trio.CancelScope(shield=True):
etype = type(err)
if etype in (trio.Cancelled, KeyboardInterrupt) or (
is_multi_cancelled(err)
):
log.warning(
f"Nursery for {current_actor().uid} was "
f"cancelled with {etype}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {err}, ")
# cancel all subactors
await anursery.cancel()
# cancel all subactors
await anursery.cancel()
except trio.MultiError as merr:
# If we receive additional errors while waiting on
# remaining subactors that were cancelled,
# aggregate those errors with the original error
# that triggered this teardown.
if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err])
else:
raise
except trio.MultiError as merr:
# If we receive additional errors while waiting on
# remaining subactors that were cancelled,
# aggregate those errors with the original error
# that triggered this teardown.
if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err])
else:
raise
# Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
log.debug("Waiting on all subactors to complete")
anursery._join_procs.set()
# Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
log.debug("Waiting on all subactors to complete")
anursery._join_procs.set()
# ria_nursery scope end
# ria_nursery scope end
# XXX: do we need a `trio.Cancelled` catch here as well?
except (Exception, trio.MultiError, trio.Cancelled) as err:
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
log.warning(f"Nursery cancelling due to {err}")
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
# XXX: do we need a `trio.Cancelled` catch here as well?
except (Exception, trio.MultiError, trio.Cancelled) as err:
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
log.warning(f"Nursery cancelling due to {err}")
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
# use `MultiError` as needed
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))
else:
raise list(errors.values())[0]
# use `MultiError` as needed
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))
else:
raise list(errors.values())[0]
# ria_nursery scope end
# ria_nursery scope end - nursery checkpoint
log.debug("Nursery teardown complete")
# after nursery exit
finally:
log.debug("Nursery teardown complete")
# shutdown runtime if it was started
if implicit_runtime:
log.info("Shutting down actor tree")
await root_runtime_stack.aclose()

View File

@ -2,7 +2,9 @@ import inspect
import platform
from functools import partial, wraps
from tractor import run
import trio
import tractor
# from tractor import run
__all__ = ['tractor_test']
@ -34,6 +36,7 @@ def tractor_test(fn):
**kwargs
):
# __tracebackhide__ = True
if 'arb_addr' in inspect.signature(fn).parameters:
# injects test suite fixture value to test as well
# as `run()`
@ -54,11 +57,33 @@ def tractor_test(fn):
# set of subprocess spawning backends
kwargs['start_method'] = start_method
return run(
partial(fn, *args, **kwargs),
arbiter_addr=arb_addr,
loglevel=loglevel,
start_method=start_method,
)
if kwargs:
# use explicit root actor start
async def _main():
async with tractor.open_root_actor(
# **kwargs,
arbiter_addr=arb_addr,
loglevel=loglevel,
start_method=start_method,
# TODO: only enable when pytest is passed --pdb
# debug_mode=True,
) as actor:
await fn(*args, **kwargs)
main = _main
else:
# use implicit root actor start
main = partial(fn, *args, **kwargs),
return trio.run(main)
# arbiter_addr=arb_addr,
# loglevel=loglevel,
# start_method=start_method,
# )
return wrapper