diff --git a/README.rst b/README.rst
index 56fbc73..df6f223 100644
--- a/README.rst
+++ b/README.rst
@@ -1,6 +1,6 @@
 tractor
 =======
-An async-native `actor model`_ built on trio_ and multiprocessing_.
+An async-native "`actor model`_" built on trio_ and multiprocessing_.

 |travis|

@@ -23,23 +23,26 @@ An async-native `actor model`_ built on trio_ and multiprocessing_.
 ``tractor`` is an attempt to bring trionic_ `structured concurrency`_ to
 distributed multi-core Python.

-``tractor`` lets you run and spawn *actors*: processes which each run a ``trio``
-scheduler and task tree (also known as an `async sandwich`_).
-*Actors* communicate by sending messages_ over channels_ and avoid sharing any local state.
-This `actor model`_ allows for highly distributed software architecture which works just as
-well on multiple cores as it does over many hosts.
-``tractor`` takes much inspiration from pulsar_ and execnet_ but attempts to be much more
-focussed on sophistication of the lower level distributed architecture as well as have first
-class support for `modern async Python`_.
+``tractor`` lets you spawn ``trio`` *"actors"*: processes which each run a ``trio`` scheduler and task
+tree (also known as an `async sandwich`_). *Actors* communicate by exchanging asynchronous messages_ over
+channels_ and avoid sharing any state. This model allows for highly distributed software architecture
+which works just as well on multiple cores as it does over many hosts.

-The first step to grok ``tractor`` is to get the basics of ``trio``
-down. A great place to start is the `trio docs`_ and this `blog post`_.
+``tractor`` is an actor-model-*like* system in the sense that it adheres to the `3 axioms`_ but does
+not (yet) fulfill all "unrequirements_" in practice. The API and design takes inspiration from pulsar_ and
+execnet_ but attempts to be more focussed on sophistication of the lower level distributed architecture as
+well as have first class support for streaming using `async generators`_.
+
+The first step to grok ``tractor`` is to get the basics of ``trio`` down.
+A great place to start is the `trio docs`_ and this `blog post`_.

 .. _messages: https://en.wikipedia.org/wiki/Message_passing
 .. _trio docs: https://trio.readthedocs.io/en/latest/
 .. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
 .. _structured concurrency: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
-.. _modern async Python: https://www.python.org/dev/peps/pep-0525/
+.. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
+.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
+.. _async generators: https://www.python.org/dev/peps/pep-0525/

 .. contents::

@@ -47,20 +50,25 @@ down. A great place to start is the `trio docs`_ and this `blog post`_.

 Philosophy
 ----------
-``tractor``'s tenets non-comprehensively include:
+``tractor`` aims to be the Python multi-processing framework *you always wanted*.
+Its tenets non-comprehensively include:
+
+- strict adherence to the `concept-in-progress`_ of *structured concurrency*

 - no spawning of processes *willy-nilly*; causality_ is paramount!
-- `shared nothing architecture`_
-- remote errors `always propagate`_ back to the caller
+- (remote) errors `always propagate`_ back to the parent / caller
 - verbatim support for ``trio``'s cancellation_ system
+- `shared nothing architecture`_
 - no use of *proxy* objects to wrap RPC calls
 - an immersive debugging experience
 - anti-fragility through `chaos engineering`_
+

 .. warning:: ``tractor`` is in alpha-alpha and is expected to change rapidly!
    Expect nothing to be set in stone. Your ideas about where it should go are
    greatly appreciated!

+.. _concept-in-progress: https://trio.discourse.group/t/structured-concurrency-kickoff/55

 .. _pulsar: http://quantmind.github.io/pulsar/design.html
 .. _execnet: https://codespeak.net/execnet/

@@ -74,6 +82,16 @@ No PyPi release yet!

     pip install git+git://github.com/tgoodlet/tractor.git

+Windows "gotchas"
+*****************
+`tractor` internally uses the stdlib's `multiprocessing` package which
+*can* have some gotchas on Windows. Namely, the need for calling
+`freeze_support()`_ inside the ``__main__`` context. See `#61`_ for the
+deats.
+
+.. _freeze_support(): https://docs.python.org/3/library/multiprocessing.html#multiprocessing.freeze_support
+.. _#61: https://github.com/tgoodlet/tractor/pull/61#issuecomment-470053512
+

 Examples
 --------
@@ -450,7 +468,6 @@ as ``multiprocessing`` calls it) which is running ``main()``.
    https://trio.readthedocs.io/en/latest/reference-core.html#getting-back-into-the-trio-thread-from-another-thread
 .. _asynchronous generators: https://www.python.org/dev/peps/pep-0525/
 .. _remote function execution: https://codespeak.net/execnet/example/test_info.html#remote-exec-a-function-avoiding-inlined-source-part-i
-.. _asyncitertools: https://github.com/vodik/asyncitertools


 Cancellation
@@ -630,6 +647,9 @@ multiple tasks streaming responses concurrently:

 The context notion comes from the context_ in nanomsg_.

+.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5
+.. _msgpack: https://en.wikipedia.org/wiki/MessagePack
+

 Running actors standalone
 *************************
@@ -645,6 +665,16 @@ need to hop into a debugger. You just need to pass the existing

     tractor.run(main, arbiter_addr=('192.168.0.10', 1616))

+Choosing a ``multiprocessing`` *start method*
+*********************************************
+``tractor`` supports selection of the `multiprocessing start method`_ via
+a ``start_method`` kwarg to ``tractor.run()``. Note that on Windows
+*spawn* is the only supported method and on nix systems *forkserver* is
+selected by default for speed.
+
+.. _multiprocessing start method: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
+
+
 Enabling logging
 ****************
 Considering how complicated distributed software can become it helps to know
@@ -673,6 +703,15 @@ Stuff I'd like to see ``tractor`` do real soon:
   but with better `pdb++`_ support
 - an extensive `chaos engineering`_ test suite
 - support for reactive programming primitives and native support for asyncitertools_ like libs
+- introduction of a `capability-based security`_ model
+
+.. _supervisors: https://github.com/tgoodlet/tractor/issues/22
+.. _nanomsg: https://nanomsg.github.io/nng/index.html
+.. _gossip protocol: https://en.wikipedia.org/wiki/Gossip_protocol
+.. _celery: http://docs.celeryproject.org/en/latest/userguide/debugging.html
+.. _asyncitertools: https://github.com/vodik/asyncitertools
+.. _pdb++: https://github.com/antocuni/pdb
+..
_capability-based security: https://en.wikipedia.org/wiki/Capability-based_security Feel like saying hi? @@ -682,12 +721,4 @@ This project is very much coupled to the ongoing development of community). If you want to help, have suggestions or just want to say hi, please feel free to ping me on the `trio gitter channel`_! - -.. _supervisors: https://github.com/tgoodlet/tractor/issues/22 -.. _nanomsg: https://nanomsg.github.io/nng/index.html -.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5 -.. _gossip protocol: https://en.wikipedia.org/wiki/Gossip_protocol .. _trio gitter channel: https://gitter.im/python-trio/general -.. _celery: http://docs.celeryproject.org/en/latest/userguide/debugging.html -.. _pdb++: https://github.com/antocuni/pdb -.. _msgpack: https://en.wikipedia.org/wiki/MessagePack diff --git a/tests/conftest.py b/tests/conftest.py index 97dccd7..1e46da3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -28,3 +28,11 @@ def loglevel(request): @pytest.fixture(scope='session') def arb_addr(): return _arb_addr + + +def pytest_generate_tests(metafunc): + if 'start_method' in metafunc.fixturenames: + from multiprocessing import get_all_start_methods + methods = get_all_start_methods() + methods.remove('fork') + metafunc.parametrize("start_method", methods, scope='module') diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index a6e2813..e45fbac 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -110,7 +110,7 @@ async def stream_forever(): @tractor_test -async def test_cancel_infinite_streamer(): +async def test_cancel_infinite_streamer(start_method): # stream for at most 1 seconds with trio.move_on_after(1) as cancel_scope: @@ -139,7 +139,7 @@ async def test_cancel_infinite_streamer(): ids=['one_actor', 'two_actors'], ) @tractor_test -async def test_some_cancels_all(num_actors_and_errs): +async def test_some_cancels_all(num_actors_and_errs, start_method): """Verify a subset of failed subactors causes all others in the nursery to be cancelled just like the strategy in trio. diff --git a/tests/test_spawning.py b/tests/test_spawning.py index 07dd737..7508a39 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -57,7 +57,7 @@ def movie_theatre_question(): @tractor_test -async def test_movie_theatre_convo(): +async def test_movie_theatre_convo(start_method): """The main ``tractor`` routine. """ async with tractor.open_nursery() as n: @@ -83,7 +83,7 @@ def cellar_door(): @tractor_test -async def test_most_beautiful_word(): +async def test_most_beautiful_word(start_method): """The main ``tractor`` routine. """ async with tractor.open_nursery() as n: diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 2cddd41..172bb7f 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -62,10 +62,14 @@ async def stream_from_single_subactor(): # await nursery.cancel() -def test_stream_from_single_subactor(arb_addr): +def test_stream_from_single_subactor(arb_addr, start_method): """Verify streaming from a spawned async generator. 
""" - tractor.run(stream_from_single_subactor, arbiter_addr=arb_addr) + tractor.run( + stream_from_single_subactor, + arbiter_addr=arb_addr, + start_method=start_method, + ) # this is the first 2 actors, streamer_1 and streamer_2 diff --git a/tractor/__init__.py b/tractor/__init__.py index 4214c2d..5be1d5d 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -11,7 +11,7 @@ import trio # type: ignore from trio import MultiError from .log import get_console_log, get_logger, get_loglevel -from ._ipc import _connect_chan, Channel, Context +from ._ipc import _connect_chan, Channel from ._actor import ( Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor ) @@ -19,6 +19,7 @@ from ._trionics import open_nursery from ._state import current_actor from ._exceptions import RemoteActorError, ModuleNotExposed from . import msg +from . import _spawn __all__ = [ @@ -92,12 +93,16 @@ def run( name: str = None, arbiter_addr: Tuple[str, int] = ( _default_arbiter_host, _default_arbiter_port), + # the `multiprocessing` start method: + # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods + start_method: str = 'forkserver', **kwargs: typing.Dict[str, typing.Any], ) -> Any: """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. """ + _spawn.try_set_start_method(start_method) return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr) diff --git a/tractor/_actor.py b/tractor/_actor.py index 7a5e72e..1b38d88 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -391,7 +391,8 @@ class Actor: f" {chan} from {chan.uid}") break - log.trace(f"Received msg {msg} from {chan.uid}") # type: ignore + log.trace( # type: ignore + f"Received msg {msg} from {chan.uid}") if msg.get('cid'): # deliver response to local caller/waiter await self._push_result(chan, msg) @@ -478,18 +479,20 @@ class Actor: self, accept_addr: Tuple[str, int], forkserver_info: Tuple[Any, Any, Any, Any, Any], + start_method: str, parent_addr: Tuple[str, int] = None ) -> None: """The routine called *after fork* which invokes a fresh ``trio.run`` """ self._forkserver_info = forkserver_info - from ._trionics import ctx + from ._spawn import try_set_start_method + spawn_ctx = try_set_start_method(start_method) if self.loglevel is not None: log.info( f"Setting loglevel for {self.uid} to {self.loglevel}") get_console_log(self.loglevel) log.info( - f"Started new {ctx.current_process()} for {self.uid}") + f"Started new {spawn_ctx.current_process()} for {self.uid}") _state._current_actor = self log.debug(f"parent_addr is {parent_addr}") try: diff --git a/tractor/_spawn.py b/tractor/_spawn.py new file mode 100644 index 0000000..859cc4e --- /dev/null +++ b/tractor/_spawn.py @@ -0,0 +1,98 @@ +""" +Process spawning. + +Mostly just wrapping around ``multiprocessing``. +""" +import multiprocessing as mp +from multiprocessing import forkserver, semaphore_tracker # type: ignore +from typing import Tuple, Optional + +from . import _forkserver_hackzorz +from ._state import current_actor +from ._actor import Actor + + +_ctx: mp.context.BaseContext = mp.get_context("spawn") + + +def try_set_start_method(name: str) -> mp.context.BaseContext: + """Attempt to set the start method for ``multiprocess.Process`` spawning. + + If the desired method is not supported the sub-interpreter (aka "spawn" + method) is used. 
+    """
+    global _ctx
+    allowed = mp.get_all_start_methods()
+    # fall back to the sub-interpreter ("spawn") method if unsupported
+    if name not in allowed:
+        name = 'spawn'
+    elif name == 'fork':
+        raise ValueError(
+            "`fork` is unsupported due to incompatibility with `trio`"
+        )
+    elif name == 'forkserver':
+        _forkserver_hackzorz.override_stdlib()
+
+    _ctx = mp.get_context(name)
+    return _ctx
+
+
+def is_main_process() -> bool:
+    """Bool determining if this actor is running in the top-most process.
+    """
+    return mp.current_process().name == 'MainProcess'
+
+
+def new_proc(
+    name: str,
+    actor: Actor,
+    # passed through to actor main
+    bind_addr: Tuple[str, int],
+    parent_addr: Tuple[str, int],
+) -> mp.Process:
+    """Create a new ``multiprocessing.Process`` using the
+    spawn method as configured using ``try_set_start_method()``.
+    """
+    start_method = _ctx.get_start_method()
+    if start_method == 'forkserver':
+        # XXX do our hackery on the stdlib to avoid multiple
+        # forkservers (one at each subproc layer).
+        fs = forkserver._forkserver
+        curr_actor = current_actor()
+        if is_main_process() and not curr_actor._forkserver_info:
+            # if we're the "main" process start the forkserver only once
+            # and pass its ipc info to downstream children
+            # forkserver.set_forkserver_preload(rpc_module_paths)
+            forkserver.ensure_running()
+            fs_info = (
+                fs._forkserver_address,
+                fs._forkserver_alive_fd,
+                getattr(fs, '_forkserver_pid', None),
+                getattr(semaphore_tracker._semaphore_tracker, '_pid', None),
+                semaphore_tracker._semaphore_tracker._fd,
+            )
+        else:
+            assert curr_actor._forkserver_info
+            fs_info = (
+                fs._forkserver_address,
+                fs._forkserver_alive_fd,
+                fs._forkserver_pid,
+                semaphore_tracker._semaphore_tracker._pid,
+                semaphore_tracker._semaphore_tracker._fd,
+            ) = curr_actor._forkserver_info
+    else:
+        fs_info = (None, None, None, None, None)
+
+    return _ctx.Process(
+        target=actor._fork_main,
+        args=(
+            bind_addr,
+            fs_info,
+            start_method,
+            parent_addr
+        ),
+        # daemon=True,
+        name=name,
+    )
diff --git a/tractor/_state.py b/tractor/_state.py
index 2606ff3..704fae7 100644
--- a/tractor/_state.py
+++ b/tractor/_state.py
@@ -1,7 +1,6 @@
 """
 Per process state
 """
-import multiprocessing as mp
 from typing import Optional


@@ -14,9 +13,3 @@ def current_actor() -> 'Actor':  # type: ignore
     if not _current_actor:
         raise RuntimeError("No actor instance has been defined yet?")
     return _current_actor
-
-
-def is_main_process():
-    """Bool determining if this actor is running in the top-most process.
-    """
-    return mp.current_process().name == 'MainProcess'
diff --git a/tractor/_trionics.py b/tractor/_trionics.py
index dd18158..bf83179 100644
--- a/tractor/_trionics.py
+++ b/tractor/_trionics.py
@@ -1,24 +1,30 @@
 """
 ``trio`` inspired apis and helpers
 """
-import multiprocessing as mp
 import inspect
-from multiprocessing import forkserver, semaphore_tracker  # type: ignore
+import platform
+import multiprocessing as mp
 from typing import Tuple, List, Dict, Optional, Any
 import typing

 import trio
 from async_generator import asynccontextmanager, aclosing

-from . import _forkserver_hackzorz
 from ._state import current_actor
 from .log import get_logger, get_loglevel
 from ._actor import Actor, ActorFailure
 from ._portal import Portal
+from . import _spawn
+
+
+if platform.system() == 'Windows':
+    async def proc_waiter(proc: mp.Process) -> None:
+        await trio.hazmat.WaitForSingleObject(proc.sentinel)
+else:
+    async def proc_waiter(proc: mp.Process) -> None:
+        await trio.hazmat.wait_readable(proc.sentinel)

-_forkserver_hackzorz.override_stdlib()
-ctx = mp.get_context("forkserver")

 log = get_logger('tractor')
@@ -36,7 +42,6 @@ class ActorNursery:
         # cancelled when their "main" result arrives
         self._cancel_after_result_on_exit: set = set()
         self.cancelled: bool = False
-        self._forkserver: forkserver.ForkServer = None

     async def __aenter__(self):
         return self
@@ -60,36 +65,11 @@ class ActorNursery:
         )
         parent_addr = self._actor.accept_addr
         assert parent_addr
-        self._forkserver = fs = forkserver._forkserver
-        if mp.current_process().name == 'MainProcess' and (
-                not self._actor._forkserver_info
-        ):
-            # if we're the "main" process start the forkserver only once
-            # and pass its ipc info to downstream children
-            # forkserver.set_forkserver_preload(rpc_module_paths)
-            forkserver.ensure_running()
-            fs_info = (
-                fs._forkserver_address,
-                fs._forkserver_alive_fd,
-                getattr(fs, '_forkserver_pid', None),
-                getattr(semaphore_tracker._semaphore_tracker, '_pid', None),
-                semaphore_tracker._semaphore_tracker._fd,
-            )
-        else:
-            assert self._actor._forkserver_info
-            fs_info = (
-                fs._forkserver_address,
-                fs._forkserver_alive_fd,
-                fs._forkserver_pid,
-                semaphore_tracker._semaphore_tracker._pid,
-                semaphore_tracker._semaphore_tracker._fd,
-            ) = self._actor._forkserver_info
-
-        proc = ctx.Process(
-            target=actor._fork_main,
-            args=(bind_addr, fs_info, parent_addr),
-            # daemon=True,
-            name=name,
+        proc = _spawn.new_proc(
+            name,
+            actor,
+            bind_addr,
+            parent_addr,
         )
         # register the process before start in case we get a cancel
         # request before the actor has fully spawned - then we can wait
@@ -208,7 +188,8 @@
         ) -> None:
             # TODO: timeout block here?
             if proc.is_alive():
-                await trio.hazmat.wait_readable(proc.sentinel)
+                await proc_waiter(proc)
+
             # please god don't hang
             proc.join()
             log.debug(f"Joined {proc}")
diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py
index 40a658a..7d8289e 100644
--- a/tractor/testing/_tractor_test.py
+++ b/tractor/testing/_tractor_test.py
@@ -24,7 +24,13 @@ def tractor_test(fn):
     injected to tests declaring these funcargs.
     """
     @wraps(fn)
-    def wrapper(*args, loglevel=None, arb_addr=None, **kwargs):
+    def wrapper(
+        *args,
+        loglevel=None,
+        arb_addr=None,
+        start_method='forkserver',
+        **kwargs
+    ):
         # __tracebackhide__ = True
         if 'arb_addr' in inspect.signature(fn).parameters:
             # injects test suite fixture value to test as well
             kwargs['arb_addr'] = arb_addr
@@ -34,9 +40,15 @@ def tractor_test(fn):
             # allows test suites to define a 'loglevel' fixture
             # that activates the internal logging
             kwargs['loglevel'] = loglevel
+        if 'start_method' in inspect.signature(fn).parameters:
+            # allows test suites to select the `multiprocessing`
+            # start method used for spawning actors in the test
+            kwargs['start_method'] = start_method
         return run(
             partial(fn, *args, **kwargs),
-            arbiter_addr=arb_addr, loglevel=loglevel
+            arbiter_addr=arb_addr,
+            loglevel=loglevel,
+            start_method=start_method,
         )
     return wrapper
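
A minimal usage sketch of the ``start_method`` selection introduced above. It is not part of the
patch itself: only ``tractor.run()``, ``tractor.open_nursery()`` and the new ``start_method`` kwarg
are taken from this diff; the ``main()`` body is a hypothetical placeholder::

    import multiprocessing
    import tractor


    async def main():
        # placeholder actor-spawning body; actors opened under this
        # nursery are created via the configured start method
        async with tractor.open_nursery() as n:
            pass


    if __name__ == '__main__':
        # needed on Windows per the README "gotchas" section above
        multiprocessing.freeze_support()
        # 'spawn' is the only option on Windows; *nix defaults to 'forkserver'
        tractor.run(main, start_method='spawn')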
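
A similarly hedged sketch of how a test opts in to the parametrized start methods added in
``tests/conftest.py``. The import path assumes ``tractor_test`` is re-exported from
``tractor.testing`` as the existing test modules use it; the test body is hypothetical::

    import tractor
    from tractor.testing import tractor_test


    @tractor_test
    async def test_runs_under_each_start_method(start_method):
        # ``pytest_generate_tests`` parametrizes ``start_method`` and the
        # ``tractor_test`` decorator forwards it on to ``tractor.run()``
        async with tractor.open_nursery() as n:
            pass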