diff --git a/tractor/_actor.py b/tractor/_actor.py index 3656889..f8f2bdd 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -9,7 +9,7 @@ import importlib.util import inspect import uuid import typing -from typing import Dict, List, Tuple, Any, Optional +from typing import Dict, List, Tuple, Any, Optional, Union from types import ModuleType import sys import os @@ -48,7 +48,9 @@ async def _invoke( chan: Channel, func: typing.Callable, kwargs: Dict[str, Any], - task_status=trio.TASK_STATUS_IGNORED + task_status: TaskStatus[ + Union[trio.CancelScope, BaseException] + ] = trio.TASK_STATUS_IGNORED, ): """Invoke local func and deliver result(s) over provided channel. """ @@ -155,6 +157,7 @@ async def _invoke( if cs is None: # error is from above code not from rpc invocation task_status.started(err) + finally: # RPC task bookeeping try: @@ -199,7 +202,7 @@ class Actor: self, name: str, *, - rpc_module_paths: List[str] = [], + enable_modules: List[str] = [], uid: str = None, loglevel: str = None, arbiter_addr: Optional[Tuple[str, int]] = None, @@ -219,14 +222,14 @@ class Actor: self._parent_main_data = _mp_fixup_main._mp_figure_out_main() # always include debugging tools module - rpc_module_paths.append('tractor._debug') + enable_modules.append('tractor._debug') mods = {} - for name in rpc_module_paths: + for name in enable_modules: mod = importlib.import_module(name) mods[name] = _get_mod_abspath(mod) - self.rpc_module_paths = mods + self.enable_modules = mods self._mods: Dict[str, ModuleType] = {} # TODO: consider making this a dynamically defined @@ -293,7 +296,7 @@ class Actor: _mp_fixup_main._fixup_main_from_path( parent_data['init_main_from_path']) - for modpath, filepath in self.rpc_module_paths.items(): + for modpath, filepath in self.enable_modules.items(): # XXX append the allowed module to the python path which # should allow for relative (at least downward) imports. sys.path.append(os.path.dirname(filepath)) @@ -317,7 +320,7 @@ class Actor: if ns == '__main__': msg = ( "\n\nMake sure you exposed the current module using:\n\n" - "ActorNursery.start_actor(, rpc_module_paths=" + "ActorNursery.start_actor(, enable_modules=" "[__name__])" ) diff --git a/tractor/_portal.py b/tractor/_portal.py index 6c026f8..9ad4c2f 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -204,8 +204,8 @@ class Portal: fn_name: Optional[str] = None, **kwargs ) -> Any: - """Submit a remote function to be scheduled and run by actor, - wrap and return its (stream of) result(s). + """Submit a remote function to be scheduled and run by actor, in + a new task, wrap and return its (stream of) result(s). This is a blocking call and returns either a value from the remote rpc task or a local async generator instance. diff --git a/tractor/_root.py b/tractor/_root.py index d8026b4..ec2ccfd 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -48,6 +48,7 @@ async def open_root_actor( # internal logging loglevel: Optional[str] = None, + enable_modules: Optional[List] = None, rpc_module_paths: Optional[List] = None, ) -> typing.Any: @@ -58,7 +59,16 @@ async def open_root_actor( _state._runtime_vars['_is_root'] = True # caps based rpc list - expose_modules = rpc_module_paths or [] + enable_modules = enable_modules or [] + + if rpc_module_paths: + warnings.warn( + "`rpc_module_paths` is now deprecated, use " + " `enable_modules` instead.", + DeprecationWarning, + stacklevel=2, + ) + enable_modules.extend(rpc_module_paths) if start_method is not None: _spawn.try_set_start_method(start_method) @@ -68,7 +78,7 @@ async def open_root_actor( # expose internal debug module to every actor allowing # for use of ``await tractor.breakpoint()`` - expose_modules.append('tractor._debug') + enable_modules.append('tractor._debug') elif debug_mode: raise RuntimeError( @@ -105,7 +115,7 @@ async def open_root_actor( name or 'anonymous', arbiter_addr=arbiter_addr, loglevel=loglevel, - rpc_module_paths=expose_modules, + enable_modules=enable_modules, ) host, port = (host, 0) @@ -121,7 +131,7 @@ async def open_root_actor( name or 'arbiter', arbiter_addr=arbiter_addr, loglevel=loglevel, - rpc_module_paths=expose_modules, + enable_modules=enable_modules, ) try: diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 78158bb..31a414c 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -247,7 +247,7 @@ async def new_proc( # send additional init params await chan.send({ "_parent_main_data": subactor._parent_main_data, - "rpc_module_paths": subactor.rpc_module_paths, + "enable_modules": subactor.enable_modules, "_arb_addr": subactor._arb_addr, "bind_host": bind_addr[0], "bind_port": bind_addr[1], @@ -290,7 +290,7 @@ async def new_proc( # if we're the "main" process start the forkserver # only once and pass its ipc info to downstream # children - # forkserver.set_forkserver_preload(rpc_module_paths) + # forkserver.set_forkserver_preload(enable_modules) forkserver.ensure_running() fs_info = ( fs._forkserver_address, diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 9ed0f14..e683216 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -37,7 +37,7 @@ def current_context(): def stream(func): - """Mark an async function as a streaming routine. + """Mark an async function as a streaming routine with ``@stream``. """ func._tractor_stream_function = True sig = inspect.signature(func) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 9e61cab..490778d 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -3,14 +3,15 @@ """ from functools import partial import multiprocessing as mp -from typing import Tuple, List, Dict, Optional, Any +from typing import Tuple, List, Dict, Optional import typing from contextlib import AsyncExitStack +import warnings import trio from async_generator import asynccontextmanager -from ._state import current_actor, is_root_process, is_main_process +from ._state import current_actor, is_main_process from .log import get_logger, get_loglevel from ._actor import Actor from ._portal import Portal @@ -56,6 +57,7 @@ class ActorNursery: *, bind_addr: Tuple[str, int] = _default_bind_addr, rpc_module_paths: List[str] = None, + enable_modules: List[str] = None, loglevel: str = None, # set log level per subactor nursery: trio.Nursery = None, ) -> Portal: @@ -65,10 +67,21 @@ class ActorNursery: _rtv = _state._runtime_vars.copy() _rtv['_is_root'] = False + enable_modules = enable_modules or [] + + if rpc_module_paths: + warnings.warn( + "`rpc_module_paths` is now deprecated, use " + " `enable_modules` instead.", + DeprecationWarning, + stacklevel=2, + ) + enable_modules.extend(rpc_module_paths) + subactor = Actor( name, # modules allowed to invoked funcs from - rpc_module_paths=rpc_module_paths or [], + enable_modules=enable_modules, loglevel=loglevel, arbiter_addr=current_actor()._arb_addr, ) @@ -221,7 +234,6 @@ async def open_nursery( # mark us for teardown on exit implicit_runtime = True - # the collection of errors retreived from spawned sub-actors errors: Dict[Tuple[str, str], Exception] = {} @@ -263,18 +275,22 @@ async def open_nursery( # worry more are coming). anursery._join_procs.set() try: - # XXX: hypothetically an error could be raised and then - # a cancel signal shows up slightly after in which case - # the `else:` block here might not complete? - # For now, shield both. + # XXX: hypothetically an error could be + # raised and then a cancel signal shows up + # slightly after in which case the `else:` + # block here might not complete? For now, + # shield both. with trio.CancelScope(shield=True): etype = type(err) - if etype in (trio.Cancelled, KeyboardInterrupt) or ( + if etype in ( + trio.Cancelled, + KeyboardInterrupt + ) or ( is_multi_cancelled(err) ): log.warning( - f"Nursery for {current_actor().uid} was " - f"cancelled with {etype}") + f"Nursery for {current_actor().uid} " + f"was cancelled with {etype}") else: log.exception( f"Nursery for {current_actor().uid} " diff --git a/tractor/msg.py b/tractor/msg.py index f778ac7..5b343b6 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -3,7 +3,7 @@ Messaging pattern APIs and helpers. """ import inspect import typing -from typing import Dict, Any, Set, Union, Callable +from typing import Dict, Any, Set, Callable from functools import partial from async_generator import aclosing @@ -11,7 +11,6 @@ import trio import wrapt from .log import get_logger -from . import current_actor from ._streaming import Context __all__ = ['pub'] @@ -91,6 +90,7 @@ def modify_subs(topics2ctxs, topics, ctx): _pub_state: Dict[str, dict] = {} +_pubtask2lock: Dict[str, trio.StrictFIFOLock] = {} def pub( @@ -178,22 +178,22 @@ def pub( subscribers. If you are ok to have a new task running for every call to ``pub_service()`` then probably don't need this. """ - global _pub_state + global _pubtask2lock # handle the decorator not called with () case if wrapped is None: return partial(pub, tasks=tasks) - task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = { - None: trio.StrictFIFOLock()} + task2lock: Dict[str, trio.StrictFIFOLock] = {} for name in tasks: task2lock[name] = trio.StrictFIFOLock() @wrapt.decorator async def wrapper(agen, instance, args, kwargs): - # this is used to extract arguments properly as per - # the `wrapt` docs + + # XXX: this is used to extract arguments properly as per the + # `wrapt` docs async def _execute( ctx: Context, topics: Set[str], @@ -203,14 +203,22 @@ def pub( packetizer: Callable = None, **kwargs, ): - if tasks and task_name is None: + if task_name is None: + task_name = trio.lowlevel.current_task().name + + if tasks and task_name not in tasks: raise TypeError( f"{agen} must be called with a `task_name` named " - f"argument with a falue from {tasks}") + f"argument with a value from {tasks}") + + elif not tasks and not task2lock: + # add a default root-task lock if none defined + task2lock[task_name] = trio.StrictFIFOLock() + + _pubtask2lock.update(task2lock) topics = set(topics) - lockmap = _pub_state.setdefault('_pubtask2lock', task2lock) - lock = lockmap[task_name] + lock = _pubtask2lock[task_name] all_subs = _pub_state.setdefault('_subs', {}) topics2ctxs = all_subs.setdefault(task_name, {})