diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index 73e6a69..ea676a2 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -61,10 +61,11 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): str(script_file), ] + # XXX: BE FOREVER WARNED: if you enable lots of tractor logging + # in the subprocess it may cause infinite blocking on the pipes + # due to backpressure!!! proc = testdir.popen( cmdargs, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, **kwargs, ) assert not proc.returncode @@ -101,6 +102,8 @@ def test_example(run_example_in_subproc, example_script): with run_example_in_subproc(code) as proc: proc.wait() err, _ = proc.stderr.read(), proc.stdout.read() + # print(f'STDERR: {err}') + # print(f'STDOUT: {out}') # if we get some gnarly output let's aggregate and raise errmsg = err.decode() diff --git a/tests/test_local.py b/tests/test_local.py index ef64795..60e875b 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -35,8 +35,9 @@ async def test_self_is_registered(arb_addr): "Verify waiting on the arbiter to register itself using the standard api." actor = tractor.current_actor() assert actor.is_arbiter - async with tractor.wait_for_actor('arbiter') as portal: - assert portal.channel.uid[0] == 'arbiter' + with trio.fail_after(0.2): + async with tractor.wait_for_actor('root') as portal: + assert portal.channel.uid[0] == 'root' @tractor_test @@ -46,8 +47,10 @@ async def test_self_is_registered_localportal(arb_addr): assert actor.is_arbiter async with tractor.get_arbiter(*arb_addr) as portal: assert isinstance(portal, tractor._portal.LocalPortal) - sockaddr = await portal.run_from_ns('self', 'wait_for_actor', name='arbiter') - assert sockaddr[0] == arb_addr + + with trio.fail_after(0.2): + sockaddr = await portal.run_from_ns('self', 'wait_for_actor', name='root') + assert sockaddr[0] == arb_addr def test_local_actor_async_func(arb_addr): diff --git a/tractor/_debug.py b/tractor/_debug.py index 7f20c5e..9c1fb71 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -31,7 +31,6 @@ log = get_logger(__name__) __all__ = ['breakpoint', 'post_mortem'] - # placeholder for function to set a ``trio.Event`` on debugger exit _pdb_release_hook: Optional[Callable] = None @@ -120,7 +119,7 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]: """Acquire a actor local FIFO lock meant to mutex entry to a local debugger entry point to avoid tty clobbering by multiple processes. """ - task_name = trio.lowlevel.current_task() + task_name = trio.lowlevel.current_task().name try: log.debug( f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}") @@ -276,7 +275,7 @@ def _mk_pdb(): def _set_trace(actor): - log.critical(f"\nAttaching pdb to actor: {actor.uid}\n") + log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n") pdb = _mk_pdb() pdb.set_trace( diff --git a/tractor/_root.py b/tractor/_root.py index 7c84af1..d8026b4 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -30,7 +30,10 @@ logger = log.get_logger('tractor') async def open_root_actor( # defaults are above - arbiter_addr: Tuple[str, int], + arbiter_addr: Tuple[str, int] = ( + _default_arbiter_host, + _default_arbiter_port, + ), name: Optional[str] = 'root', @@ -42,7 +45,11 @@ async def open_root_actor( # enables the multi-process debugger support debug_mode: bool = False, - **kwargs, + # internal logging + loglevel: Optional[str] = None, + + rpc_module_paths: Optional[List] = None, + ) -> typing.Any: """Async entry point for ``tractor``. @@ -50,6 +57,9 @@ async def open_root_actor( # mark top most level process as root actor _state._runtime_vars['_is_root'] = True + # caps based rpc list + expose_modules = rpc_module_paths or [] + if start_method is not None: _spawn.try_set_start_method(start_method) @@ -58,7 +68,7 @@ async def open_root_actor( # expose internal debug module to every actor allowing # for use of ``await tractor.breakpoint()`` - kwargs.setdefault('rpc_module_paths', []).append('tractor._debug') + expose_modules.append('tractor._debug') elif debug_mode: raise RuntimeError( @@ -70,7 +80,7 @@ async def open_root_actor( _default_arbiter_port ) - loglevel = kwargs.get('loglevel', log.get_loglevel()) + loglevel = loglevel or log.get_loglevel() if loglevel is not None: log._default_loglevel = loglevel log.get_console_log(loglevel) @@ -94,12 +104,14 @@ async def open_root_actor( actor = Actor( name or 'anonymous', arbiter_addr=arbiter_addr, - **kwargs + loglevel=loglevel, + rpc_module_paths=expose_modules, ) host, port = (host, 0) else: - # start this local actor as the arbiter + # start this local actor as the arbiter (aka a regular actor who + # manages the local registry of "mailboxes") # Note that if the current actor is the arbiter it is desirable # for it to stay up indefinitely until a re-election process has @@ -108,7 +120,8 @@ async def open_root_actor( actor = Arbiter( name or 'arbiter', arbiter_addr=arbiter_addr, - **kwargs + loglevel=loglevel, + rpc_module_paths=expose_modules, ) try: diff --git a/tractor/_state.py b/tractor/_state.py index 37e6a1f..37fdafa 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -7,7 +7,7 @@ import multiprocessing as mp import trio -_current_actor: Optional['Actor'] = None # type: ignore +_current_actor: Optional['Actor'] = None # type: ignore # noqa _runtime_vars: Dict[str, Any] = { '_debug_mode': False, '_is_root': False, @@ -15,14 +15,21 @@ _runtime_vars: Dict[str, Any] = { } -def current_actor() -> 'Actor': # type: ignore +def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore # noqa """Get the process-local actor instance. """ - if _current_actor is None: + if _current_actor is None and err_on_no_runtime: raise RuntimeError("No local actor has been initialized yet") + return _current_actor +_conc_name_getters = { + 'task': trio.lowlevel.current_task, + 'actor': current_actor +} + + class ActorContextInfo(Mapping): "Dyanmic lookup for local actor and task names" _context_keys = ('task', 'actor') @@ -33,12 +40,9 @@ class ActorContextInfo(Mapping): def __iter__(self): return iter(self._context_keys) - def __getitem__(self, key: str): + def __getitem__(self, key: str) -> str: try: - return { - 'task': trio.lowlevel.current_task, - 'actor': current_actor - }[key]().name + return _conc_name_getters[key]().name # type: ignore except RuntimeError: # no local actor/task context initialized yet return f'no {key} context' diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 0d54b24..9e61cab 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -5,15 +5,17 @@ from functools import partial import multiprocessing as mp from typing import Tuple, List, Dict, Optional, Any import typing +from contextlib import AsyncExitStack import trio from async_generator import asynccontextmanager -from ._state import current_actor +from ._state import current_actor, is_root_process, is_main_process from .log import get_logger, get_loglevel from ._actor import Actor from ._portal import Portal from ._exceptions import is_multi_cancelled +from ._root import open_root_actor from . import _state from . import _spawn @@ -186,7 +188,9 @@ class ActorNursery: @asynccontextmanager -async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: +async def open_nursery( + **kwargs, +) -> typing.AsyncGenerator[ActorNursery, None]: """Create and yield a new ``ActorNursery`` to be used for spawning structured concurrent subactors. @@ -200,9 +204,23 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: anyway since it is more clear from the following nested nurseries which cancellation scopes correspond to each spawned subactor set. """ - actor = current_actor() - if not actor: - raise RuntimeError("No actor instance has been defined yet?") + implicit_runtime = False + + actor = current_actor(err_on_no_runtime=False) + + if actor is None and is_main_process(): + + # if we are the parent process start the actor runtime implicitly + log.info("Starting actor runtime!") + root_runtime_stack = AsyncExitStack() + actor = await root_runtime_stack.enter_async_context( + open_root_actor(**kwargs) + ) + assert actor is current_actor() + + # mark us for teardown on exit + implicit_runtime = True + # the collection of errors retreived from spawned sub-actors errors: Dict[Tuple[str, str], Exception] = {} @@ -213,100 +231,111 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # a supervisor strategy **before** blocking indefinitely to wait for # actors spawned in "daemon mode" (aka started using # ``ActorNursery.start_actor()``). - async with trio.open_nursery() as da_nursery: - try: - # This is the inner level "run in actor" nursery. It is - # awaited first since actors spawned in this way (using - # ``ActorNusery.run_in_actor()``) are expected to only - # return a single result and then complete (i.e. be canclled - # gracefully). Errors collected from these actors are - # immediately raised for handling by a supervisor strategy. - # As such if the strategy propagates any error(s) upwards - # the above "daemon actor" nursery will be notified. - async with trio.open_nursery() as ria_nursery: - anursery = ActorNursery( - actor, ria_nursery, da_nursery, errors - ) - try: - # spawning of actors happens in the caller's scope - # after we yield upwards - yield anursery - log.debug( - f"Waiting on subactors {anursery._children} " - "to complete" + try: + async with trio.open_nursery() as da_nursery: + try: + # This is the inner level "run in actor" nursery. It is + # awaited first since actors spawned in this way (using + # ``ActorNusery.run_in_actor()``) are expected to only + # return a single result and then complete (i.e. be canclled + # gracefully). Errors collected from these actors are + # immediately raised for handling by a supervisor strategy. + # As such if the strategy propagates any error(s) upwards + # the above "daemon actor" nursery will be notified. + async with trio.open_nursery() as ria_nursery: + anursery = ActorNursery( + actor, + ria_nursery, + da_nursery, + errors ) - except BaseException as err: - # if the caller's scope errored then we activate our - # one-cancels-all supervisor strategy (don't - # worry more are coming). - anursery._join_procs.set() try: - # XXX: hypothetically an error could be raised and then - # a cancel signal shows up slightly after in which case - # the `else:` block here might not complete? - # For now, shield both. - with trio.CancelScope(shield=True): - etype = type(err) - if etype in (trio.Cancelled, KeyboardInterrupt) or ( - is_multi_cancelled(err) - ): - log.warning( - f"Nursery for {current_actor().uid} was " - f"cancelled with {etype}") - else: - log.exception( - f"Nursery for {current_actor().uid} " - f"errored with {err}, ") + # spawning of actors happens in the caller's scope + # after we yield upwards + yield anursery + log.debug( + f"Waiting on subactors {anursery._children} " + "to complete" + ) + except BaseException as err: + # if the caller's scope errored then we activate our + # one-cancels-all supervisor strategy (don't + # worry more are coming). + anursery._join_procs.set() + try: + # XXX: hypothetically an error could be raised and then + # a cancel signal shows up slightly after in which case + # the `else:` block here might not complete? + # For now, shield both. + with trio.CancelScope(shield=True): + etype = type(err) + if etype in (trio.Cancelled, KeyboardInterrupt) or ( + is_multi_cancelled(err) + ): + log.warning( + f"Nursery for {current_actor().uid} was " + f"cancelled with {etype}") + else: + log.exception( + f"Nursery for {current_actor().uid} " + f"errored with {err}, ") - # cancel all subactors - await anursery.cancel() + # cancel all subactors + await anursery.cancel() - except trio.MultiError as merr: - # If we receive additional errors while waiting on - # remaining subactors that were cancelled, - # aggregate those errors with the original error - # that triggered this teardown. - if err not in merr.exceptions: - raise trio.MultiError(merr.exceptions + [err]) - else: - raise + except trio.MultiError as merr: + # If we receive additional errors while waiting on + # remaining subactors that were cancelled, + # aggregate those errors with the original error + # that triggered this teardown. + if err not in merr.exceptions: + raise trio.MultiError(merr.exceptions + [err]) + else: + raise - # Last bit before first nursery block ends in the case - # where we didn't error in the caller's scope - log.debug("Waiting on all subactors to complete") - anursery._join_procs.set() + # Last bit before first nursery block ends in the case + # where we didn't error in the caller's scope + log.debug("Waiting on all subactors to complete") + anursery._join_procs.set() - # ria_nursery scope end + # ria_nursery scope end - # XXX: do we need a `trio.Cancelled` catch here as well? - except (Exception, trio.MultiError, trio.Cancelled) as err: - # If actor-local error was raised while waiting on - # ".run_in_actor()" actors then we also want to cancel all - # remaining sub-actors (due to our lone strategy: - # one-cancels-all). - log.warning(f"Nursery cancelling due to {err}") - if anursery._children: - with trio.CancelScope(shield=True): - await anursery.cancel() - raise - finally: - # No errors were raised while awaiting ".run_in_actor()" - # actors but those actors may have returned remote errors as - # results (meaning they errored remotely and have relayed - # those errors back to this parent actor). The errors are - # collected in ``errors`` so cancel all actors, summarize - # all errors and re-raise. - if errors: + # XXX: do we need a `trio.Cancelled` catch here as well? + except (Exception, trio.MultiError, trio.Cancelled) as err: + # If actor-local error was raised while waiting on + # ".run_in_actor()" actors then we also want to cancel all + # remaining sub-actors (due to our lone strategy: + # one-cancels-all). + log.warning(f"Nursery cancelling due to {err}") if anursery._children: with trio.CancelScope(shield=True): await anursery.cancel() + raise + finally: + # No errors were raised while awaiting ".run_in_actor()" + # actors but those actors may have returned remote errors as + # results (meaning they errored remotely and have relayed + # those errors back to this parent actor). The errors are + # collected in ``errors`` so cancel all actors, summarize + # all errors and re-raise. + if errors: + if anursery._children: + with trio.CancelScope(shield=True): + await anursery.cancel() - # use `MultiError` as needed - if len(errors) > 1: - raise trio.MultiError(tuple(errors.values())) - else: - raise list(errors.values())[0] + # use `MultiError` as needed + if len(errors) > 1: + raise trio.MultiError(tuple(errors.values())) + else: + raise list(errors.values())[0] - # ria_nursery scope end + # ria_nursery scope end - nursery checkpoint - log.debug("Nursery teardown complete") + # after nursery exit + finally: + log.debug("Nursery teardown complete") + + # shutdown runtime if it was started + if implicit_runtime: + log.info("Shutting down actor tree") + await root_runtime_stack.aclose() diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py index 65f199f..734e367 100644 --- a/tractor/testing/_tractor_test.py +++ b/tractor/testing/_tractor_test.py @@ -2,7 +2,9 @@ import inspect import platform from functools import partial, wraps -from tractor import run +import trio +import tractor +# from tractor import run __all__ = ['tractor_test'] @@ -34,6 +36,7 @@ def tractor_test(fn): **kwargs ): # __tracebackhide__ = True + if 'arb_addr' in inspect.signature(fn).parameters: # injects test suite fixture value to test as well # as `run()` @@ -54,11 +57,33 @@ def tractor_test(fn): # set of subprocess spawning backends kwargs['start_method'] = start_method - return run( - partial(fn, *args, **kwargs), - arbiter_addr=arb_addr, - loglevel=loglevel, - start_method=start_method, - ) + if kwargs: + + # use explicit root actor start + + async def _main(): + async with tractor.open_root_actor( + # **kwargs, + arbiter_addr=arb_addr, + loglevel=loglevel, + start_method=start_method, + + # TODO: only enable when pytest is passed --pdb + # debug_mode=True, + + ) as actor: + await fn(*args, **kwargs) + + main = _main + + else: + # use implicit root actor start + main = partial(fn, *args, **kwargs), + + return trio.run(main) + # arbiter_addr=arb_addr, + # loglevel=loglevel, + # start_method=start_method, + # ) return wrapper