forked from goodboy/tractor
1
0
Fork 0

Merge pull request #187 from goodboy/deprecate_rpcmodpaths

Begin rpc_module_paths deprecation
sync_breakpoint
goodboy 2021-01-14 18:28:16 -05:00 committed by GitHub
commit 8fdab8e0be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 76 additions and 39 deletions

View File

@ -9,7 +9,7 @@ import importlib.util
import inspect import inspect
import uuid import uuid
import typing import typing
from typing import Dict, List, Tuple, Any, Optional from typing import Dict, List, Tuple, Any, Optional, Union
from types import ModuleType from types import ModuleType
import sys import sys
import os import os
@ -48,7 +48,9 @@ async def _invoke(
chan: Channel, chan: Channel,
func: typing.Callable, func: typing.Callable,
kwargs: Dict[str, Any], kwargs: Dict[str, Any],
task_status=trio.TASK_STATUS_IGNORED task_status: TaskStatus[
Union[trio.CancelScope, BaseException]
] = trio.TASK_STATUS_IGNORED,
): ):
"""Invoke local func and deliver result(s) over provided channel. """Invoke local func and deliver result(s) over provided channel.
""" """
@ -155,6 +157,7 @@ async def _invoke(
if cs is None: if cs is None:
# error is from above code not from rpc invocation # error is from above code not from rpc invocation
task_status.started(err) task_status.started(err)
finally: finally:
# RPC task bookeeping # RPC task bookeeping
try: try:
@ -199,7 +202,7 @@ class Actor:
self, self,
name: str, name: str,
*, *,
rpc_module_paths: List[str] = [], enable_modules: List[str] = [],
uid: str = None, uid: str = None,
loglevel: str = None, loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None, arbiter_addr: Optional[Tuple[str, int]] = None,
@ -219,14 +222,14 @@ class Actor:
self._parent_main_data = _mp_fixup_main._mp_figure_out_main() self._parent_main_data = _mp_fixup_main._mp_figure_out_main()
# always include debugging tools module # always include debugging tools module
rpc_module_paths.append('tractor._debug') enable_modules.append('tractor._debug')
mods = {} mods = {}
for name in rpc_module_paths: for name in enable_modules:
mod = importlib.import_module(name) mod = importlib.import_module(name)
mods[name] = _get_mod_abspath(mod) mods[name] = _get_mod_abspath(mod)
self.rpc_module_paths = mods self.enable_modules = mods
self._mods: Dict[str, ModuleType] = {} self._mods: Dict[str, ModuleType] = {}
# TODO: consider making this a dynamically defined # TODO: consider making this a dynamically defined
@ -293,7 +296,7 @@ class Actor:
_mp_fixup_main._fixup_main_from_path( _mp_fixup_main._fixup_main_from_path(
parent_data['init_main_from_path']) parent_data['init_main_from_path'])
for modpath, filepath in self.rpc_module_paths.items(): for modpath, filepath in self.enable_modules.items():
# XXX append the allowed module to the python path which # XXX append the allowed module to the python path which
# should allow for relative (at least downward) imports. # should allow for relative (at least downward) imports.
sys.path.append(os.path.dirname(filepath)) sys.path.append(os.path.dirname(filepath))
@ -317,7 +320,7 @@ class Actor:
if ns == '__main__': if ns == '__main__':
msg = ( msg = (
"\n\nMake sure you exposed the current module using:\n\n" "\n\nMake sure you exposed the current module using:\n\n"
"ActorNursery.start_actor(<name>, rpc_module_paths=" "ActorNursery.start_actor(<name>, enable_modules="
"[__name__])" "[__name__])"
) )

View File

@ -204,8 +204,8 @@ class Portal:
fn_name: Optional[str] = None, fn_name: Optional[str] = None,
**kwargs **kwargs
) -> Any: ) -> Any:
"""Submit a remote function to be scheduled and run by actor, """Submit a remote function to be scheduled and run by actor, in
wrap and return its (stream of) result(s). a new task, wrap and return its (stream of) result(s).
This is a blocking call and returns either a value from the This is a blocking call and returns either a value from the
remote rpc task or a local async generator instance. remote rpc task or a local async generator instance.

View File

@ -48,6 +48,7 @@ async def open_root_actor(
# internal logging # internal logging
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
enable_modules: Optional[List] = None,
rpc_module_paths: Optional[List] = None, rpc_module_paths: Optional[List] = None,
) -> typing.Any: ) -> typing.Any:
@ -58,7 +59,16 @@ async def open_root_actor(
_state._runtime_vars['_is_root'] = True _state._runtime_vars['_is_root'] = True
# caps based rpc list # caps based rpc list
expose_modules = rpc_module_paths or [] enable_modules = enable_modules or []
if rpc_module_paths:
warnings.warn(
"`rpc_module_paths` is now deprecated, use "
" `enable_modules` instead.",
DeprecationWarning,
stacklevel=2,
)
enable_modules.extend(rpc_module_paths)
if start_method is not None: if start_method is not None:
_spawn.try_set_start_method(start_method) _spawn.try_set_start_method(start_method)
@ -68,7 +78,7 @@ async def open_root_actor(
# expose internal debug module to every actor allowing # expose internal debug module to every actor allowing
# for use of ``await tractor.breakpoint()`` # for use of ``await tractor.breakpoint()``
expose_modules.append('tractor._debug') enable_modules.append('tractor._debug')
elif debug_mode: elif debug_mode:
raise RuntimeError( raise RuntimeError(
@ -105,7 +115,7 @@ async def open_root_actor(
name or 'anonymous', name or 'anonymous',
arbiter_addr=arbiter_addr, arbiter_addr=arbiter_addr,
loglevel=loglevel, loglevel=loglevel,
rpc_module_paths=expose_modules, enable_modules=enable_modules,
) )
host, port = (host, 0) host, port = (host, 0)
@ -121,7 +131,7 @@ async def open_root_actor(
name or 'arbiter', name or 'arbiter',
arbiter_addr=arbiter_addr, arbiter_addr=arbiter_addr,
loglevel=loglevel, loglevel=loglevel,
rpc_module_paths=expose_modules, enable_modules=enable_modules,
) )
try: try:

View File

@ -247,7 +247,7 @@ async def new_proc(
# send additional init params # send additional init params
await chan.send({ await chan.send({
"_parent_main_data": subactor._parent_main_data, "_parent_main_data": subactor._parent_main_data,
"rpc_module_paths": subactor.rpc_module_paths, "enable_modules": subactor.enable_modules,
"_arb_addr": subactor._arb_addr, "_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0], "bind_host": bind_addr[0],
"bind_port": bind_addr[1], "bind_port": bind_addr[1],
@ -290,7 +290,7 @@ async def new_proc(
# if we're the "main" process start the forkserver # if we're the "main" process start the forkserver
# only once and pass its ipc info to downstream # only once and pass its ipc info to downstream
# children # children
# forkserver.set_forkserver_preload(rpc_module_paths) # forkserver.set_forkserver_preload(enable_modules)
forkserver.ensure_running() forkserver.ensure_running()
fs_info = ( fs_info = (
fs._forkserver_address, fs._forkserver_address,

View File

@ -37,7 +37,7 @@ def current_context():
def stream(func): def stream(func):
"""Mark an async function as a streaming routine. """Mark an async function as a streaming routine with ``@stream``.
""" """
func._tractor_stream_function = True func._tractor_stream_function = True
sig = inspect.signature(func) sig = inspect.signature(func)

View File

@ -3,14 +3,15 @@
""" """
from functools import partial from functools import partial
import multiprocessing as mp import multiprocessing as mp
from typing import Tuple, List, Dict, Optional, Any from typing import Tuple, List, Dict, Optional
import typing import typing
from contextlib import AsyncExitStack from contextlib import AsyncExitStack
import warnings
import trio import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
from ._state import current_actor, is_root_process, is_main_process from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
from ._actor import Actor from ._actor import Actor
from ._portal import Portal from ._portal import Portal
@ -56,6 +57,7 @@ class ActorNursery:
*, *,
bind_addr: Tuple[str, int] = _default_bind_addr, bind_addr: Tuple[str, int] = _default_bind_addr,
rpc_module_paths: List[str] = None, rpc_module_paths: List[str] = None,
enable_modules: List[str] = None,
loglevel: str = None, # set log level per subactor loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None, nursery: trio.Nursery = None,
) -> Portal: ) -> Portal:
@ -65,10 +67,21 @@ class ActorNursery:
_rtv = _state._runtime_vars.copy() _rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False _rtv['_is_root'] = False
enable_modules = enable_modules or []
if rpc_module_paths:
warnings.warn(
"`rpc_module_paths` is now deprecated, use "
" `enable_modules` instead.",
DeprecationWarning,
stacklevel=2,
)
enable_modules.extend(rpc_module_paths)
subactor = Actor( subactor = Actor(
name, name,
# modules allowed to invoked funcs from # modules allowed to invoked funcs from
rpc_module_paths=rpc_module_paths or [], enable_modules=enable_modules,
loglevel=loglevel, loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr, arbiter_addr=current_actor()._arb_addr,
) )
@ -221,7 +234,6 @@ async def open_nursery(
# mark us for teardown on exit # mark us for teardown on exit
implicit_runtime = True implicit_runtime = True
# the collection of errors retreived from spawned sub-actors # the collection of errors retreived from spawned sub-actors
errors: Dict[Tuple[str, str], Exception] = {} errors: Dict[Tuple[str, str], Exception] = {}
@ -263,18 +275,22 @@ async def open_nursery(
# worry more are coming). # worry more are coming).
anursery._join_procs.set() anursery._join_procs.set()
try: try:
# XXX: hypothetically an error could be raised and then # XXX: hypothetically an error could be
# a cancel signal shows up slightly after in which case # raised and then a cancel signal shows up
# the `else:` block here might not complete? # slightly after in which case the `else:`
# For now, shield both. # block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
etype = type(err) etype = type(err)
if etype in (trio.Cancelled, KeyboardInterrupt) or ( if etype in (
trio.Cancelled,
KeyboardInterrupt
) or (
is_multi_cancelled(err) is_multi_cancelled(err)
): ):
log.warning( log.warning(
f"Nursery for {current_actor().uid} was " f"Nursery for {current_actor().uid} "
f"cancelled with {etype}") f"was cancelled with {etype}")
else: else:
log.exception( log.exception(
f"Nursery for {current_actor().uid} " f"Nursery for {current_actor().uid} "

View File

@ -3,7 +3,7 @@ Messaging pattern APIs and helpers.
""" """
import inspect import inspect
import typing import typing
from typing import Dict, Any, Set, Union, Callable from typing import Dict, Any, Set, Callable
from functools import partial from functools import partial
from async_generator import aclosing from async_generator import aclosing
@ -11,7 +11,6 @@ import trio
import wrapt import wrapt
from .log import get_logger from .log import get_logger
from . import current_actor
from ._streaming import Context from ._streaming import Context
__all__ = ['pub'] __all__ = ['pub']
@ -91,6 +90,7 @@ def modify_subs(topics2ctxs, topics, ctx):
_pub_state: Dict[str, dict] = {} _pub_state: Dict[str, dict] = {}
_pubtask2lock: Dict[str, trio.StrictFIFOLock] = {}
def pub( def pub(
@ -178,22 +178,22 @@ def pub(
subscribers. If you are ok to have a new task running for every call subscribers. If you are ok to have a new task running for every call
to ``pub_service()`` then probably don't need this. to ``pub_service()`` then probably don't need this.
""" """
global _pub_state global _pubtask2lock
# handle the decorator not called with () case # handle the decorator not called with () case
if wrapped is None: if wrapped is None:
return partial(pub, tasks=tasks) return partial(pub, tasks=tasks)
task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = { task2lock: Dict[str, trio.StrictFIFOLock] = {}
None: trio.StrictFIFOLock()}
for name in tasks: for name in tasks:
task2lock[name] = trio.StrictFIFOLock() task2lock[name] = trio.StrictFIFOLock()
@wrapt.decorator @wrapt.decorator
async def wrapper(agen, instance, args, kwargs): async def wrapper(agen, instance, args, kwargs):
# this is used to extract arguments properly as per
# the `wrapt` docs # XXX: this is used to extract arguments properly as per the
# `wrapt` docs
async def _execute( async def _execute(
ctx: Context, ctx: Context,
topics: Set[str], topics: Set[str],
@ -203,14 +203,22 @@ def pub(
packetizer: Callable = None, packetizer: Callable = None,
**kwargs, **kwargs,
): ):
if tasks and task_name is None: if task_name is None:
task_name = trio.lowlevel.current_task().name
if tasks and task_name not in tasks:
raise TypeError( raise TypeError(
f"{agen} must be called with a `task_name` named " f"{agen} must be called with a `task_name` named "
f"argument with a falue from {tasks}") f"argument with a value from {tasks}")
elif not tasks and not task2lock:
# add a default root-task lock if none defined
task2lock[task_name] = trio.StrictFIFOLock()
_pubtask2lock.update(task2lock)
topics = set(topics) topics = set(topics)
lockmap = _pub_state.setdefault('_pubtask2lock', task2lock) lock = _pubtask2lock[task_name]
lock = lockmap[task_name]
all_subs = _pub_state.setdefault('_subs', {}) all_subs = _pub_state.setdefault('_subs', {})
topics2ctxs = all_subs.setdefault(task_name, {}) topics2ctxs = all_subs.setdefault(task_name, {})