
Merge pull request #61 from tgoodlet/spawn_method_support

Spawn method support
goodboy 2019-03-08 20:11:40 -05:00 committed by GitHub
commit c0276c85df
11 changed files with 215 additions and 80 deletions

View File

@@ -1,6 +1,6 @@
 tractor
 =======
-An async-native `actor model`_ built on trio_ and multiprocessing_.
+An async-native "`actor model`_" built on trio_ and multiprocessing_.
 |travis|
@@ -23,23 +23,26 @@ An async-native `actor model`_ built on trio_ and multiprocessing_.
 ``tractor`` is an attempt to bring trionic_ `structured concurrency`_ to distributed multi-core Python.
-``tractor`` lets you run and spawn *actors*: processes which each run a ``trio``
-scheduler and task tree (also known as an `async sandwich`_).
-*Actors* communicate by sending messages_ over channels_ and avoid sharing any local state.
-This `actor model`_ allows for highly distributed software architecture which works just as
-well on multiple cores as it does over many hosts.
-``tractor`` takes much inspiration from pulsar_ and execnet_ but attempts to be much more
-focussed on sophistication of the lower level distributed architecture as well as have first
-class support for `modern async Python`_.
-The first step to grok ``tractor`` is to get the basics of ``trio``
-down. A great place to start is the `trio docs`_ and this `blog post`_.
+``tractor`` lets you spawn ``trio`` *"actors"*: processes which each run a ``trio`` scheduler and task
+tree (also known as an `async sandwich`_). *Actors* communicate by exchanging asynchronous messages_ over
+channels_ and avoid sharing any state. This model allows for highly distributed software architecture
+which works just as well on multiple cores as it does over many hosts.
+``tractor`` is an actor-model-*like* system in the sense that it adheres to the `3 axioms`_ but does
+not (yet) fulfill all "unrequirements_" in practice. The API and design take inspiration from pulsar_ and
+execnet_ but attempt to be more focussed on sophistication of the lower level distributed architecture as
+well as have first class support for streaming using `async generators`_.
+The first step to grok ``tractor`` is to get the basics of ``trio`` down.
+A great place to start is the `trio docs`_ and this `blog post`_.
 .. _messages: https://en.wikipedia.org/wiki/Message_passing
 .. _trio docs: https://trio.readthedocs.io/en/latest/
 .. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
 .. _structured concurrency: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
-.. _modern async Python: https://www.python.org/dev/peps/pep-0525/
+.. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
+.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
+.. _async generators: https://www.python.org/dev/peps/pep-0525/
 .. contents::
@@ -47,20 +50,25 @@ down. A great place to start is the `trio docs`_ and this `blog post`_.
 Philosophy
 ----------
-``tractor``'s tenets non-comprehensively include:
+``tractor`` aims to be the Python multi-processing framework *you always wanted*.
+Its tenets non-comprehensively include:
+- strict adherence to the `concept-in-progress`_ of *structured concurrency*
 - no spawning of processes *willy-nilly*; causality_ is paramount!
-- `shared nothing architecture`_
-- remote errors `always propagate`_ back to the caller
+- (remote) errors `always propagate`_ back to the parent / caller
 - verbatim support for ``trio``'s cancellation_ system
+- `shared nothing architecture`_
 - no use of *proxy* objects to wrap RPC calls
 - an immersive debugging experience
 - anti-fragility through `chaos engineering`_
 .. warning:: ``tractor`` is in alpha-alpha and is expected to change rapidly!
    Expect nothing to be set in stone. Your ideas about where it should go
    are greatly appreciated!
+.. _concept-in-progress: https://trio.discourse.group/t/structured-concurrency-kickoff/55
 .. _pulsar: http://quantmind.github.io/pulsar/design.html
 .. _execnet: https://codespeak.net/execnet/
@@ -74,6 +82,16 @@ No PyPi release yet!
     pip install git+git://github.com/tgoodlet/tractor.git
+Windows "gotchas"
+*****************
+``tractor`` internally uses the stdlib's ``multiprocessing`` package which
+*can* have some gotchas on Windows. Namely, the need for calling
+`freeze_support()`_ inside the ``__main__`` context. See `#61`_ for the
+deats.
+.. _freeze_support(): https://docs.python.org/3/library/multiprocessing.html#multiprocessing.freeze_support
+.. _#61: https://github.com/tgoodlet/tractor/pull/61#issuecomment-470053512
 Examples
 --------
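
To make the Windows note above concrete, here is a minimal sketch of a guarded
entry point (the ``main()`` body is a placeholder, not taken from this change)::

    from multiprocessing import freeze_support

    import tractor


    async def main():
        # spawn sub-actors inside a nursery here
        async with tractor.open_nursery():
            pass


    if __name__ == '__main__':
        # required on Windows, particularly for frozen executables;
        # effectively a no-op everywhere else
        freeze_support()
        tractor.run(main)
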
@@ -450,7 +468,6 @@ as ``multiprocessing`` calls it) which is running ``main()``.
    https://trio.readthedocs.io/en/latest/reference-core.html#getting-back-into-the-trio-thread-from-another-thread
 .. _asynchronous generators: https://www.python.org/dev/peps/pep-0525/
 .. _remote function execution: https://codespeak.net/execnet/example/test_info.html#remote-exec-a-function-avoiding-inlined-source-part-i
-.. _asyncitertools: https://github.com/vodik/asyncitertools
 Cancellation
@@ -630,6 +647,9 @@ multiple tasks streaming responses concurrently:
 The context notion comes from the context_ in nanomsg_.
+.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5
+.. _msgpack: https://en.wikipedia.org/wiki/MessagePack
 Running actors standalone
 *************************
@@ -645,6 +665,16 @@ need to hop into a debugger. You just need to pass the existing
     tractor.run(main, arbiter_addr=('192.168.0.10', 1616))
+Choosing a ``multiprocessing`` *start method*
+*********************************************
+``tractor`` supports selection of the `multiprocessing start method`_ via
+a ``start_method`` kwarg to ``tractor.run()``. Note that on Windows
+*spawn* is the only supported method and on \*nix systems *forkserver* is
+selected by default for speed.
+.. _multiprocessing start method: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
 Enabling logging
 ****************
 Considering how complicated distributed software can become it helps to know
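
For illustration, the new kwarg is passed only at the top-level entry point; a
minimal sketch with a placeholder ``main()``::

    import tractor


    async def main():
        # spawn actors here
        async with tractor.open_nursery():
            pass


    if __name__ == '__main__':
        # 'spawn' works on every platform; 'forkserver' is the *nix default
        tractor.run(main, start_method='spawn')
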
@@ -673,6 +703,15 @@ Stuff I'd like to see ``tractor`` do real soon:
   but with better `pdb++`_ support
 - an extensive `chaos engineering`_ test suite
 - support for reactive programming primitives and native support for asyncitertools_ like libs
+- introduction of a `capability-based security`_ model
+.. _supervisors: https://github.com/tgoodlet/tractor/issues/22
+.. _nanomsg: https://nanomsg.github.io/nng/index.html
+.. _gossip protocol: https://en.wikipedia.org/wiki/Gossip_protocol
+.. _celery: http://docs.celeryproject.org/en/latest/userguide/debugging.html
+.. _asyncitertools: https://github.com/vodik/asyncitertools
+.. _pdb++: https://github.com/antocuni/pdb
+.. _capability-based security: https://en.wikipedia.org/wiki/Capability-based_security
 Feel like saying hi?
@@ -682,12 +721,4 @@ This project is very much coupled to the ongoing development of
 community). If you want to help, have suggestions or just want to
 say hi, please feel free to ping me on the `trio gitter channel`_!
-.. _supervisors: https://github.com/tgoodlet/tractor/issues/22
-.. _nanomsg: https://nanomsg.github.io/nng/index.html
-.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5
-.. _gossip protocol: https://en.wikipedia.org/wiki/Gossip_protocol
 .. _trio gitter channel: https://gitter.im/python-trio/general
-.. _celery: http://docs.celeryproject.org/en/latest/userguide/debugging.html
-.. _pdb++: https://github.com/antocuni/pdb
-.. _msgpack: https://en.wikipedia.org/wiki/MessagePack

View File

@@ -28,3 +28,11 @@ def loglevel(request):
 @pytest.fixture(scope='session')
 def arb_addr():
     return _arb_addr
+
+
+def pytest_generate_tests(metafunc):
+    if 'start_method' in metafunc.fixturenames:
+        from multiprocessing import get_all_start_methods
+        methods = get_all_start_methods()
+        if 'fork' in methods:
+            # 'fork' is incompatible with ``trio`` (and absent on Windows)
+            methods.remove('fork')
+        metafunc.parametrize("start_method", methods, scope='module')
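
As an illustration, a hypothetical test (not part of this change) that declares
a ``start_method`` argument will now be collected once per available start
method and can forward the value on to ``tractor.run()``::

    import tractor


    def test_runs_under_each_start_method(arb_addr, start_method):

        async def main():
            async with tractor.open_nursery():
                pass

        tractor.run(
            main,
            arbiter_addr=arb_addr,
            start_method=start_method,
        )
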

View File

@@ -110,7 +110,7 @@ async def stream_forever():
 @tractor_test
-async def test_cancel_infinite_streamer():
+async def test_cancel_infinite_streamer(start_method):
     # stream for at most 1 seconds
     with trio.move_on_after(1) as cancel_scope:
@@ -139,7 +139,7 @@ async def test_cancel_infinite_streamer():
     ids=['one_actor', 'two_actors'],
 )
 @tractor_test
-async def test_some_cancels_all(num_actors_and_errs):
+async def test_some_cancels_all(num_actors_and_errs, start_method):
     """Verify a subset of failed subactors causes all others in
     the nursery to be cancelled just like the strategy in trio.

View File

@@ -57,7 +57,7 @@ def movie_theatre_question():
 @tractor_test
-async def test_movie_theatre_convo():
+async def test_movie_theatre_convo(start_method):
     """The main ``tractor`` routine.
     """
     async with tractor.open_nursery() as n:
@@ -83,7 +83,7 @@ def cellar_door():
 @tractor_test
-async def test_most_beautiful_word():
+async def test_most_beautiful_word(start_method):
     """The main ``tractor`` routine.
     """
     async with tractor.open_nursery() as n:

View File

@@ -62,10 +62,14 @@ async def stream_from_single_subactor():
         # await nursery.cancel()
-def test_stream_from_single_subactor(arb_addr):
+def test_stream_from_single_subactor(arb_addr, start_method):
     """Verify streaming from a spawned async generator.
     """
-    tractor.run(stream_from_single_subactor, arbiter_addr=arb_addr)
+    tractor.run(
+        stream_from_single_subactor,
+        arbiter_addr=arb_addr,
+        start_method=start_method,
+    )
 # this is the first 2 actors, streamer_1 and streamer_2

View File

@@ -11,7 +11,7 @@ import trio  # type: ignore
 from trio import MultiError
 from .log import get_console_log, get_logger, get_loglevel
-from ._ipc import _connect_chan, Channel, Context
+from ._ipc import _connect_chan, Channel
 from ._actor import (
     Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor
 )
@@ -19,6 +19,7 @@ from ._trionics import open_nursery
 from ._state import current_actor
 from ._exceptions import RemoteActorError, ModuleNotExposed
 from . import msg
+from . import _spawn
 __all__ = [
@@ -92,12 +93,16 @@ def run(
     name: str = None,
     arbiter_addr: Tuple[str, int] = (
         _default_arbiter_host, _default_arbiter_port),
+    # the `multiprocessing` start method:
+    # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
+    start_method: str = 'forkserver',
     **kwargs: typing.Dict[str, typing.Any],
 ) -> Any:
     """Run a trio-actor async function in process.
     This is tractor's main entry and the start point for any async actor.
     """
+    _spawn.try_set_start_method(start_method)
     return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr)

View File

@@ -391,7 +391,8 @@ class Actor:
                     f" {chan} from {chan.uid}")
                 break
-            log.trace(f"Received msg {msg} from {chan.uid}")  # type: ignore
+            log.trace(  # type: ignore
+                f"Received msg {msg} from {chan.uid}")
             if msg.get('cid'):
                 # deliver response to local caller/waiter
                 await self._push_result(chan, msg)
@@ -478,18 +479,20 @@ class Actor:
         self,
         accept_addr: Tuple[str, int],
         forkserver_info: Tuple[Any, Any, Any, Any, Any],
+        start_method: str,
         parent_addr: Tuple[str, int] = None
     ) -> None:
         """The routine called *after fork* which invokes a fresh ``trio.run``
         """
         self._forkserver_info = forkserver_info
-        from ._trionics import ctx
+        from ._spawn import try_set_start_method
+        spawn_ctx = try_set_start_method(start_method)
         if self.loglevel is not None:
             log.info(
                 f"Setting loglevel for {self.uid} to {self.loglevel}")
             get_console_log(self.loglevel)
         log.info(
-            f"Started new {ctx.current_process()} for {self.uid}")
+            f"Started new {spawn_ctx.current_process()} for {self.uid}")
         _state._current_actor = self
         log.debug(f"parent_addr is {parent_addr}")
         try:

tractor/_spawn.py (new file, 98 lines added)
View File

@@ -0,0 +1,98 @@
+"""
+Process spawning.
+
+Mostly just wrapping around ``multiprocessing``.
+"""
+import multiprocessing as mp
+from multiprocessing import forkserver, semaphore_tracker  # type: ignore
+from typing import Tuple, Optional
+
+from . import _forkserver_hackzorz
+from ._state import current_actor
+from ._actor import Actor
+
+
+_ctx: mp.context.BaseContext = mp.get_context("spawn")
+
+
+def try_set_start_method(name: str) -> mp.context.BaseContext:
+    """Attempt to set the start method for ``multiprocess.Process`` spawning.
+
+    If the desired method is not supported the sub-interpreter (aka "spawn"
+    method) is used.
+    """
+    global _ctx
+    allowed = mp.get_all_start_methods()
+
+    if name not in allowed:
+        # fall back to the sub-interpreter ("spawn") method
+        name = 'spawn'
+    elif name == 'fork':
+        raise ValueError(
+            "`fork` is unsupported due to incompatibility with `trio`"
+        )
+    elif name == 'forkserver':
+        _forkserver_hackzorz.override_stdlib()
+
+    _ctx = mp.get_context(name)
+    return _ctx
+
+
+def is_main_process() -> bool:
+    """Bool determining if this actor is running in the top-most process.
+    """
+    return mp.current_process().name == 'MainProcess'
+
+
+def new_proc(
+    name: str,
+    actor: Actor,
+    # passed through to actor main
+    bind_addr: Tuple[str, int],
+    parent_addr: Tuple[str, int],
+) -> mp.Process:
+    """Create a new ``multiprocessing.Process`` using the
+    spawn method as configured using ``try_set_start_method()``.
+    """
+    start_method = _ctx.get_start_method()
+    if start_method == 'forkserver':
+        # XXX do our hackery on the stdlib to avoid multiple
+        # forkservers (one at each subproc layer).
+        fs = forkserver._forkserver
+        curr_actor = current_actor()
+        if is_main_process() and not curr_actor._forkserver_info:
+            # if we're the "main" process start the forkserver only once
+            # and pass its ipc info to downstream children
+            # forkserver.set_forkserver_preload(rpc_module_paths)
+            forkserver.ensure_running()
+            fs_info = (
+                fs._forkserver_address,
+                fs._forkserver_alive_fd,
+                getattr(fs, '_forkserver_pid', None),
+                getattr(semaphore_tracker._semaphore_tracker, '_pid', None),
+                semaphore_tracker._semaphore_tracker._fd,
+            )
+        else:
+            assert curr_actor._forkserver_info
+            fs_info = (
+                fs._forkserver_address,
+                fs._forkserver_alive_fd,
+                fs._forkserver_pid,
+                semaphore_tracker._semaphore_tracker._pid,
+                semaphore_tracker._semaphore_tracker._fd,
+            ) = curr_actor._forkserver_info
+    else:
+        fs_info = (None, None, None, None, None)
+
+    return _ctx.Process(
+        target=actor._fork_main,
+        args=(
+            bind_addr,
+            fs_info,
+            start_method,
+            parent_addr
+        ),
+        # daemon=True,
+        name=name,
+    )
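
A rough sketch of the fallback behaviour described in the
``try_set_start_method()`` docstring above (illustrative only; it pokes at the
private ``_spawn`` module directly)::

    import multiprocessing as mp

    from tractor import _spawn

    # requesting 'forkserver' where it isn't available (e.g. on Windows)
    # silently falls back to the 'spawn' context
    ctx = _spawn.try_set_start_method('forkserver')
    assert ctx.get_start_method() in mp.get_all_start_methods()
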

View File

@@ -1,7 +1,6 @@
 """
 Per process state
 """
-import multiprocessing as mp
 from typing import Optional
@@ -14,9 +13,3 @@ def current_actor() -> 'Actor':  # type: ignore
     if not _current_actor:
         raise RuntimeError("No actor instance has been defined yet?")
     return _current_actor
-
-
-def is_main_process():
-    """Bool determining if this actor is running in the top-most process.
-    """
-    return mp.current_process().name == 'MainProcess'

View File

@@ -1,24 +1,30 @@
 """
 ``trio`` inspired apis and helpers
 """
-import multiprocessing as mp
 import inspect
-from multiprocessing import forkserver, semaphore_tracker  # type: ignore
+import platform
+import multiprocessing as mp
 from typing import Tuple, List, Dict, Optional, Any
 import typing
 import trio
 from async_generator import asynccontextmanager, aclosing
-from . import _forkserver_hackzorz
 from ._state import current_actor
 from .log import get_logger, get_loglevel
 from ._actor import Actor, ActorFailure
 from ._portal import Portal
+from . import _spawn
+
+
+if platform.system() == 'Windows':
+    async def proc_waiter(proc: mp.Process) -> None:
+        await trio.hazmat.WaitForSingleObject(proc.sentinel)
+else:
+    async def proc_waiter(proc: mp.Process) -> None:
+        await trio.hazmat.wait_readable(proc.sentinel)
-_forkserver_hackzorz.override_stdlib()
-ctx = mp.get_context("forkserver")
 log = get_logger('tractor')
@@ -36,7 +42,6 @@ class ActorNursery:
         # cancelled when their "main" result arrives
         self._cancel_after_result_on_exit: set = set()
         self.cancelled: bool = False
-        self._forkserver: forkserver.ForkServer = None
     async def __aenter__(self):
         return self
@@ -60,36 +65,11 @@ class ActorNursery:
         )
         parent_addr = self._actor.accept_addr
         assert parent_addr
-        self._forkserver = fs = forkserver._forkserver
-        if mp.current_process().name == 'MainProcess' and (
-            not self._actor._forkserver_info
-        ):
-            # if we're the "main" process start the forkserver only once
-            # and pass its ipc info to downstream children
-            # forkserver.set_forkserver_preload(rpc_module_paths)
-            forkserver.ensure_running()
-            fs_info = (
-                fs._forkserver_address,
-                fs._forkserver_alive_fd,
-                getattr(fs, '_forkserver_pid', None),
-                getattr(semaphore_tracker._semaphore_tracker, '_pid', None),
-                semaphore_tracker._semaphore_tracker._fd,
-            )
-        else:
-            assert self._actor._forkserver_info
-            fs_info = (
-                fs._forkserver_address,
-                fs._forkserver_alive_fd,
-                fs._forkserver_pid,
-                semaphore_tracker._semaphore_tracker._pid,
-                semaphore_tracker._semaphore_tracker._fd,
-            ) = self._actor._forkserver_info
-        proc = ctx.Process(
-            target=actor._fork_main,
-            args=(bind_addr, fs_info, parent_addr),
-            # daemon=True,
-            name=name,
-        )
+        proc = _spawn.new_proc(
+            name,
+            actor,
+            bind_addr,
+            parent_addr,
+        )
         # register the process before start in case we get a cancel
         # request before the actor has fully spawned - then we can wait
@@ -208,7 +188,8 @@ class ActorNursery:
         ) -> None:
             # TODO: timeout block here?
             if proc.is_alive():
-                await trio.hazmat.wait_readable(proc.sentinel)
+                await proc_waiter(proc)
             # please god don't hang
             proc.join()
             log.debug(f"Joined {proc}")

View File

@@ -24,7 +24,13 @@ def tractor_test(fn):
     injected to tests declaring these funcargs.
     """
     @wraps(fn)
-    def wrapper(*args, loglevel=None, arb_addr=None, **kwargs):
+    def wrapper(
+        *args,
+        loglevel=None,
+        arb_addr=None,
+        start_method='forkserver',
+        **kwargs
+    ):
         # __tracebackhide__ = True
         if 'arb_addr' in inspect.signature(fn).parameters:
             # injects test suite fixture value to test as well
@@ -34,9 +40,15 @@ def tractor_test(fn):
             # allows test suites to define a 'loglevel' fixture
             # that activates the internal logging
             kwargs['loglevel'] = loglevel
+        if 'start_method' in inspect.signature(fn).parameters:
+            # allows test suites to define a 'start_method' fixture
+            # which selects the ``multiprocessing`` start method
+            kwargs['start_method'] = start_method
         return run(
             partial(fn, *args, **kwargs),
-            arbiter_addr=arb_addr, loglevel=loglevel
+            arbiter_addr=arb_addr,
+            loglevel=loglevel,
+            start_method=start_method,
         )
     return wrapper
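
Illustrative usage (not part of this change, and assuming the decorator is
importable as ``tractor.testing.tractor_test``): a test that simply declares
``start_method`` has the fixture value injected and its body run inside a root
actor spawned with that method::

    import tractor
    from tractor.testing import tractor_test


    @tractor_test
    async def test_runs_with_injected_start_method(start_method):
        # the wrapped coroutine already executes inside the root actor,
        # which was started using the injected ``start_method``
        async with tractor.open_nursery():
            pass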