First attempt at removing trip & updating hazmat -> lowlevel

drop_cloudpickle
Guillermo Rodriguez 2020-07-20 16:18:38 -03:00 committed by Tyler Goodlet
parent 7c73775474
commit 56463a08df
8 changed files with 64 additions and 42 deletions

View File

@ -25,16 +25,16 @@ matrix:
- name: "Python 3.7: multiprocessing" - name: "Python 3.7: multiprocessing"
python: 3.7 # this works for Linux but is ignored on macOS or Windows python: 3.7 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="mp" env: SPAWN_BACKEND="mp"
- name: "Python 3.7: trio-run-in-process" - name: "Python 3.7: trio"
python: 3.7 # this works for Linux but is ignored on macOS or Windows python: 3.7 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="trio_run_in_process" env: SPAWN_BACKEND="trio"
- name: "Python 3.8: multiprocessing" - name: "Python 3.8: multiprocessing"
python: 3.8 # this works for Linux but is ignored on macOS or Windows python: 3.8 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="mp" env: SPAWN_BACKEND="mp"
- name: "Python 3.8: trio-run-in-process" - name: "Python 3.8: trio"
python: 3.8 # this works for Linux but is ignored on macOS or Windows python: 3.8 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="trio_run_in_process" env: SPAWN_BACKEND="trio"
install: install:
- cd $TRAVIS_BUILD_DIR - cd $TRAVIS_BUILD_DIR

View File

@ -39,7 +39,7 @@ setup(
], ],
install_requires=[ install_requires=[
'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt', 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt',
'trio_typing', 'trio-run-in-process', 'trio_typing', 'cloudpickle',
], ],
tests_require=['pytest'], tests_require=['pytest'],
python_requires=">=3.7", python_requires=">=3.7",

View File

@ -21,7 +21,7 @@ def pytest_addoption(parser):
parser.addoption( parser.addoption(
"--spawn-backend", action="store", dest='spawn_backend', "--spawn-backend", action="store", dest='spawn_backend',
default='trio_run_in_process', default='trio',
help="Processing spawning backend to use for test run", help="Processing spawning backend to use for test run",
) )
@ -34,7 +34,7 @@ def pytest_configure(config):
if backend == 'mp': if backend == 'mp':
tractor._spawn.try_set_start_method('spawn') tractor._spawn.try_set_start_method('spawn')
elif backend == 'trio_run_in_process': elif backend == 'trio':
tractor._spawn.try_set_start_method(backend) tractor._spawn.try_set_start_method(backend)
@ -56,7 +56,7 @@ def pytest_generate_tests(metafunc):
if not spawn_backend: if not spawn_backend:
# XXX some weird windows bug with `pytest`? # XXX some weird windows bug with `pytest`?
spawn_backend = 'mp' spawn_backend = 'mp'
assert spawn_backend in ('mp', 'trio_run_in_process') assert spawn_backend in ('mp', 'trio')
if 'start_method' in metafunc.fixturenames: if 'start_method' in metafunc.fixturenames:
if spawn_backend == 'mp': if spawn_backend == 'mp':
@ -67,11 +67,11 @@ def pytest_generate_tests(metafunc):
# removing XXX: the fork method is in general # removing XXX: the fork method is in general
# incompatible with trio's global scheduler state # incompatible with trio's global scheduler state
methods.remove('fork') methods.remove('fork')
elif spawn_backend == 'trio_run_in_process': elif spawn_backend == 'trio':
if platform.system() == "Windows": if platform.system() == "Windows":
pytest.fail( pytest.fail(
"Only `--spawn-backend=mp` is supported on Windows") "Only `--spawn-backend=mp` is supported on Windows")
methods = ['trio_run_in_process'] methods = ['trio']
metafunc.parametrize("start_method", methods, scope='module') metafunc.parametrize("start_method", methods, scope='module')

13
tractor/_child.py 100644
View File

@ -0,0 +1,13 @@
import sys
import trio
import cloudpickle
if __name__ == "__main__":
job = cloudpickle.load(sys.stdin.detach())
try:
result = trio.run(job)
cloudpickle.dump(sys.stdout.detach(), result)
except BaseException as err:
cloudpickle.dump(sys.stdout.detach(), err)

View File

@ -52,22 +52,12 @@ def _mp_main(
log.info(f"Actor {actor.uid} terminated") log.info(f"Actor {actor.uid} terminated")
async def _trip_main( async def _trio_main(
actor: 'Actor', actor: 'Actor',
accept_addr: Tuple[str, int], accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int] = None parent_addr: Tuple[str, int] = None
) -> None: ) -> None:
"""Entry point for a `trio_run_in_process` subactor.
Here we don't need to call `trio.run()` since trip does that as
part of its subprocess startup sequence.
"""
if actor.loglevel is not None:
log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
get_console_log(actor.loglevel)
log.info(f"Started new TRIP process for {actor.uid}")
_state._current_actor = actor _state._current_actor = actor
await actor._async_main(accept_addr, parent_addr=parent_addr) await actor._async_main(accept_addr, parent_addr=parent_addr)
log.info(f"Actor {actor.uid} terminated")

View File

@ -1,14 +1,18 @@
""" """
Machinery for actor process spawning using multiple backends. Machinery for actor process spawning using multiple backends.
""" """
import sys
import inspect import inspect
import subprocess
import multiprocessing as mp import multiprocessing as mp
import platform import platform
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from functools import partial
import trio import trio
import cloudpickle
from trio_typing import TaskStatus from trio_typing import TaskStatus
from async_generator import aclosing from async_generator import aclosing, asynccontextmanager
try: try:
from multiprocessing import semaphore_tracker # type: ignore from multiprocessing import semaphore_tracker # type: ignore
@ -21,12 +25,12 @@ except ImportError:
from multiprocessing import forkserver # type: ignore from multiprocessing import forkserver # type: ignore
from typing import Tuple from typing import Tuple
from . import _forkserver_override from . import _forkserver_override, _child
from ._state import current_actor from ._state import current_actor
from .log import get_logger from .log import get_logger
from ._portal import Portal from ._portal import Portal
from ._actor import Actor, ActorFailure from ._actor import Actor, ActorFailure
from ._entry import _mp_main, _trip_main from ._entry import _mp_main, _trio_main
log = get_logger('tractor') log = get_logger('tractor')
@ -41,14 +45,13 @@ if platform.system() == 'Windows':
_ctx = mp.get_context("spawn") _ctx = mp.get_context("spawn")
async def proc_waiter(proc: mp.Process) -> None: async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.WaitForSingleObject(proc.sentinel) await trio.lowlevel.WaitForSingleObject(proc.sentinel)
else: else:
# *NIX systems use ``trio_run_in_process` as our default (for now) # *NIX systems use ``trio`` primitives as our default
import trio_run_in_process _spawn_method = "trio"
_spawn_method = "trio_run_in_process"
async def proc_waiter(proc: mp.Process) -> None: async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.wait_readable(proc.sentinel) await trio.lowlevel.wait_readable(proc.sentinel)
def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
@ -57,7 +60,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
If the desired method is not supported this function will error. On If the desired method is not supported this function will error. On
Windows the only supported option is the ``multiprocessing`` "spawn" Windows the only supported option is the ``multiprocessing`` "spawn"
method. The default on *nix systems is ``trio_run_in_process``. method. The default on *nix systems is ``trio``.
""" """
global _ctx global _ctx
global _spawn_method global _spawn_method
@ -69,7 +72,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
# no Windows support for trip yet # no Windows support for trip yet
if platform.system() != 'Windows': if platform.system() != 'Windows':
methods += ['trio_run_in_process'] methods += ['trio']
if name not in methods: if name not in methods:
raise ValueError( raise ValueError(
@ -78,7 +81,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
elif name == 'forkserver': elif name == 'forkserver':
_forkserver_override.override_stdlib() _forkserver_override.override_stdlib()
_ctx = mp.get_context(name) _ctx = mp.get_context(name)
elif name == 'trio_run_in_process': elif name == 'trio':
_ctx = None _ctx = None
else: else:
_ctx = mp.get_context(name) _ctx = mp.get_context(name)
@ -153,6 +156,25 @@ async def cancel_on_completion(
await portal.cancel_actor() await portal.cancel_actor()
@asynccontextmanager
async def run_in_process(async_fn, *args, **kwargs):
encoded_job = cloudpickle.dumps(partial(async_fn, *args, **kwargs))
p = await trio.open_process(
[
sys.executable,
"-m",
_child.__name__
],
stdin=subprocess.PIPE
)
await p.stdin.send_all(encoded_job)
yield p
#return cloudpickle.loads(p.stdout)
async def new_proc( async def new_proc(
name: str, name: str,
actor_nursery: 'ActorNursery', # type: ignore actor_nursery: 'ActorNursery', # type: ignore
@ -174,12 +196,9 @@ async def new_proc(
subactor._spawn_method = _spawn_method subactor._spawn_method = _spawn_method
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
if use_trio_run_in_process or _spawn_method == 'trio_run_in_process': if use_trio_run_in_process or _spawn_method == 'trio':
if infect_asyncio: async with run_in_process(
raise NotImplementedError("Asyncio is incompatible with trip") _trio_main,
# trio_run_in_process
async with trio_run_in_process.open_in_process(
_trip_main,
subactor, subactor,
bind_addr, bind_addr,
parent_addr, parent_addr,
@ -203,7 +222,7 @@ async def new_proc(
cancel_scope = await nursery.start( cancel_scope = await nursery.start(
cancel_on_completion, portal, subactor, errors) cancel_on_completion, portal, subactor, errors)
# TRIP blocks here until process is complete # run_in_process blocks here until process is complete
else: else:
# `multiprocessing` # `multiprocessing`
assert _ctx assert _ctx

View File

@ -30,7 +30,7 @@ class ActorContextInfo(Mapping):
def __getitem__(self, key: str): def __getitem__(self, key: str):
try: try:
return { return {
'task': trio.hazmat.current_task, 'task': trio.lowlevel.current_task,
'actor': current_actor 'actor': current_actor
}[key]().name }[key]().name
except RuntimeError: except RuntimeError:

View File

@ -47,7 +47,7 @@ def tractor_test(fn):
if platform.system() == "Windows": if platform.system() == "Windows":
start_method = 'spawn' start_method = 'spawn'
else: else:
start_method = 'trio_run_in_process' start_method = 'trio'
if 'start_method' in inspect.signature(fn).parameters: if 'start_method' in inspect.signature(fn).parameters:
# set of subprocess spawning backends # set of subprocess spawning backends