Begin rpc_module_paths deprecation

deprecate_rpcmodpaths
Tyler Goodlet 2021-01-05 08:28:06 -05:00
parent dfaf1e3631
commit 5ed5d18ccb
4 changed files with 50 additions and 24 deletions

View File

@ -48,7 +48,7 @@ async def _invoke(
chan: Channel,
func: typing.Callable,
kwargs: Dict[str, Any],
task_status=trio.TASK_STATUS_IGNORED
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
):
"""Invoke local func and deliver result(s) over provided channel.
"""
@ -199,7 +199,7 @@ class Actor:
self,
name: str,
*,
rpc_module_paths: List[str] = [],
enable_modules: List[str] = [],
uid: str = None,
loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None,
@ -219,14 +219,14 @@ class Actor:
self._parent_main_data = _mp_fixup_main._mp_figure_out_main()
# always include debugging tools module
rpc_module_paths.append('tractor._debug')
enable_modules.append('tractor._debug')
mods = {}
for name in rpc_module_paths:
for name in enable_modules:
mod = importlib.import_module(name)
mods[name] = _get_mod_abspath(mod)
self.rpc_module_paths = mods
self.enable_modules = mods
self._mods: Dict[str, ModuleType] = {}
# TODO: consider making this a dynamically defined
@ -293,7 +293,7 @@ class Actor:
_mp_fixup_main._fixup_main_from_path(
parent_data['init_main_from_path'])
for modpath, filepath in self.rpc_module_paths.items():
for modpath, filepath in self.enable_modules.items():
# XXX append the allowed module to the python path which
# should allow for relative (at least downward) imports.
sys.path.append(os.path.dirname(filepath))
@ -317,7 +317,7 @@ class Actor:
if ns == '__main__':
msg = (
"\n\nMake sure you exposed the current module using:\n\n"
"ActorNursery.start_actor(<name>, rpc_module_paths="
"ActorNursery.start_actor(<name>, enable_modules="
"[__name__])"
)

View File

@ -48,6 +48,7 @@ async def open_root_actor(
# internal logging
loglevel: Optional[str] = None,
enable_modules: Optional[List] = None,
rpc_module_paths: Optional[List] = None,
) -> typing.Any:
@ -58,7 +59,16 @@ async def open_root_actor(
_state._runtime_vars['_is_root'] = True
# caps based rpc list
expose_modules = rpc_module_paths or []
enable_modules = enable_modules or []
if rpc_module_paths:
warnings.warn(
"`rpc_module_paths` is now deprecated, use "
" `enable_modules` instead.",
DeprecationWarning,
stacklevel=2,
)
enable_modules.extend(rpc_module_paths)
if start_method is not None:
_spawn.try_set_start_method(start_method)
@ -68,7 +78,7 @@ async def open_root_actor(
# expose internal debug module to every actor allowing
# for use of ``await tractor.breakpoint()``
expose_modules.append('tractor._debug')
enable_modules.append('tractor._debug')
elif debug_mode:
raise RuntimeError(
@ -105,7 +115,7 @@ async def open_root_actor(
name or 'anonymous',
arbiter_addr=arbiter_addr,
loglevel=loglevel,
rpc_module_paths=expose_modules,
enable_modules=enable_modules,
)
host, port = (host, 0)
@ -121,7 +131,7 @@ async def open_root_actor(
name or 'arbiter',
arbiter_addr=arbiter_addr,
loglevel=loglevel,
rpc_module_paths=expose_modules,
enable_modules=enable_modules,
)
try:

View File

@ -247,7 +247,7 @@ async def new_proc(
# send additional init params
await chan.send({
"_parent_main_data": subactor._parent_main_data,
"rpc_module_paths": subactor.rpc_module_paths,
"enable_modules": subactor.enable_modules,
"_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0],
"bind_port": bind_addr[1],
@ -290,7 +290,7 @@ async def new_proc(
# if we're the "main" process start the forkserver
# only once and pass its ipc info to downstream
# children
# forkserver.set_forkserver_preload(rpc_module_paths)
# forkserver.set_forkserver_preload(enable_modules)
forkserver.ensure_running()
fs_info = (
fs._forkserver_address,

View File

@ -3,14 +3,15 @@
"""
from functools import partial
import multiprocessing as mp
from typing import Tuple, List, Dict, Optional, Any
from typing import Tuple, List, Dict, Optional
import typing
from contextlib import AsyncExitStack
import warnings
import trio
from async_generator import asynccontextmanager
from ._state import current_actor, is_root_process, is_main_process
from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel
from ._actor import Actor
from ._portal import Portal
@ -56,6 +57,7 @@ class ActorNursery:
*,
bind_addr: Tuple[str, int] = _default_bind_addr,
rpc_module_paths: List[str] = None,
enable_modules: List[str] = None,
loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None,
) -> Portal:
@ -65,10 +67,21 @@ class ActorNursery:
_rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False
enable_modules = enable_modules or []
if rpc_module_paths:
warnings.warn(
"`rpc_module_paths` is now deprecated, use "
" `enable_modules` instead.",
DeprecationWarning,
stacklevel=2,
)
enable_modules.extend(rpc_module_paths)
subactor = Actor(
name,
# modules allowed to invoked funcs from
rpc_module_paths=rpc_module_paths or [],
enable_modules=enable_modules,
loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr,
)
@ -221,7 +234,6 @@ async def open_nursery(
# mark us for teardown on exit
implicit_runtime = True
# the collection of errors retreived from spawned sub-actors
errors: Dict[Tuple[str, str], Exception] = {}
@ -263,18 +275,22 @@ async def open_nursery(
# worry more are coming).
anursery._join_procs.set()
try:
# XXX: hypothetically an error could be raised and then
# a cancel signal shows up slightly after in which case
# the `else:` block here might not complete?
# For now, shield both.
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype = type(err)
if etype in (trio.Cancelled, KeyboardInterrupt) or (
if etype in (
trio.Cancelled,
KeyboardInterrupt
) or (
is_multi_cancelled(err)
):
log.warning(
f"Nursery for {current_actor().uid} was "
f"cancelled with {etype}")
f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
else:
log.exception(
f"Nursery for {current_actor().uid} "