From 5ed5d18ccbc545b3bc010a4b807d80bf856454c1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Jan 2021 08:28:06 -0500 Subject: [PATCH 1/3] Begin rpc_module_paths deprecation --- tractor/_actor.py | 14 +++++++------- tractor/_root.py | 18 ++++++++++++++---- tractor/_spawn.py | 4 ++-- tractor/_trionics.py | 38 +++++++++++++++++++++++++++----------- 4 files changed, 50 insertions(+), 24 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 3656889..cb9dc7b 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -48,7 +48,7 @@ async def _invoke( chan: Channel, func: typing.Callable, kwargs: Dict[str, Any], - task_status=trio.TASK_STATUS_IGNORED + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ): """Invoke local func and deliver result(s) over provided channel. """ @@ -199,7 +199,7 @@ class Actor: self, name: str, *, - rpc_module_paths: List[str] = [], + enable_modules: List[str] = [], uid: str = None, loglevel: str = None, arbiter_addr: Optional[Tuple[str, int]] = None, @@ -219,14 +219,14 @@ class Actor: self._parent_main_data = _mp_fixup_main._mp_figure_out_main() # always include debugging tools module - rpc_module_paths.append('tractor._debug') + enable_modules.append('tractor._debug') mods = {} - for name in rpc_module_paths: + for name in enable_modules: mod = importlib.import_module(name) mods[name] = _get_mod_abspath(mod) - self.rpc_module_paths = mods + self.enable_modules = mods self._mods: Dict[str, ModuleType] = {} # TODO: consider making this a dynamically defined @@ -293,7 +293,7 @@ class Actor: _mp_fixup_main._fixup_main_from_path( parent_data['init_main_from_path']) - for modpath, filepath in self.rpc_module_paths.items(): + for modpath, filepath in self.enable_modules.items(): # XXX append the allowed module to the python path which # should allow for relative (at least downward) imports. sys.path.append(os.path.dirname(filepath)) @@ -317,7 +317,7 @@ class Actor: if ns == '__main__': msg = ( "\n\nMake sure you exposed the current module using:\n\n" - "ActorNursery.start_actor(, rpc_module_paths=" + "ActorNursery.start_actor(, enable_modules=" "[__name__])" ) diff --git a/tractor/_root.py b/tractor/_root.py index d8026b4..ec2ccfd 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -48,6 +48,7 @@ async def open_root_actor( # internal logging loglevel: Optional[str] = None, + enable_modules: Optional[List] = None, rpc_module_paths: Optional[List] = None, ) -> typing.Any: @@ -58,7 +59,16 @@ async def open_root_actor( _state._runtime_vars['_is_root'] = True # caps based rpc list - expose_modules = rpc_module_paths or [] + enable_modules = enable_modules or [] + + if rpc_module_paths: + warnings.warn( + "`rpc_module_paths` is now deprecated, use " + " `enable_modules` instead.", + DeprecationWarning, + stacklevel=2, + ) + enable_modules.extend(rpc_module_paths) if start_method is not None: _spawn.try_set_start_method(start_method) @@ -68,7 +78,7 @@ async def open_root_actor( # expose internal debug module to every actor allowing # for use of ``await tractor.breakpoint()`` - expose_modules.append('tractor._debug') + enable_modules.append('tractor._debug') elif debug_mode: raise RuntimeError( @@ -105,7 +115,7 @@ async def open_root_actor( name or 'anonymous', arbiter_addr=arbiter_addr, loglevel=loglevel, - rpc_module_paths=expose_modules, + enable_modules=enable_modules, ) host, port = (host, 0) @@ -121,7 +131,7 @@ async def open_root_actor( name or 'arbiter', arbiter_addr=arbiter_addr, loglevel=loglevel, - rpc_module_paths=expose_modules, + enable_modules=enable_modules, ) try: diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 78158bb..31a414c 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -247,7 +247,7 @@ async def new_proc( # send additional init params await chan.send({ "_parent_main_data": subactor._parent_main_data, - "rpc_module_paths": subactor.rpc_module_paths, + "enable_modules": subactor.enable_modules, "_arb_addr": subactor._arb_addr, "bind_host": bind_addr[0], "bind_port": bind_addr[1], @@ -290,7 +290,7 @@ async def new_proc( # if we're the "main" process start the forkserver # only once and pass its ipc info to downstream # children - # forkserver.set_forkserver_preload(rpc_module_paths) + # forkserver.set_forkserver_preload(enable_modules) forkserver.ensure_running() fs_info = ( fs._forkserver_address, diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 9e61cab..490778d 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -3,14 +3,15 @@ """ from functools import partial import multiprocessing as mp -from typing import Tuple, List, Dict, Optional, Any +from typing import Tuple, List, Dict, Optional import typing from contextlib import AsyncExitStack +import warnings import trio from async_generator import asynccontextmanager -from ._state import current_actor, is_root_process, is_main_process +from ._state import current_actor, is_main_process from .log import get_logger, get_loglevel from ._actor import Actor from ._portal import Portal @@ -56,6 +57,7 @@ class ActorNursery: *, bind_addr: Tuple[str, int] = _default_bind_addr, rpc_module_paths: List[str] = None, + enable_modules: List[str] = None, loglevel: str = None, # set log level per subactor nursery: trio.Nursery = None, ) -> Portal: @@ -65,10 +67,21 @@ class ActorNursery: _rtv = _state._runtime_vars.copy() _rtv['_is_root'] = False + enable_modules = enable_modules or [] + + if rpc_module_paths: + warnings.warn( + "`rpc_module_paths` is now deprecated, use " + " `enable_modules` instead.", + DeprecationWarning, + stacklevel=2, + ) + enable_modules.extend(rpc_module_paths) + subactor = Actor( name, # modules allowed to invoked funcs from - rpc_module_paths=rpc_module_paths or [], + enable_modules=enable_modules, loglevel=loglevel, arbiter_addr=current_actor()._arb_addr, ) @@ -221,7 +234,6 @@ async def open_nursery( # mark us for teardown on exit implicit_runtime = True - # the collection of errors retreived from spawned sub-actors errors: Dict[Tuple[str, str], Exception] = {} @@ -263,18 +275,22 @@ async def open_nursery( # worry more are coming). anursery._join_procs.set() try: - # XXX: hypothetically an error could be raised and then - # a cancel signal shows up slightly after in which case - # the `else:` block here might not complete? - # For now, shield both. + # XXX: hypothetically an error could be + # raised and then a cancel signal shows up + # slightly after in which case the `else:` + # block here might not complete? For now, + # shield both. with trio.CancelScope(shield=True): etype = type(err) - if etype in (trio.Cancelled, KeyboardInterrupt) or ( + if etype in ( + trio.Cancelled, + KeyboardInterrupt + ) or ( is_multi_cancelled(err) ): log.warning( - f"Nursery for {current_actor().uid} was " - f"cancelled with {etype}") + f"Nursery for {current_actor().uid} " + f"was cancelled with {etype}") else: log.exception( f"Nursery for {current_actor().uid} " From 3df001f3a932a1d28fcb74902eabe33c1774ed00 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Jan 2021 20:46:42 -0500 Subject: [PATCH 2/3] Fix msg pub global lock sharing Using `None` as the default key for a `@msg.pub` can cause conflicts if there is more then one "taskless" (no tasks={,} passed) pub offered on an actor... So instead use the first trio "task name" (usually just the function name) instead thus avoiding this very hard to debug and understand problem. Probably should throw in a test but I'm super lazy today. --- tractor/msg.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/tractor/msg.py b/tractor/msg.py index f778ac7..4184333 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -11,7 +11,6 @@ import trio import wrapt from .log import get_logger -from . import current_actor from ._streaming import Context __all__ = ['pub'] @@ -91,6 +90,7 @@ def modify_subs(topics2ctxs, topics, ctx): _pub_state: Dict[str, dict] = {} +_pubtask2lock: Dict[str, dict] = {} def pub( @@ -178,22 +178,22 @@ def pub( subscribers. If you are ok to have a new task running for every call to ``pub_service()`` then probably don't need this. """ - global _pub_state + global _pubtask2lock # handle the decorator not called with () case if wrapped is None: return partial(pub, tasks=tasks) - task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = { - None: trio.StrictFIFOLock()} + task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = {} for name in tasks: task2lock[name] = trio.StrictFIFOLock() @wrapt.decorator async def wrapper(agen, instance, args, kwargs): - # this is used to extract arguments properly as per - # the `wrapt` docs + + # XXX: this is used to extract arguments properly as per the + # `wrapt` docs async def _execute( ctx: Context, topics: Set[str], @@ -203,14 +203,22 @@ def pub( packetizer: Callable = None, **kwargs, ): - if tasks and task_name is None: + if task_name is None: + task_name = trio.lowlevel.current_task().name + + if tasks and task_name not in tasks: raise TypeError( f"{agen} must be called with a `task_name` named " - f"argument with a falue from {tasks}") + f"argument with a value from {tasks}") + + elif not tasks and not task2lock: + # add a default root-task lock if none defined + task2lock[task_name] = trio.StrictFIFOLock() + + _pubtask2lock.update(task2lock) topics = set(topics) - lockmap = _pub_state.setdefault('_pubtask2lock', task2lock) - lock = lockmap[task_name] + lock = _pubtask2lock[task_name] all_subs = _pub_state.setdefault('_subs', {}) topics2ctxs = all_subs.setdefault(task_name, {}) From e546ead2ff88a07ab10da76b67c6144095ebd1fc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Jan 2021 18:18:44 -0500 Subject: [PATCH 3/3] Pub sub internals type fixes --- tractor/_actor.py | 7 +++++-- tractor/_portal.py | 4 ++-- tractor/_streaming.py | 2 +- tractor/msg.py | 6 +++--- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index cb9dc7b..f8f2bdd 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -9,7 +9,7 @@ import importlib.util import inspect import uuid import typing -from typing import Dict, List, Tuple, Any, Optional +from typing import Dict, List, Tuple, Any, Optional, Union from types import ModuleType import sys import os @@ -48,7 +48,9 @@ async def _invoke( chan: Channel, func: typing.Callable, kwargs: Dict[str, Any], - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[ + Union[trio.CancelScope, BaseException] + ] = trio.TASK_STATUS_IGNORED, ): """Invoke local func and deliver result(s) over provided channel. """ @@ -155,6 +157,7 @@ async def _invoke( if cs is None: # error is from above code not from rpc invocation task_status.started(err) + finally: # RPC task bookeeping try: diff --git a/tractor/_portal.py b/tractor/_portal.py index 6c026f8..9ad4c2f 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -204,8 +204,8 @@ class Portal: fn_name: Optional[str] = None, **kwargs ) -> Any: - """Submit a remote function to be scheduled and run by actor, - wrap and return its (stream of) result(s). + """Submit a remote function to be scheduled and run by actor, in + a new task, wrap and return its (stream of) result(s). This is a blocking call and returns either a value from the remote rpc task or a local async generator instance. diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 9ed0f14..e683216 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -37,7 +37,7 @@ def current_context(): def stream(func): - """Mark an async function as a streaming routine. + """Mark an async function as a streaming routine with ``@stream``. """ func._tractor_stream_function = True sig = inspect.signature(func) diff --git a/tractor/msg.py b/tractor/msg.py index 4184333..5b343b6 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -3,7 +3,7 @@ Messaging pattern APIs and helpers. """ import inspect import typing -from typing import Dict, Any, Set, Union, Callable +from typing import Dict, Any, Set, Callable from functools import partial from async_generator import aclosing @@ -90,7 +90,7 @@ def modify_subs(topics2ctxs, topics, ctx): _pub_state: Dict[str, dict] = {} -_pubtask2lock: Dict[str, dict] = {} +_pubtask2lock: Dict[str, trio.StrictFIFOLock] = {} def pub( @@ -184,7 +184,7 @@ def pub( if wrapped is None: return partial(pub, tasks=tasks) - task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = {} + task2lock: Dict[str, trio.StrictFIFOLock] = {} for name in tasks: task2lock[name] = trio.StrictFIFOLock()