From 56463a08df310160c7c6eb74c439474c39929f86 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Mon, 20 Jul 2020 16:18:38 -0300 Subject: [PATCH] First attempt at removing trip & updating hazmat -> lowlevel --- .travis.yml | 8 ++--- setup.py | 2 +- tests/conftest.py | 10 +++--- tractor/_child.py | 13 ++++++++ tractor/_entry.py | 14 ++------ tractor/_spawn.py | 55 +++++++++++++++++++++----------- tractor/_state.py | 2 +- tractor/testing/_tractor_test.py | 2 +- 8 files changed, 64 insertions(+), 42 deletions(-) create mode 100644 tractor/_child.py diff --git a/.travis.yml b/.travis.yml index 6e57aed..d709871 100644 --- a/.travis.yml +++ b/.travis.yml @@ -25,16 +25,16 @@ matrix: - name: "Python 3.7: multiprocessing" python: 3.7 # this works for Linux but is ignored on macOS or Windows env: SPAWN_BACKEND="mp" - - name: "Python 3.7: trio-run-in-process" + - name: "Python 3.7: trio" python: 3.7 # this works for Linux but is ignored on macOS or Windows - env: SPAWN_BACKEND="trio_run_in_process" + env: SPAWN_BACKEND="trio" - name: "Python 3.8: multiprocessing" python: 3.8 # this works for Linux but is ignored on macOS or Windows env: SPAWN_BACKEND="mp" - - name: "Python 3.8: trio-run-in-process" + - name: "Python 3.8: trio" python: 3.8 # this works for Linux but is ignored on macOS or Windows - env: SPAWN_BACKEND="trio_run_in_process" + env: SPAWN_BACKEND="trio" install: - cd $TRAVIS_BUILD_DIR diff --git a/setup.py b/setup.py index 3fe45dc..7f811cd 100755 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ setup( ], install_requires=[ 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt', - 'trio_typing', 'trio-run-in-process', + 'trio_typing', 'cloudpickle', ], tests_require=['pytest'], python_requires=">=3.7", diff --git a/tests/conftest.py b/tests/conftest.py index 128ff3e..64a072a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,7 +21,7 @@ def pytest_addoption(parser): parser.addoption( "--spawn-backend", action="store", dest='spawn_backend', - default='trio_run_in_process', + default='trio', help="Processing spawning backend to use for test run", ) @@ -34,7 +34,7 @@ def pytest_configure(config): if backend == 'mp': tractor._spawn.try_set_start_method('spawn') - elif backend == 'trio_run_in_process': + elif backend == 'trio': tractor._spawn.try_set_start_method(backend) @@ -56,7 +56,7 @@ def pytest_generate_tests(metafunc): if not spawn_backend: # XXX some weird windows bug with `pytest`? spawn_backend = 'mp' - assert spawn_backend in ('mp', 'trio_run_in_process') + assert spawn_backend in ('mp', 'trio') if 'start_method' in metafunc.fixturenames: if spawn_backend == 'mp': @@ -67,11 +67,11 @@ def pytest_generate_tests(metafunc): # removing XXX: the fork method is in general # incompatible with trio's global scheduler state methods.remove('fork') - elif spawn_backend == 'trio_run_in_process': + elif spawn_backend == 'trio': if platform.system() == "Windows": pytest.fail( "Only `--spawn-backend=mp` is supported on Windows") - methods = ['trio_run_in_process'] + methods = ['trio'] metafunc.parametrize("start_method", methods, scope='module') diff --git a/tractor/_child.py b/tractor/_child.py new file mode 100644 index 0000000..adad3c5 --- /dev/null +++ b/tractor/_child.py @@ -0,0 +1,13 @@ +import sys +import trio +import cloudpickle + +if __name__ == "__main__": + job = cloudpickle.load(sys.stdin.detach()) + + try: + result = trio.run(job) + cloudpickle.dump(sys.stdout.detach(), result) + + except BaseException as err: + cloudpickle.dump(sys.stdout.detach(), err) \ No newline at end of file diff --git a/tractor/_entry.py b/tractor/_entry.py index a6a33db..62f7a1f 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -52,22 +52,12 @@ def _mp_main( log.info(f"Actor {actor.uid} terminated") -async def _trip_main( +async def _trio_main( actor: 'Actor', accept_addr: Tuple[str, int], parent_addr: Tuple[str, int] = None ) -> None: - """Entry point for a `trio_run_in_process` subactor. - Here we don't need to call `trio.run()` since trip does that as - part of its subprocess startup sequence. - """ - if actor.loglevel is not None: - log.info( - f"Setting loglevel for {actor.uid} to {actor.loglevel}") - get_console_log(actor.loglevel) - - log.info(f"Started new TRIP process for {actor.uid}") _state._current_actor = actor + await actor._async_main(accept_addr, parent_addr=parent_addr) - log.info(f"Actor {actor.uid} terminated") diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 09c36e5..4ae2ce4 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -1,14 +1,18 @@ """ Machinery for actor process spawning using multiple backends. """ +import sys import inspect +import subprocess import multiprocessing as mp import platform from typing import Any, Dict, Optional +from functools import partial import trio +import cloudpickle from trio_typing import TaskStatus -from async_generator import aclosing +from async_generator import aclosing, asynccontextmanager try: from multiprocessing import semaphore_tracker # type: ignore @@ -21,12 +25,12 @@ except ImportError: from multiprocessing import forkserver # type: ignore from typing import Tuple -from . import _forkserver_override +from . import _forkserver_override, _child from ._state import current_actor from .log import get_logger from ._portal import Portal from ._actor import Actor, ActorFailure -from ._entry import _mp_main, _trip_main +from ._entry import _mp_main, _trio_main log = get_logger('tractor') @@ -41,14 +45,13 @@ if platform.system() == 'Windows': _ctx = mp.get_context("spawn") async def proc_waiter(proc: mp.Process) -> None: - await trio.hazmat.WaitForSingleObject(proc.sentinel) + await trio.lowlevel.WaitForSingleObject(proc.sentinel) else: - # *NIX systems use ``trio_run_in_process` as our default (for now) - import trio_run_in_process - _spawn_method = "trio_run_in_process" + # *NIX systems use ``trio`` primitives as our default + _spawn_method = "trio" async def proc_waiter(proc: mp.Process) -> None: - await trio.hazmat.wait_readable(proc.sentinel) + await trio.lowlevel.wait_readable(proc.sentinel) def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: @@ -57,7 +60,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: If the desired method is not supported this function will error. On Windows the only supported option is the ``multiprocessing`` "spawn" - method. The default on *nix systems is ``trio_run_in_process``. + method. The default on *nix systems is ``trio``. """ global _ctx global _spawn_method @@ -69,7 +72,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: # no Windows support for trip yet if platform.system() != 'Windows': - methods += ['trio_run_in_process'] + methods += ['trio'] if name not in methods: raise ValueError( @@ -78,7 +81,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: elif name == 'forkserver': _forkserver_override.override_stdlib() _ctx = mp.get_context(name) - elif name == 'trio_run_in_process': + elif name == 'trio': _ctx = None else: _ctx = mp.get_context(name) @@ -153,6 +156,25 @@ async def cancel_on_completion( await portal.cancel_actor() +@asynccontextmanager +async def run_in_process(async_fn, *args, **kwargs): + encoded_job = cloudpickle.dumps(partial(async_fn, *args, **kwargs)) + p = await trio.open_process( + [ + sys.executable, + "-m", + _child.__name__ + ], + stdin=subprocess.PIPE + ) + + await p.stdin.send_all(encoded_job) + + yield p + + #return cloudpickle.loads(p.stdout) + + async def new_proc( name: str, actor_nursery: 'ActorNursery', # type: ignore @@ -174,12 +196,9 @@ async def new_proc( subactor._spawn_method = _spawn_method async with trio.open_nursery() as nursery: - if use_trio_run_in_process or _spawn_method == 'trio_run_in_process': - if infect_asyncio: - raise NotImplementedError("Asyncio is incompatible with trip") - # trio_run_in_process - async with trio_run_in_process.open_in_process( - _trip_main, + if use_trio_run_in_process or _spawn_method == 'trio': + async with run_in_process( + _trio_main, subactor, bind_addr, parent_addr, @@ -203,7 +222,7 @@ async def new_proc( cancel_scope = await nursery.start( cancel_on_completion, portal, subactor, errors) - # TRIP blocks here until process is complete + # run_in_process blocks here until process is complete else: # `multiprocessing` assert _ctx diff --git a/tractor/_state.py b/tractor/_state.py index ea0d547..d624fc9 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -30,7 +30,7 @@ class ActorContextInfo(Mapping): def __getitem__(self, key: str): try: return { - 'task': trio.hazmat.current_task, + 'task': trio.lowlevel.current_task, 'actor': current_actor }[key]().name except RuntimeError: diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py index 5ca82a6..3db56e5 100644 --- a/tractor/testing/_tractor_test.py +++ b/tractor/testing/_tractor_test.py @@ -47,7 +47,7 @@ def tractor_test(fn): if platform.system() == "Windows": start_method = 'spawn' else: - start_method = 'trio_run_in_process' + start_method = 'trio' if 'start_method' in inspect.signature(fn).parameters: # set of subprocess spawning backends