Compare commits

..

10 Commits

Author SHA1 Message Date
Guillermo Rodriguez 2b09818ed0
Merge pull request #1 from goodboy/drop-trip-update-trio
Drop trip update trio
2020-07-20 21:04:05 -03:00
Tyler Goodlet 409ceefd6e Add logging to some cancel tests 2020-07-20 19:51:07 -04:00
Tyler Goodlet 86ed8111d8 Make sure to wait trio processes on teardown 2020-07-20 19:50:47 -04:00
Tyler Goodlet 1459abe568 Change spawn method name in `Actor.load_modules()` 2020-07-20 19:50:19 -04:00
Tyler Goodlet 660f310737 Add back subactor logging 2020-07-20 19:49:39 -04:00
Guillermo Rodriguez 772f9c3ac3
First attempt at removing trip & updating hazmat -> lowlevel 2020-07-20 16:18:38 -03:00
Tyler Goodlet 6bf5148ffc Allow marking `asyncio` funcs declaring `to_trio` channel 2020-07-03 17:40:37 -04:00
Tyler Goodlet 6d5ebb9aa7 Wow, fix all the broken async func invoking code..
Clearly this wasn't developed against a task that spawned just an async
func in `asyncio`.. Fix all that and remove a bunch of unnecessary func
layers. Add provisional support for the target receiving the `to_trio`
and `from_trio` channels and for the @tractor.stream marker.
2020-07-03 17:33:46 -04:00
Tyler Goodlet fcd1566834 Drop entrypoints from `Actor` 2020-07-03 17:05:38 -04:00
Tyler Goodlet d19c0f9b1f Move asyncio guest mode entrypoint to `to_asyncio`
The function is useful if you want to run the "main process" under
`asyncio`. Until `trio` core wraps this better we'll keep our own copy
in the interim (there's a new "inside-out-guest" mode almost on
mainline so hang tight).
2020-07-01 13:38:40 -04:00
12 changed files with 170 additions and 163 deletions

View File

@ -25,16 +25,16 @@ matrix:
- name: "Python 3.7: multiprocessing" - name: "Python 3.7: multiprocessing"
python: 3.7 # this works for Linux but is ignored on macOS or Windows python: 3.7 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="mp" env: SPAWN_BACKEND="mp"
- name: "Python 3.7: trio-run-in-process" - name: "Python 3.7: trio"
python: 3.7 # this works for Linux but is ignored on macOS or Windows python: 3.7 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="trio_run_in_process" env: SPAWN_BACKEND="trio"
- name: "Python 3.8: multiprocessing" - name: "Python 3.8: multiprocessing"
python: 3.8 # this works for Linux but is ignored on macOS or Windows python: 3.8 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="mp" env: SPAWN_BACKEND="mp"
- name: "Python 3.8: trio-run-in-process" - name: "Python 3.8: trio"
python: 3.8 # this works for Linux but is ignored on macOS or Windows python: 3.8 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="trio_run_in_process" env: SPAWN_BACKEND="trio"
install: install:
- cd $TRAVIS_BUILD_DIR - cd $TRAVIS_BUILD_DIR

View File

@ -39,7 +39,7 @@ setup(
], ],
install_requires=[ install_requires=[
'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt', 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt',
'trio_typing', 'trio-run-in-process', 'trio_typing', 'cloudpickle',
], ],
tests_require=['pytest'], tests_require=['pytest'],
python_requires=">=3.7", python_requires=">=3.7",

View File

@ -21,7 +21,7 @@ def pytest_addoption(parser):
parser.addoption( parser.addoption(
"--spawn-backend", action="store", dest='spawn_backend', "--spawn-backend", action="store", dest='spawn_backend',
default='trio_run_in_process', default='trio',
help="Processing spawning backend to use for test run", help="Processing spawning backend to use for test run",
) )
@ -34,7 +34,7 @@ def pytest_configure(config):
if backend == 'mp': if backend == 'mp':
tractor._spawn.try_set_start_method('spawn') tractor._spawn.try_set_start_method('spawn')
elif backend == 'trio_run_in_process': elif backend == 'trio':
tractor._spawn.try_set_start_method(backend) tractor._spawn.try_set_start_method(backend)
@ -56,7 +56,7 @@ def pytest_generate_tests(metafunc):
if not spawn_backend: if not spawn_backend:
# XXX some weird windows bug with `pytest`? # XXX some weird windows bug with `pytest`?
spawn_backend = 'mp' spawn_backend = 'mp'
assert spawn_backend in ('mp', 'trio_run_in_process') assert spawn_backend in ('mp', 'trio')
if 'start_method' in metafunc.fixturenames: if 'start_method' in metafunc.fixturenames:
if spawn_backend == 'mp': if spawn_backend == 'mp':
@ -67,11 +67,11 @@ def pytest_generate_tests(metafunc):
# removing XXX: the fork method is in general # removing XXX: the fork method is in general
# incompatible with trio's global scheduler state # incompatible with trio's global scheduler state
methods.remove('fork') methods.remove('fork')
elif spawn_backend == 'trio_run_in_process': elif spawn_backend == 'trio':
if platform.system() == "Windows": if platform.system() == "Windows":
pytest.fail( pytest.fail(
"Only `--spawn-backend=mp` is supported on Windows") "Only `--spawn-backend=mp` is supported on Windows")
methods = ['trio_run_in_process'] methods = ['trio']
metafunc.parametrize("start_method", methods, scope='module') metafunc.parametrize("start_method", methods, scope='module')

View File

@ -197,7 +197,7 @@ async def test_cancel_infinite_streamer(start_method):
], ],
) )
@tractor_test @tractor_test
async def test_some_cancels_all(num_actors_and_errs, start_method): async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
"""Verify a subset of failed subactors causes all others in """Verify a subset of failed subactors causes all others in
the nursery to be cancelled just like the strategy in trio. the nursery to be cancelled just like the strategy in trio.

View File

@ -179,6 +179,9 @@ class Actor:
# Information about `__main__` from parent # Information about `__main__` from parent
_parent_main_data: Dict[str, str] _parent_main_data: Dict[str, str]
# if started on ``asycio`` running ``trio`` in guest mode
_infected_aio: bool = False
def __init__( def __init__(
self, self,
name: str, name: str,
@ -256,7 +259,7 @@ class Actor:
code (if it exists). code (if it exists).
""" """
try: try:
if self._spawn_method == 'trio_run_in_process': if self._spawn_method == 'trio':
parent_data = self._parent_main_data parent_data = self._parent_main_data
if 'init_main_from_name' in parent_data: if 'init_main_from_name' in parent_data:
_mp_fixup_main._fixup_main_from_name( _mp_fixup_main._fixup_main_from_name(
@ -539,58 +542,6 @@ class Actor:
f"Exiting msg loop for {chan} from {chan.uid} " f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}") f"with last msg:\n{msg}")
def _mp_main(
self,
accept_addr: Tuple[str, int],
forkserver_info: Tuple[Any, Any, Any, Any, Any],
start_method: str,
parent_addr: Tuple[str, int] = None
) -> None:
"""The routine called *after fork* which invokes a fresh ``trio.run``
"""
self._forkserver_info = forkserver_info
from ._spawn import try_set_start_method
spawn_ctx = try_set_start_method(start_method)
if self.loglevel is not None:
log.info(
f"Setting loglevel for {self.uid} to {self.loglevel}")
get_console_log(self.loglevel)
assert spawn_ctx
log.info(
f"Started new {spawn_ctx.current_process()} for {self.uid}")
_state._current_actor = self
log.debug(f"parent_addr is {parent_addr}")
try:
trio.run(partial(
self._async_main, accept_addr, parent_addr=parent_addr))
except KeyboardInterrupt:
pass # handle it the same way trio does?
log.info(f"Actor {self.uid} terminated")
async def _trip_main(
self,
accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int] = None
) -> None:
"""Entry point for a `trio_run_in_process` subactor.
Here we don't need to call `trio.run()` since trip does that as
part of its subprocess startup sequence.
"""
if self.loglevel is not None:
log.info(
f"Setting loglevel for {self.uid} to {self.loglevel}")
get_console_log(self.loglevel)
log.info(f"Started new TRIP process for {self.uid}")
_state._current_actor = self
await self._async_main(accept_addr, parent_addr=parent_addr)
log.info(f"Actor {self.uid} terminated")
async def _async_main( async def _async_main(
self, self,
accept_addr: Tuple[str, int], accept_addr: Tuple[str, int],
@ -846,6 +797,8 @@ class Actor:
log.info(f"Handshake with actor {uid}@{chan.raddr} complete") log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
return uid return uid
def is_infected_aio(self) -> bool:
return self._infected_aio
class Arbiter(Actor): class Arbiter(Actor):
"""A special actor who knows all the other actors and always has """A special actor who knows all the other actors and always has

13
tractor/_child.py 100644
View File

@ -0,0 +1,13 @@
import sys
import trio
import cloudpickle
if __name__ == "__main__":
job = cloudpickle.load(sys.stdin.detach())
try:
result = trio.run(job)
cloudpickle.dump(sys.stdout.detach(), result)
except BaseException as err:
cloudpickle.dump(sys.stdout.detach(), err)

View File

@ -1,60 +1,20 @@
""" """
Process entry points. Process entry points.
""" """
import asyncio
from functools import partial from functools import partial
from typing import Tuple, Any, Awaitable from typing import Tuple, Any
import trio # type: ignore import trio # type: ignore
from ._actor import Actor from ._actor import Actor
from .log import get_console_log, get_logger from .log import get_console_log, get_logger
from . import _state from . import _state
from .to_asyncio import run_as_asyncio_guest
log = get_logger(__name__) log = get_logger(__name__)
def _asyncio_main(
trio_main: Awaitable,
) -> None:
"""Entry for an "infected ``asyncio`` actor".
Uh, oh. :o
It looks like your event loop has caught a case of the ``trio``s.
:()
Don't worry, we've heard you'll barely notice. You might hallucinate
a few more propagating errors and feel like your digestion has
slowed but if anything get's too bad your parents will know about
it.
:)
"""
async def aio_main(trio_main):
loop = asyncio.get_running_loop()
trio_done_fut = asyncio.Future()
def trio_done_callback(main_outcome):
log.info(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)
# start the infection: run trio on the asyncio loop in "guest mode"
log.info(f"Infecting asyncio process with {trio_main}")
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
(await trio_done_fut).unwrap()
asyncio.run(aio_main(trio_main))
def _mp_main( def _mp_main(
actor: 'Actor', actor: 'Actor',
accept_addr: Tuple[str, int], accept_addr: Tuple[str, int],
@ -89,7 +49,7 @@ def _mp_main(
try: try:
if infect_asyncio: if infect_asyncio:
actor._infected_aio = True actor._infected_aio = True
_asyncio_main(trio_main) run_as_asyncio_guest(trio_main)
else: else:
trio.run(trio_main) trio.run(trio_main)
except KeyboardInterrupt: except KeyboardInterrupt:
@ -97,7 +57,7 @@ def _mp_main(
log.info(f"Actor {actor.uid} terminated") log.info(f"Actor {actor.uid} terminated")
async def _trip_main( async def _trio_main(
actor: 'Actor', actor: 'Actor',
accept_addr: Tuple[str, int], accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int] = None parent_addr: Tuple[str, int] = None
@ -112,7 +72,9 @@ async def _trip_main(
f"Setting loglevel for {actor.uid} to {actor.loglevel}") f"Setting loglevel for {actor.uid} to {actor.loglevel}")
get_console_log(actor.loglevel) get_console_log(actor.loglevel)
log.info(f"Started new TRIP process for {actor.uid}") log.info(f"Started new trio process for {actor.uid}")
_state._current_actor = actor _state._current_actor = actor
await actor._async_main(accept_addr, parent_addr=parent_addr) await actor._async_main(accept_addr, parent_addr=parent_addr)
log.info(f"Actor {actor.uid} terminated") log.info(f"Actor {actor.uid} terminated")

View File

@ -1,14 +1,18 @@
""" """
Machinery for actor process spawning using multiple backends. Machinery for actor process spawning using multiple backends.
""" """
import sys
import inspect import inspect
import subprocess
import multiprocessing as mp import multiprocessing as mp
import platform import platform
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from functools import partial
import trio import trio
import cloudpickle
from trio_typing import TaskStatus from trio_typing import TaskStatus
from async_generator import aclosing from async_generator import aclosing, asynccontextmanager
try: try:
from multiprocessing import semaphore_tracker # type: ignore from multiprocessing import semaphore_tracker # type: ignore
@ -21,12 +25,12 @@ except ImportError:
from multiprocessing import forkserver # type: ignore from multiprocessing import forkserver # type: ignore
from typing import Tuple from typing import Tuple
from . import _forkserver_override from . import _forkserver_override, _child
from ._state import current_actor from ._state import current_actor
from .log import get_logger from .log import get_logger
from ._portal import Portal from ._portal import Portal
from ._actor import Actor, ActorFailure from ._actor import Actor, ActorFailure
from ._entry import _mp_main, _trip_main from ._entry import _mp_main, _trio_main
log = get_logger('tractor') log = get_logger('tractor')
@ -41,14 +45,13 @@ if platform.system() == 'Windows':
_ctx = mp.get_context("spawn") _ctx = mp.get_context("spawn")
async def proc_waiter(proc: mp.Process) -> None: async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.WaitForSingleObject(proc.sentinel) await trio.lowlevel.WaitForSingleObject(proc.sentinel)
else: else:
# *NIX systems use ``trio_run_in_process` as our default (for now) # *NIX systems use ``trio`` primitives as our default
import trio_run_in_process _spawn_method = "trio"
_spawn_method = "trio_run_in_process"
async def proc_waiter(proc: mp.Process) -> None: async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.wait_readable(proc.sentinel) await trio.lowlevel.wait_readable(proc.sentinel)
def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
@ -57,7 +60,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
If the desired method is not supported this function will error. On If the desired method is not supported this function will error. On
Windows the only supported option is the ``multiprocessing`` "spawn" Windows the only supported option is the ``multiprocessing`` "spawn"
method. The default on *nix systems is ``trio_run_in_process``. method. The default on *nix systems is ``trio``.
""" """
global _ctx global _ctx
global _spawn_method global _spawn_method
@ -69,7 +72,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
# no Windows support for trip yet # no Windows support for trip yet
if platform.system() != 'Windows': if platform.system() != 'Windows':
methods += ['trio_run_in_process'] methods += ['trio']
if name not in methods: if name not in methods:
raise ValueError( raise ValueError(
@ -78,7 +81,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
elif name == 'forkserver': elif name == 'forkserver':
_forkserver_override.override_stdlib() _forkserver_override.override_stdlib()
_ctx = mp.get_context(name) _ctx = mp.get_context(name)
elif name == 'trio_run_in_process': elif name == 'trio':
_ctx = None _ctx = None
else: else:
_ctx = mp.get_context(name) _ctx = mp.get_context(name)
@ -153,6 +156,27 @@ async def cancel_on_completion(
await portal.cancel_actor() await portal.cancel_actor()
@asynccontextmanager
async def run_in_process(async_fn, *args, **kwargs):
encoded_job = cloudpickle.dumps(partial(async_fn, *args, **kwargs))
p = await trio.open_process(
[
sys.executable,
"-m",
_child.__name__
],
stdin=subprocess.PIPE
)
# send over func to call
await p.stdin.send_all(encoded_job)
yield p
# wait for termination
await p.wait()
async def new_proc( async def new_proc(
name: str, name: str,
actor_nursery: 'ActorNursery', # type: ignore actor_nursery: 'ActorNursery', # type: ignore
@ -174,12 +198,9 @@ async def new_proc(
subactor._spawn_method = _spawn_method subactor._spawn_method = _spawn_method
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
if use_trio_run_in_process or _spawn_method == 'trio_run_in_process': if use_trio_run_in_process or _spawn_method == 'trio':
if infect_asyncio: async with run_in_process(
raise NotImplementedError("Asyncio is incompatible with trip") _trio_main,
# trio_run_in_process
async with trio_run_in_process.open_in_process(
_trip_main,
subactor, subactor,
bind_addr, bind_addr,
parent_addr, parent_addr,
@ -203,7 +224,7 @@ async def new_proc(
cancel_scope = await nursery.start( cancel_scope = await nursery.start(
cancel_on_completion, portal, subactor, errors) cancel_on_completion, portal, subactor, errors)
# TRIP blocks here until process is complete # run_in_process blocks here until process is complete
else: else:
# `multiprocessing` # `multiprocessing`
assert _ctx assert _ctx

View File

@ -30,7 +30,7 @@ class ActorContextInfo(Mapping):
def __getitem__(self, key: str): def __getitem__(self, key: str):
try: try:
return { return {
'task': trio.hazmat.current_task, 'task': trio.lowlevel.current_task,
'actor': current_actor 'actor': current_actor
}[key]().name }[key]().name
except RuntimeError: except RuntimeError:

View File

@ -41,9 +41,11 @@ def stream(func):
""" """
func._tractor_stream_function = True func._tractor_stream_function = True
sig = inspect.signature(func) sig = inspect.signature(func)
if 'ctx' not in sig.parameters: params = sig.parameters
if 'ctx' not in params and 'to_trio' not in params:
raise TypeError( raise TypeError(
"The first argument to the stream function " "The first argument to the stream function "
f"{func.__name__} must be `ctx: tractor.Context`" f"{func.__name__} must be `ctx: tractor.Context` "
"(Or ``to_trio`` if using ``asyncio`` in guest mode)."
) )
return func return func

View File

@ -47,7 +47,7 @@ def tractor_test(fn):
if platform.system() == "Windows": if platform.system() == "Windows":
start_method = 'spawn' start_method = 'spawn'
else: else:
start_method = 'trio_run_in_process' start_method = 'trio'
if 'start_method' in inspect.signature(fn).parameters: if 'start_method' in inspect.signature(fn).parameters:
# set of subprocess spawning backends # set of subprocess spawning backends

View File

@ -4,7 +4,6 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
import asyncio import asyncio
import inspect import inspect
from typing import ( from typing import (
Any,
Callable, Callable,
AsyncGenerator, AsyncGenerator,
Awaitable, Awaitable,
@ -13,49 +12,60 @@ from typing import (
import trio import trio
from .log import get_logger
from ._state import current_actor from ._state import current_actor
log = get_logger(__name__)
__all__ = ['run']
__all__ = ['run_task', 'run_as_asyncio_guest']
async def _invoke( async def _invoke(
from_trio: trio.abc.ReceiveChannel, from_trio: trio.abc.ReceiveChannel,
to_trio: asyncio.Queue, to_trio: asyncio.Queue,
coro: Awaitable, coro: Awaitable,
) -> Union[AsyncGenerator, Awaitable]: ) -> None:
"""Await or stream awaiable object based on type into """Await or stream awaiable object based on ``coro`` type into
``trio`` memory channel. ``trio`` memory channel.
``from_trio`` might eventually be used here for bidirectional streaming.
""" """
async def stream_from_gen(c):
async for item in c:
to_trio.send_nowait(item)
async def just_return(c):
to_trio.send_nowait(await c)
if inspect.isasyncgen(coro): if inspect.isasyncgen(coro):
return await stream_from_gen(coro) async for item in coro:
to_trio.send_nowait(item)
elif inspect.iscoroutine(coro): elif inspect.iscoroutine(coro):
return await coro to_trio.send_nowait(await coro)
async def run( async def run_task(
func: Callable, func: Callable,
*,
qsize: int = 2**10, qsize: int = 2**10,
_treat_as_stream: bool = False,
**kwargs, **kwargs,
) -> Any: ) -> Union[AsyncGenerator, Awaitable]:
"""Run an ``asyncio`` async function or generator in a task, return """Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``. or stream the result back to ``trio``.
""" """
assert current_actor()._infected_aio assert current_actor().is_infected_aio()
# ITC (inter task comms) # ITC (inter task comms)
from_trio = asyncio.Queue(qsize) from_trio = asyncio.Queue(qsize)
to_trio, from_aio = trio.open_memory_channel(qsize) to_trio, from_aio = trio.open_memory_channel(qsize)
# allow target func to accept/stream results manually args = tuple(inspect.getfullargspec(func).args)
if getattr(func, '_tractor_steam_function', None):
# the assumption is that the target async routine accepts the
# send channel then it intends to yield more then one return
# value otherwise it would just return ;P
_treat_as_stream = True
# allow target func to accept/stream results manually by name
if 'to_trio' in args:
kwargs['to_trio'] = to_trio kwargs['to_trio'] = to_trio
if 'from_trio' in args:
kwargs['from_trio'] = to_trio kwargs['from_trio'] = to_trio
coro = func(**kwargs) coro = func(**kwargs)
@ -67,7 +77,6 @@ async def run(
task = asyncio.create_task(_invoke(from_trio, to_trio, coro)) task = asyncio.create_task(_invoke(from_trio, to_trio, coro))
err = None err = None
# XXX: I'm not sure this actually does anything...
def cancel_trio(task): def cancel_trio(task):
"""Cancel the calling ``trio`` task on error. """Cancel the calling ``trio`` task on error.
""" """
@ -77,17 +86,8 @@ async def run(
task.add_done_callback(cancel_trio) task.add_done_callback(cancel_trio)
# determine return type async func vs. gen
if inspect.isasyncgen(coro):
# simple async func
async def result():
with cancel_scope:
return await from_aio.get()
if cancel_scope.cancelled_caught and err:
raise err
elif inspect.iscoroutine(coro):
# asycn gen # asycn gen
if inspect.isasyncgen(coro) or _treat_as_stream:
async def result(): async def result():
with cancel_scope: with cancel_scope:
async with from_aio: async with from_aio:
@ -97,3 +97,59 @@ async def run(
raise err raise err
return result() return result()
# simple async func
elif inspect.iscoroutine(coro):
with cancel_scope:
result = await from_aio.receive()
return result
if cancel_scope.cancelled_caught and err:
raise err
def run_as_asyncio_guest(
trio_main: Awaitable,
) -> None:
"""Entry for an "infected ``asyncio`` actor".
Uh, oh. :o
It looks like your event loop has caught a case of the ``trio``s.
:()
Don't worry, we've heard you'll barely notice. You might hallucinate
a few more propagating errors and feel like your digestion has
slowed but if anything get's too bad your parents will know about
it.
:)
"""
async def aio_main(trio_main):
loop = asyncio.get_running_loop()
trio_done_fut = asyncio.Future()
def trio_done_callback(main_outcome):
log.info(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)
# start the infection: run trio on the asyncio loop in "guest mode"
log.info(f"Infecting asyncio process with {trio_main}")
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
(await trio_done_fut).unwrap()
# might as well if it's installed.
try:
import uvloop
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
except ImportError:
pass
asyncio.run(aio_main(trio_main))