Removing cloudpickle dependency by passing the subactor information to the child through sys.argv

drop_cloudpickle
Guillermo Rodriguez 2020-07-25 15:48:46 -03:00
parent dddbeb0e71
commit 7f29f73f25
5 changed files with 120 additions and 34 deletions

23
tests/_test.py 100644
View File

@ -0,0 +1,23 @@
import tractor
from tractor.log import get_console_log
log = get_console_log('trace')
def cellar_door():
return "Dang that's beautiful"
async def test_most_beautiful_word():
"""The main ``tractor`` routine.
"""
async with tractor.open_nursery() as n:
portal = await n.run_in_actor('some_linguist', cellar_door)
# The ``async with`` will unblock here since the 'some_linguist'
# actor has completed its main task ``cellar_door``.
print(await portal.result())
tractor.run(test_most_beautiful_word)

View File

@ -186,7 +186,7 @@ class Actor:
statespace: Optional[Dict[str, Any]] = None, statespace: Optional[Dict[str, Any]] = None,
uid: str = None, uid: str = None,
loglevel: str = None, loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None, arbiter_addr: Optional[Tuple[str, int]] = None
) -> None: ) -> None:
"""This constructor is called in the parent actor **before** the spawning """This constructor is called in the parent actor **before** the spawning
phase (aka before a new process is executed). phase (aka before a new process is executed).
@ -203,6 +203,8 @@ class Actor:
mod = importlib.import_module(name) mod = importlib.import_module(name)
mods[name] = _get_mod_abspath(mod) mods[name] = _get_mod_abspath(mod)
log.debug(f"{name} Loaded RPC modules: {mods}")
self.rpc_module_paths = mods self.rpc_module_paths = mods
self._mods: Dict[str, ModuleType] = {} self._mods: Dict[str, ModuleType] = {}
@ -594,7 +596,7 @@ class Actor:
self.load_modules() self.load_modules()
# register with the arbiter if we're told its addr # register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`") log.debug(f"Registering {self} for role `{self.name}` @ {arbiter_addr}")
assert isinstance(arbiter_addr, tuple) assert isinstance(arbiter_addr, tuple)
async with get_arbiter(*arbiter_addr) as arb_portal: async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run( await arb_portal.run(

View File

@ -1,6 +1,59 @@
import sys import sys
import trio import trio
import cloudpickle import argparse
from ._actor import Actor
from ._entry import _trio_main
"""This is the "bootloader" for actors started using the native trio backend
added in #128
"""
if __name__ == "__main__": if __name__ == "__main__":
trio.run(cloudpickle.load(sys.stdin.buffer))
parser = argparse.ArgumentParser()
parser.add_argument("name")
parser.add_argument("rpc_module_paths") # comma separated mod paths
parser.add_argument("uid")
parser.add_argument("loglevel")
parser.add_argument("bind_addr")
parser.add_argument("parent_addr")
parser.add_argument("arbiter_addr")
args = parser.parse_args()
rpc_paths = []
for rpc_mod_path in args.rpc_module_paths.split(";"):
rpc_paths.append(rpc_mod_path)
bind_addr = args.bind_addr.split(":")
bind_addr = (bind_addr[0], int(bind_addr[1]))
parent_addr = args.parent_addr.split(":")
parent_addr = (parent_addr[0], int(parent_addr[1]))
arbiter_addr = args.arbiter_addr.split(":")
arbiter_addr = (arbiter_addr[0], int(arbiter_addr[1]))
if args.loglevel == "None":
loglevel = None
else:
loglevel = args.loglevel
subactor = Actor(
args.name,
rpc_module_paths=rpc_paths,
uid=args.uid,
loglevel=loglevel,
arbiter_addr=arbiter_addr
)
_trio_main(
subactor,
bind_addr,
parent_addr=parent_addr
)

View File

@ -52,15 +52,12 @@ def _mp_main(
log.info(f"Actor {actor.uid} terminated") log.info(f"Actor {actor.uid} terminated")
async def _trio_main( def _trio_main(
actor: 'Actor', actor: 'Actor',
accept_addr: Tuple[str, int], accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int] = None parent_addr: Tuple[str, int] = None
) -> None: ) -> None:
"""Entry point for a `trio_run_in_process` subactor. """Entry point for a `trio` based subactor.
Here we don't need to call `trio.run()` since trip does that as
part of its subprocess startup sequence.
""" """
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
@ -71,5 +68,14 @@ async def _trio_main(
_state._current_actor = actor _state._current_actor = actor
await actor._async_main(accept_addr, parent_addr=parent_addr) trio_main = partial(
actor._async_main,
accept_addr,
parent_addr=parent_addr
)
try:
trio.run(trio_main)
except KeyboardInterrupt:
pass
log.info(f"Actor {actor.uid} terminated") log.info(f"Actor {actor.uid} terminated")

View File

@ -7,10 +7,8 @@ import subprocess
import multiprocessing as mp import multiprocessing as mp
import platform import platform
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from functools import partial
import trio import trio
import cloudpickle
from trio_typing import TaskStatus from trio_typing import TaskStatus
from async_generator import aclosing, asynccontextmanager from async_generator import aclosing, asynccontextmanager
@ -25,7 +23,7 @@ except ImportError:
from multiprocessing import forkserver # type: ignore from multiprocessing import forkserver # type: ignore
from typing import Tuple from typing import Tuple
from . import _forkserver_override from . import _forkserver_override, _child
from ._state import current_actor from ._state import current_actor
from .log import get_logger from .log import get_logger
from ._portal import Portal from ._portal import Portal
@ -158,25 +156,31 @@ async def cancel_on_completion(
@asynccontextmanager @asynccontextmanager
async def run_in_process(subactor, async_fn, *args, **kwargs): async def spawn_subactor(
encoded_job = cloudpickle.dumps(partial(async_fn, *args, **kwargs)) subactor: Actor,
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int]
):
async with await trio.open_process( spawn_cmd = [
[
sys.executable, sys.executable,
"-m", "-m",
# Hardcode this (instead of using ``_child.__name__`` to avoid a _child.__name__,
# double import warning: https://stackoverflow.com/a/45070583 subactor.name,
"tractor._child", ";".join(
# This is merely an identifier for debugging purposes when [mod_path for mod_path in subactor.rpc_module_paths]
# viewing the process tree from the OS ),
str(subactor.uid), subactor.uid[1],
], subactor.loglevel or "None",
stdin=subprocess.PIPE, f"{bind_addr[0]}:{bind_addr[1]}",
) as proc: f"{parent_addr[0]}:{parent_addr[1]}",
f"{subactor._arb_addr[0]}:{subactor._arb_addr[1]}",
]
log.info(f"Spawn actor with cmd: {spawn_cmd}")
proc = await trio.open_process(spawn_cmd)
# send func object to call in child
await proc.stdin.send_all(encoded_job)
yield proc yield proc
@ -202,9 +206,7 @@ async def new_proc(
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
if use_trio_run_in_process or _spawn_method == 'trio': if use_trio_run_in_process or _spawn_method == 'trio':
async with run_in_process( async with spawn_subactor(
subactor,
_trio_main,
subactor, subactor,
bind_addr, bind_addr,
parent_addr, parent_addr,