forked from goodboy/tractor
1
0
Fork 0

Merge pull request #133 from guilledk/drop_cloudpickle

Drop cloudpickle dependency
matrix
goodboy 2020-07-29 18:24:27 -04:00 committed by GitHub
commit a399bd3033
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 170 additions and 77 deletions

View File

@ -39,7 +39,7 @@ setup(
],
install_requires=[
'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt',
'trio_typing', 'cloudpickle',
'trio_typing',
],
tests_require=['pytest'],
python_requires=">=3.7",

View File

@ -4,6 +4,7 @@ Cancellation and error propagation
import os
import signal
import platform
import time
from itertools import repeat
import pytest
@ -359,7 +360,11 @@ async def test_nested_multierrors(loglevel, start_method):
@no_windows
def test_cancel_via_SIGINT(loglevel, start_method):
def test_cancel_via_SIGINT(
loglevel,
start_method,
spawn_backend,
):
"""Ensure that a control-C (SIGINT) signal cancels both the parent and
child processes in trionic fashion
"""
@ -369,6 +374,8 @@ def test_cancel_via_SIGINT(loglevel, start_method):
with trio.fail_after(2):
async with tractor.open_nursery() as tn:
await tn.start_actor('sucka')
if spawn_backend == 'mp':
time.sleep(0.1)
os.kill(pid, signal.SIGINT)
await trio.sleep_forever()
@ -379,7 +386,8 @@ def test_cancel_via_SIGINT(loglevel, start_method):
@no_windows
def test_cancel_via_SIGINT_other_task(
loglevel,
start_method
start_method,
spawn_backend,
):
"""Ensure that a control-C (SIGINT) signal cancels both the parent
and child processes in trionic fashion even if a subprocess is started
@ -399,6 +407,8 @@ def test_cancel_via_SIGINT_other_task(
with trio.fail_after(2):
async with trio.open_nursery() as n:
await n.start(spawn_and_sleep_forever)
if spawn_backend == 'mp':
time.sleep(0.1)
os.kill(pid, signal.SIGINT)
with pytest.raises(KeyboardInterrupt):

View File

@ -202,7 +202,11 @@ async def cancel_after(wait):
@pytest.fixture(scope='module')
def time_quad_ex(arb_addr):
def time_quad_ex(arb_addr, travis, spawn_backend):
if travis and spawn_backend == 'mp' and (platform.system() != 'Windows'):
# no idea, but the travis, mp, linux runs are flaking out here often
pytest.skip("Test is too flaky on mp in CI")
timeout = 7 if platform.system() == 'Windows' else 4
start = time.time()
results = tractor.run(cancel_after, timeout, arbiter_addr=arb_addr)
@ -213,9 +217,6 @@ def time_quad_ex(arb_addr):
def test_a_quadruple_example(time_quad_ex, travis, spawn_backend):
"""This also serves as a kind of "we'd like to be this fast test"."""
if travis and spawn_backend == 'mp' and (platform.system() != 'Windows'):
# no idea, but the travis, mp, linux runs are flaking out here often
pytest.skip("Test is too flaky on mp in CI")
results, diff = time_quad_ex
assert results
@ -233,10 +234,6 @@ def test_not_fast_enough_quad(
"""Verify we can cancel midway through the quad example and all actors
cancel gracefully.
"""
if travis and spawn_backend == 'mp' and (platform.system() != 'Windows'):
# no idea, but the travis, mp, linux runs are flaking out here often
pytest.skip("Test is too flaky on mp in CI")
results, diff = time_quad_ex
delay = max(diff - cancel_delay, 0)
results = tractor.run(cancel_after, delay, arbiter_addr=arb_addr)

View File

@ -171,11 +171,6 @@ class Actor:
_root_nursery: trio.Nursery
_server_nursery: trio.Nursery
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
# by the user (currently called the "arbiter")
_spawn_method: Optional[str] = None
# Information about `__main__` from parent
_parent_main_data: Dict[str, str]
@ -187,6 +182,7 @@ class Actor:
uid: str = None,
loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None,
spawn_method: Optional[str] = None
) -> None:
"""This constructor is called in the parent actor **before** the spawning
phase (aka before a new process is executed).
@ -212,6 +208,11 @@ class Actor:
self.loglevel = loglevel
self._arb_addr = arbiter_addr
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
# by the user (currently called the "arbiter")
self._spawn_method = spawn_method
self._peers: defaultdict = defaultdict(list)
self._peer_connected: dict = {}
self._no_more_peers = trio.Event()
@ -541,8 +542,14 @@ class Actor:
async def _async_main(
self,
accept_addr: Tuple[str, int],
arbiter_addr: Optional[Tuple[str, int]] = None,
accept_addr: Optional[Tuple[str, int]] = None,
# XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to
# the child instead of relaying it over the connect-back
# channel). Once that backend is removed we can likely just
# change this to a simple ``is_subactor: bool`` which will
# be False when running as root actor and True when as
# a subactor.
parent_addr: Optional[Tuple[str, int]] = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
@ -552,29 +559,39 @@ class Actor:
A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor.
"""
arbiter_addr = arbiter_addr or self._arb_addr
registered_with_arbiter = False
try:
async with trio.open_nursery() as nursery:
self._root_nursery = nursery
# Startup up channel server
host, port = accept_addr
await nursery.start(partial(
self._serve_forever, accept_host=host, accept_port=port)
)
# TODO: just make `parent_addr` a bool system (see above)?
if parent_addr is not None:
try:
# Connect back to the parent actor and conduct initial
# handshake (From this point on if we error, ship the
# exception back to the parent actor)
# handshake. From this point on if we error, we
# attempt to ship the exception back to the parent.
chan = self._parent_chan = Channel(
destaddr=parent_addr,
)
await chan.connect()
# initial handshake, report who we are, who they are
# Initial handshake: swap names.
await self._do_handshake(chan)
if self._spawn_method == "trio":
# Receive runtime state from our parent
parent_data = await chan.recv()
log.debug(
"Recieved state from parent:\n"
f"{parent_data}"
)
accept_addr = (
parent_data.pop('bind_host'),
parent_data.pop('bind_port'),
)
for attr, value in parent_data.items():
setattr(self, attr, value)
except OSError: # failed to connect
log.warning(
f"Failed to connect to parent @ {parent_addr},"
@ -582,21 +599,39 @@ class Actor:
await self.cancel()
self._parent_chan = None
raise
else:
# handle new connection back to parent
assert self._parent_chan
nursery.start_soon(
self._process_messages, self._parent_chan)
# load exposed/allowed RPC modules
# XXX: do this **after** establishing connection to parent
# so that import errors are properly propagated upwards
# XXX: do this **after** establishing a channel to the parent
# but **before** starting the message loop for that channel
# such that import errors are properly propagated upwards
self.load_modules()
# register with the arbiter if we're told its addr
# Start up the channel server with,
# - subactor: the bind address sent to us by our parent
# over our established channel
# - root actor: the ``accept_addr`` passed to this method
assert accept_addr
host, port = accept_addr
await nursery.start(
partial(
self._serve_forever,
accept_host=host,
accept_port=port
)
)
# Begin handling our new connection back to parent.
# This is done here since we don't want to start
# processing parent requests until our server is
# 100% up and running.
if self._parent_chan:
nursery.start_soon(
self._process_messages, self._parent_chan)
# Register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`")
assert isinstance(arbiter_addr, tuple)
async with get_arbiter(*arbiter_addr) as arb_portal:
assert isinstance(self._arb_addr, tuple)
async with get_arbiter(*self._arb_addr) as arb_portal:
await arb_portal.run(
'self', 'register_actor',
uid=self.uid, sockaddr=self.accept_addr)
@ -605,7 +640,7 @@ class Actor:
task_status.started()
log.debug("Waiting on root nursery to complete")
# blocks here as expected until the channel server is
# Blocks here as expected until the channel server is
# killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as err:
if not registered_with_arbiter:
@ -614,7 +649,7 @@ class Actor:
# once we have that all working with std streams locking?
log.exception(
f"Actor errored and failed to register with arbiter "
f"@ {arbiter_addr}?")
f"@ {self._arb_addr}?")
log.error(
"\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n"
"\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n"
@ -643,7 +678,7 @@ class Actor:
finally:
if registered_with_arbiter:
await self._do_unreg(arbiter_addr)
await self._do_unreg(self._arb_addr)
# terminate actor once all its peers (actors that connected
# to it as clients) have disappeared
if not self._no_more_peers.is_set():
@ -894,8 +929,7 @@ async def _start_actor(
partial(
actor._async_main,
accept_addr=(host, port),
parent_addr=None,
arbiter_addr=arbiter_addr,
parent_addr=None
)
)
result = await main()

View File

@ -1,6 +1,40 @@
"""This is the "bootloader" for actors started using the native trio backend.
"""
import sys
import trio
import cloudpickle
import argparse
from ast import literal_eval
from ._actor import Actor
from ._entry import _trio_main
def parse_uid(arg):
    """Parse a ``--uid`` command line value into a ``(name, uuid)`` pair.

    ``arg`` must be the Python literal form of a 2-element sequence, e.g.
    ``"('some_actor', '1234-abcd')"``. Both elements are coerced to ``str``.

    Raises:
        ValueError: if ``arg`` is not a valid Python literal or does not
            hold exactly two elements.
        TypeError: if the literal is not iterable (e.g. ``"1"``).
    """
    try:
        # Unpacking into exactly two names enforces the 2-element shape.
        name, uuid = literal_eval(arg)
    except SyntaxError as err:
        # ``argparse`` only turns ValueError/TypeError/ArgumentTypeError
        # raised by a ``type=`` callable into a usage error; a raw
        # SyntaxError would escape as a traceback instead.
        raise ValueError(f"invalid uid literal: {arg!r}") from err
    return str(name), str(uuid)  # ensures str encoding
def parse_ipaddr(arg):
    """Parse a ``--parent_addr`` command line value into ``(host, port)``.

    ``arg`` must be the Python literal form of a 2-element sequence, e.g.
    ``"('127.0.0.1', 1616)"``. The host is coerced to ``str`` and the port
    to ``int``.

    Raises:
        ValueError: if ``arg`` is not a valid Python literal, does not hold
            exactly two elements, or the port cannot be converted to ``int``.
        TypeError: if the literal is not iterable (e.g. ``"1"``).
    """
    try:
        # Unpacking into exactly two names enforces the 2-element shape.
        host, port = literal_eval(arg)
    except SyntaxError as err:
        # ``argparse`` only turns ValueError/TypeError/ArgumentTypeError
        # raised by a ``type=`` callable into a usage error; a raw
        # SyntaxError would escape as a traceback instead.
        raise ValueError(f"invalid address literal: {arg!r}") from err
    return (str(host), int(port))
if __name__ == "__main__":
trio.run(cloudpickle.load(sys.stdin.buffer))
parser = argparse.ArgumentParser()
parser.add_argument("--uid", type=parse_uid)
parser.add_argument("--loglevel", type=str)
parser.add_argument("--parent_addr", type=parse_ipaddr)
args = parser.parse_args()
subactor = Actor(
args.uid[0],
uid=args.uid[1],
loglevel=args.loglevel,
spawn_method="trio"
)
_trio_main(
subactor,
parent_addr=args.parent_addr
)

View File

@ -51,24 +51,29 @@ def _mp_main(
log.info(f"Actor {actor.uid} terminated")
async def _trio_main(
def _trio_main(
actor: 'Actor',
accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int] = None
) -> None:
"""Entry point for a `trio_run_in_process` subactor.
Here we don't need to call `trio.run()` since trip does that as
part of its subprocess startup sequence.
"""
if actor.loglevel is not None:
log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
get_console_log(actor.loglevel)
log.info(f"Started new trio process for {actor.uid}")
log.info(
f"Started {actor.uid}")
_state._current_actor = actor
await actor._async_main(accept_addr, parent_addr=parent_addr)
log.info(f"Actor {actor.uid} terminated")
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial(
actor._async_main,
parent_addr=parent_addr
)
try:
trio.run(trio_main)
except KeyboardInterrupt:
pass # handle it the same way trio does?
log.info(f"Actor {actor.uid} terminated")

View File

@ -3,14 +3,11 @@ Machinery for actor process spawning using multiple backends.
"""
import sys
import inspect
import subprocess
import multiprocessing as mp
import platform
from typing import Any, Dict, Optional
from functools import partial
import trio
import cloudpickle
from trio_typing import TaskStatus
from async_generator import aclosing, asynccontextmanager
@ -30,7 +27,7 @@ from ._state import current_actor
from .log import get_logger
from ._portal import Portal
from ._actor import Actor, ActorFailure
from ._entry import _mp_main, _trio_main
from ._entry import _mp_main
log = get_logger('tractor')
@ -158,25 +155,31 @@ async def cancel_on_completion(
@asynccontextmanager
async def run_in_process(subactor, async_fn, *args, **kwargs):
encoded_job = cloudpickle.dumps(partial(async_fn, *args, **kwargs))
async def spawn_subactor(
subactor: 'Actor',
accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
):
async with await trio.open_process(
[
sys.executable,
"-m",
# Hardcode this (instead of using ``_child.__name__`` to avoid a
# double import warning: https://stackoverflow.com/a/45070583
"tractor._child",
# This is merely an identifier for debugging purposes when
# viewing the process tree from the OS
str(subactor.uid),
],
stdin=subprocess.PIPE,
) as proc:
spawn_cmd = [
sys.executable,
"-m",
# Hardcode this (instead of using ``_child.__name__``) to avoid a
# double import warning: https://stackoverflow.com/a/45070583
"tractor._child",
"--uid",
str(subactor.uid),
"--parent_addr",
str(parent_addr)
]
# send func object to call in child
await proc.stdin.send_all(encoded_job)
if subactor.loglevel:
spawn_cmd += [
"--loglevel",
subactor.loglevel
]
async with await trio.open_process(spawn_cmd) as proc:
yield proc
@ -201,9 +204,7 @@ async def new_proc(
async with trio.open_nursery() as nursery:
if use_trio_run_in_process or _spawn_method == 'trio':
async with run_in_process(
subactor,
_trio_main,
async with spawn_subactor(
subactor,
bind_addr,
parent_addr,
@ -218,6 +219,18 @@ async def new_proc(
portal = Portal(chan)
actor_nursery._children[subactor.uid] = (
subactor, proc, portal)
# send additional init params
await chan.send({
"_parent_main_data": subactor._parent_main_data,
"rpc_module_paths": subactor.rpc_module_paths,
"statespace": subactor.statespace,
"_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0],
"bind_port": bind_addr[1]
})
# resume caller at next checkpoint now that child is up
task_status.started(portal)
# wait for ActorNursery.wait() to be called