From d75739e9c7a30420aa2787a1381ac7ede2af4479 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Mar 2019 18:52:19 -0500 Subject: [PATCH 1/9] Factor process creation into a separate factory Make a `_spawn` module for encapsulating all the `multiprocessing` "spawn method" stuff and factor current forkserver steps into it. --- tractor/_actor.py | 2 +- tractor/_spawn.py | 61 ++++++++++++++++++++++++++++++++++++++++++++ tractor/_state.py | 7 ----- tractor/_trionics.py | 42 +++++------------------------- 4 files changed, 69 insertions(+), 43 deletions(-) create mode 100644 tractor/_spawn.py diff --git a/tractor/_actor.py b/tractor/_actor.py index 7a5e72e3..2256ec2c 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -483,7 +483,7 @@ class Actor: """The routine called *after fork* which invokes a fresh ``trio.run`` """ self._forkserver_info = forkserver_info - from ._trionics import ctx + from ._spawn import ctx if self.loglevel is not None: log.info( f"Setting loglevel for {self.uid} to {self.loglevel}") diff --git a/tractor/_spawn.py b/tractor/_spawn.py new file mode 100644 index 00000000..9dab478f --- /dev/null +++ b/tractor/_spawn.py @@ -0,0 +1,61 @@ +""" +Process spawning. + +Mostly just wrapping around ``multiprocessing``. +""" +import multiprocessing as mp +from multiprocessing import forkserver, semaphore_tracker # type: ignore +from typing import Tuple + +from . import _forkserver_hackzorz +from ._state import current_actor +from ._actor import Actor + + +_forkserver_hackzorz.override_stdlib() +ctx = mp.get_context("forkserver") + + +def is_main_process() -> bool: + """Bool determining if this actor is running in the top-most process. + """ + return mp.current_process().name == 'MainProcess' + + +def new_proc( + name: str, + actor: Actor, + # passed through to actor main + bind_addr: Tuple[str, int], + parent_addr: Tuple[str, int], +) -> mp.Process: + fs = forkserver._forkserver + curr_actor = current_actor() + if is_main_process() and not curr_actor._forkserver_info: + # if we're the "main" process start the forkserver only once + # and pass its ipc info to downstream children + # forkserver.set_forkserver_preload(rpc_module_paths) + forkserver.ensure_running() + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + getattr(fs, '_forkserver_pid', None), + getattr(semaphore_tracker._semaphore_tracker, '_pid', None), + semaphore_tracker._semaphore_tracker._fd, + ) + else: + assert curr_actor._forkserver_info + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + fs._forkserver_pid, + semaphore_tracker._semaphore_tracker._pid, + semaphore_tracker._semaphore_tracker._fd, + ) = curr_actor._forkserver_info + + return ctx.Process( + target=actor._fork_main, + args=(bind_addr, fs_info, parent_addr), + # daemon=True, + name=name, + ) diff --git a/tractor/_state.py b/tractor/_state.py index 2606ff3e..704fae73 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -1,7 +1,6 @@ """ Per process state """ -import multiprocessing as mp from typing import Optional @@ -14,9 +13,3 @@ def current_actor() -> 'Actor': # type: ignore if not _current_actor: raise RuntimeError("No actor instance has been defined yet?") return _current_actor - - -def is_main_process(): - """Bool determining if this actor is running in the top-most process. - """ - return mp.current_process().name == 'MainProcess' diff --git a/tractor/_trionics.py b/tractor/_trionics.py index dd181589..71f8d485 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -3,22 +3,20 @@ """ import multiprocessing as mp import inspect -from multiprocessing import forkserver, semaphore_tracker # type: ignore from typing import Tuple, List, Dict, Optional, Any import typing import trio from async_generator import asynccontextmanager, aclosing -from . import _forkserver_hackzorz +# from . import _forkserver_hackzorz from ._state import current_actor from .log import get_logger, get_loglevel from ._actor import Actor, ActorFailure from ._portal import Portal +from . import _spawn -_forkserver_hackzorz.override_stdlib() -ctx = mp.get_context("forkserver") log = get_logger('tractor') @@ -36,7 +34,6 @@ class ActorNursery: # cancelled when their "main" result arrives self._cancel_after_result_on_exit: set = set() self.cancelled: bool = False - self._forkserver: forkserver.ForkServer = None async def __aenter__(self): return self @@ -60,36 +57,11 @@ class ActorNursery: ) parent_addr = self._actor.accept_addr assert parent_addr - self._forkserver = fs = forkserver._forkserver - if mp.current_process().name == 'MainProcess' and ( - not self._actor._forkserver_info - ): - # if we're the "main" process start the forkserver only once - # and pass its ipc info to downstream children - # forkserver.set_forkserver_preload(rpc_module_paths) - forkserver.ensure_running() - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - getattr(fs, '_forkserver_pid', None), - getattr(semaphore_tracker._semaphore_tracker, '_pid', None), - semaphore_tracker._semaphore_tracker._fd, - ) - else: - assert self._actor._forkserver_info - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - fs._forkserver_pid, - semaphore_tracker._semaphore_tracker._pid, - semaphore_tracker._semaphore_tracker._fd, - ) = self._actor._forkserver_info - - proc = ctx.Process( - target=actor._fork_main, - args=(bind_addr, fs_info, parent_addr), - # daemon=True, - name=name, + proc = _spawn.new_proc( + name, + actor, + bind_addr, + parent_addr, ) # register the process before start in case we get a cancel # request before the actor has fully spawned - then we can wait From 7014a07986e967f613d6ff602c47902b3bde51c7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 6 Mar 2019 00:29:07 -0500 Subject: [PATCH 2/9] Add "spawn" start method support Add full support for using the "spawn" process starting method as per: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods Add a `spawn_method` argument to `tractor.run()` for specifying the desired method explicitly. By default use the "fastest" method available. On *nix systems this is the original "forkserver" method. This should be the solution to getting windows support! Resolves #60 --- tractor/__init__.py | 3 ++ tractor/_actor.py | 9 +++-- tractor/_spawn.py | 88 +++++++++++++++++++++++++++++++-------------- 3 files changed, 70 insertions(+), 30 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 4214c2d9..96ac856c 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -19,6 +19,7 @@ from ._trionics import open_nursery from ._state import current_actor from ._exceptions import RemoteActorError, ModuleNotExposed from . import msg +from . import _spawn __all__ = [ @@ -92,12 +93,14 @@ def run( name: str = None, arbiter_addr: Tuple[str, int] = ( _default_arbiter_host, _default_arbiter_port), + spawn_method: str = 'forkserver', **kwargs: typing.Dict[str, typing.Any], ) -> Any: """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. """ + _spawn.try_set_start_method(spawn_method) return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr) diff --git a/tractor/_actor.py b/tractor/_actor.py index 2256ec2c..1b38d883 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -391,7 +391,8 @@ class Actor: f" {chan} from {chan.uid}") break - log.trace(f"Received msg {msg} from {chan.uid}") # type: ignore + log.trace( # type: ignore + f"Received msg {msg} from {chan.uid}") if msg.get('cid'): # deliver response to local caller/waiter await self._push_result(chan, msg) @@ -478,18 +479,20 @@ class Actor: self, accept_addr: Tuple[str, int], forkserver_info: Tuple[Any, Any, Any, Any, Any], + start_method: str, parent_addr: Tuple[str, int] = None ) -> None: """The routine called *after fork* which invokes a fresh ``trio.run`` """ self._forkserver_info = forkserver_info - from ._spawn import ctx + from ._spawn import try_set_start_method + spawn_ctx = try_set_start_method(start_method) if self.loglevel is not None: log.info( f"Setting loglevel for {self.uid} to {self.loglevel}") get_console_log(self.loglevel) log.info( - f"Started new {ctx.current_process()} for {self.uid}") + f"Started new {spawn_ctx.current_process()} for {self.uid}") _state._current_actor = self log.debug(f"parent_addr is {parent_addr}") try: diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 9dab478f..3d355152 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -5,15 +5,35 @@ Mostly just wrapping around ``multiprocessing``. """ import multiprocessing as mp from multiprocessing import forkserver, semaphore_tracker # type: ignore -from typing import Tuple +from typing import Tuple, Optional from . import _forkserver_hackzorz from ._state import current_actor from ._actor import Actor -_forkserver_hackzorz.override_stdlib() -ctx = mp.get_context("forkserver") +_ctx: mp.context.BaseContext = mp.get_context("spawn") + + +def try_set_start_method(name: str) -> mp.context.BaseContext: + """Attempt to set the start method for ``multiprocess.Process`` spawning. + + If the desired method is not supported the sub-interpreter (aka "spawn" + method) is used. + """ + global _ctx + + allowed = mp.get_all_start_methods() + if name not in allowed: + name == 'spawn' + + assert name in allowed + + if name == 'forkserver': + _forkserver_hackzorz.override_stdlib() + + _ctx = mp.get_context(name) + return _ctx def is_main_process() -> bool: @@ -29,33 +49,47 @@ def new_proc( bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], ) -> mp.Process: - fs = forkserver._forkserver - curr_actor = current_actor() - if is_main_process() and not curr_actor._forkserver_info: - # if we're the "main" process start the forkserver only once - # and pass its ipc info to downstream children - # forkserver.set_forkserver_preload(rpc_module_paths) - forkserver.ensure_running() - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - getattr(fs, '_forkserver_pid', None), - getattr(semaphore_tracker._semaphore_tracker, '_pid', None), - semaphore_tracker._semaphore_tracker._fd, - ) + """Create a new ``multiprocessing.Process`` using the + spawn method as configured using ``try_set_start_method()``. + """ + start_method = _ctx.get_start_method() + if start_method == 'forkserver': + # XXX do our hackery on the stdlib to avoid multiple + # forkservers (one at each subproc layer). + fs = forkserver._forkserver + curr_actor = current_actor() + if is_main_process() and not curr_actor._forkserver_info: + # if we're the "main" process start the forkserver only once + # and pass its ipc info to downstream children + # forkserver.set_forkserver_preload(rpc_module_paths) + forkserver.ensure_running() + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + getattr(fs, '_forkserver_pid', None), + getattr(semaphore_tracker._semaphore_tracker, '_pid', None), + semaphore_tracker._semaphore_tracker._fd, + ) + else: + assert curr_actor._forkserver_info + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + fs._forkserver_pid, + semaphore_tracker._semaphore_tracker._pid, + semaphore_tracker._semaphore_tracker._fd, + ) = curr_actor._forkserver_info else: - assert curr_actor._forkserver_info - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - fs._forkserver_pid, - semaphore_tracker._semaphore_tracker._pid, - semaphore_tracker._semaphore_tracker._fd, - ) = curr_actor._forkserver_info + fs_info = (None, None, None, None, None) - return ctx.Process( + return _ctx.Process( target=actor._fork_main, - args=(bind_addr, fs_info, parent_addr), + args=( + bind_addr, + fs_info, + start_method, + parent_addr + ), # daemon=True, name=name, ) From 483ae42a46504cff67a7326660e48803efcc5789 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 6 Mar 2019 00:36:37 -0500 Subject: [PATCH 3/9] Add a `spawn_method` dynamic fixture --- tests/conftest.py | 8 ++++++++ tractor/testing/_tractor_test.py | 16 ++++++++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 97dccd76..22c77f83 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -28,3 +28,11 @@ def loglevel(request): @pytest.fixture(scope='session') def arb_addr(): return _arb_addr + + +def pytest_generate_tests(metafunc): + if 'spawn_method' in metafunc.fixturenames: + from multiprocessing import get_all_start_methods + methods = get_all_start_methods() + methods.remove('fork') + metafunc.parametrize("spawn_method", methods, scope='module') diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py index 40a658ab..341d446d 100644 --- a/tractor/testing/_tractor_test.py +++ b/tractor/testing/_tractor_test.py @@ -24,7 +24,13 @@ def tractor_test(fn): injected to tests declaring these funcargs. """ @wraps(fn) - def wrapper(*args, loglevel=None, arb_addr=None, **kwargs): + def wrapper( + *args, + loglevel=None, + arb_addr=None, + spawn_method='forkserver', + **kwargs + ): # __tracebackhide__ = True if 'arb_addr' in inspect.signature(fn).parameters: # injects test suite fixture value to test as well @@ -34,9 +40,15 @@ def tractor_test(fn): # allows test suites to define a 'loglevel' fixture # that activates the internal logging kwargs['loglevel'] = loglevel + if 'spawn_method' in inspect.signature(fn).parameters: + # allows test suites to define a 'loglevel' fixture + # that activates the internal logging + kwargs['spawn_method'] = spawn_method return run( partial(fn, *args, **kwargs), - arbiter_addr=arb_addr, loglevel=loglevel + arbiter_addr=arb_addr, + loglevel=loglevel, + spawn_method=spawn_method, ) return wrapper From d6ca722bcc285427edb7d1475fba6f8c10b87f17 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 6 Mar 2019 00:37:02 -0500 Subject: [PATCH 4/9] Sprinkle `spawn_method` fixture throughout tests --- tests/test_cancellation.py | 4 ++-- tests/test_spawning.py | 4 ++-- tests/test_streaming.py | 8 ++++++-- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index a6e2813e..b34c6022 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -110,7 +110,7 @@ async def stream_forever(): @tractor_test -async def test_cancel_infinite_streamer(): +async def test_cancel_infinite_streamer(spawn_method): # stream for at most 1 seconds with trio.move_on_after(1) as cancel_scope: @@ -139,7 +139,7 @@ async def test_cancel_infinite_streamer(): ids=['one_actor', 'two_actors'], ) @tractor_test -async def test_some_cancels_all(num_actors_and_errs): +async def test_some_cancels_all(num_actors_and_errs, spawn_method): """Verify a subset of failed subactors causes all others in the nursery to be cancelled just like the strategy in trio. diff --git a/tests/test_spawning.py b/tests/test_spawning.py index 07dd737d..01071156 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -57,7 +57,7 @@ def movie_theatre_question(): @tractor_test -async def test_movie_theatre_convo(): +async def test_movie_theatre_convo(spawn_method): """The main ``tractor`` routine. """ async with tractor.open_nursery() as n: @@ -83,7 +83,7 @@ def cellar_door(): @tractor_test -async def test_most_beautiful_word(): +async def test_most_beautiful_word(spawn_method): """The main ``tractor`` routine. """ async with tractor.open_nursery() as n: diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 2cddd414..d77d5626 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -62,10 +62,14 @@ async def stream_from_single_subactor(): # await nursery.cancel() -def test_stream_from_single_subactor(arb_addr): +def test_stream_from_single_subactor(arb_addr, spawn_method): """Verify streaming from a spawned async generator. """ - tractor.run(stream_from_single_subactor, arbiter_addr=arb_addr) + tractor.run( + stream_from_single_subactor, + arbiter_addr=arb_addr, + spawn_method=spawn_method, + ) # this is the first 2 actors, streamer_1 and streamer_2 From dc5cc040e6a2d4671e14f42a2f4648b7f631cfb3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 6 Mar 2019 12:44:16 -0500 Subject: [PATCH 5/9] Try to support waiting on Windows processes This pokes around a little in `trio` hazmat but it *should work* as it piggy backs on the new cross platform subprocess support. Relates to #59 --- tractor/_trionics.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 71f8d485..bf831799 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -1,15 +1,15 @@ """ ``trio`` inspired apis and helpers """ -import multiprocessing as mp import inspect +import platform +import multiprocessing as mp from typing import Tuple, List, Dict, Optional, Any import typing import trio from async_generator import asynccontextmanager, aclosing -# from . import _forkserver_hackzorz from ._state import current_actor from .log import get_logger, get_loglevel from ._actor import Actor, ActorFailure @@ -17,6 +17,14 @@ from ._portal import Portal from . import _spawn +if platform.system() == 'Windows': + async def proc_waiter(proc: mp.Process) -> None: + await trio.hazmat.WaitForSingleObject(proc.sentinel) +else: + async def proc_waiter(proc: mp.Process) -> None: + await trio.hazmat.wait_readable(proc.sentinel) + + log = get_logger('tractor') @@ -180,7 +188,8 @@ class ActorNursery: ) -> None: # TODO: timeout block here? if proc.is_alive(): - await trio.hazmat.wait_readable(proc.sentinel) + await proc_waiter(proc) + # please god don't hang proc.join() log.debug(f"Joined {proc}") From 49b711fb5fb5b38a7a3c5b6b99a289adde813d23 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 6 Mar 2019 21:30:00 -0500 Subject: [PATCH 6/9] Be more stingy about "actor model" --- README.rst | 42 ++++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/README.rst b/README.rst index 56fbc730..e4ae1f4c 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,6 @@ tractor ======= -An async-native `actor model`_ built on trio_ and multiprocessing_. +An async-native "`actor model`_" built on trio_ and multiprocessing_. |travis| @@ -23,23 +23,26 @@ An async-native `actor model`_ built on trio_ and multiprocessing_. ``tractor`` is an attempt to bring trionic_ `structured concurrency`_ to distributed multi-core Python. -``tractor`` lets you run and spawn *actors*: processes which each run a ``trio`` -scheduler and task tree (also known as an `async sandwich`_). -*Actors* communicate by sending messages_ over channels_ and avoid sharing any local state. -This `actor model`_ allows for highly distributed software architecture which works just as -well on multiple cores as it does over many hosts. -``tractor`` takes much inspiration from pulsar_ and execnet_ but attempts to be much more -focussed on sophistication of the lower level distributed architecture as well as have first -class support for `modern async Python`_. +``tractor`` lets you spawn ``trio`` *"actors"*: processes which each run a ``trio`` scheduler and task +tree (also known as an `async sandwich`_). *Actors* communicate by exchanging asynchronous messages_ over +channels_ and avoid sharing any state. This model allows for highly distributed software architecture +which works just as well on multiple cores as it does over many hosts. ``tractor`` is an actor-model-*like* +system in the sense that it adheres to the `3 axioms`_ but not does not (yet) fufill all "unrequirements_" in +practice. -The first step to grok ``tractor`` is to get the basics of ``trio`` -down. A great place to start is the `trio docs`_ and this `blog post`_. +``tractor`` takes inspiration from pulsar_ and execnet_ but attempts to be more focussed on sophistication of +the lower level distributed architecture as well as have first class support for streaming using `async generators`_. + +The first step to grok ``tractor`` is to get the basics of ``trio`` down. +A great place to start is the `trio docs`_ and this `blog post`_. .. _messages: https://en.wikipedia.org/wiki/Message_passing .. _trio docs: https://trio.readthedocs.io/en/latest/ .. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ .. _structured concurrency: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ -.. _modern async Python: https://www.python.org/dev/peps/pep-0525/ +.. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts +.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony +.. _async generators: https://www.python.org/dev/peps/pep-0525/ .. contents:: @@ -47,20 +50,25 @@ down. A great place to start is the `trio docs`_ and this `blog post`_. Philosophy ---------- -``tractor``'s tenets non-comprehensively include: +``tractor`` aims to be the Python multi-processing framework *you always wanted*. +Its tenets non-comprehensively include: + +- strict adherence to the `concept-in-progress`_ of *structured concurrency* - no spawning of processes *willy-nilly*; causality_ is paramount! -- `shared nothing architecture`_ -- remote errors `always propagate`_ back to the caller +- (remote) errors `always propagate`_ back to the parent / caller - verbatim support for ``trio``'s cancellation_ system +- `shared nothing architecture`_ - no use of *proxy* objects to wrap RPC calls - an immersive debugging experience - anti-fragility through `chaos engineering`_ + .. warning:: ``tractor`` is in alpha-alpha and is expected to change rapidly! Expect nothing to be set in stone. Your ideas about where it should go are greatly appreciated! +.. _concept-in-progress: https://trio.discourse.group/t/structured-concurrency-kickoff/55 .. _pulsar: http://quantmind.github.io/pulsar/design.html .. _execnet: https://codespeak.net/execnet/ @@ -450,7 +458,6 @@ as ``multiprocessing`` calls it) which is running ``main()``. https://trio.readthedocs.io/en/latest/reference-core.html#getting-back-into-the-trio-thread-from-another-thread .. _asynchronous generators: https://www.python.org/dev/peps/pep-0525/ .. _remote function execution: https://codespeak.net/execnet/example/test_info.html#remote-exec-a-function-avoiding-inlined-source-part-i -.. _asyncitertools: https://github.com/vodik/asyncitertools Cancellation @@ -673,6 +680,7 @@ Stuff I'd like to see ``tractor`` do real soon: but with better `pdb++`_ support - an extensive `chaos engineering`_ test suite - support for reactive programming primitives and native support for asyncitertools_ like libs +- introduction of a `capability-based security`_ model Feel like saying hi? @@ -691,3 +699,5 @@ say hi, please feel free to ping me on the `trio gitter channel`_! .. _celery: http://docs.celeryproject.org/en/latest/userguide/debugging.html .. _pdb++: https://github.com/antocuni/pdb .. _msgpack: https://en.wikipedia.org/wiki/MessagePack +.. _asyncitertools: https://github.com/vodik/asyncitertools +.. _capability-based security: https://en.wikipedia.org/wiki/Capability-based_security From 8eb138b8a7c1c1c31fde64d7c84abf064586c869 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 7 Mar 2019 18:28:22 -0500 Subject: [PATCH 7/9] Add Windows *gotchas* section Resolves #61 --- README.rst | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.rst b/README.rst index e4ae1f4c..7c1c8c6f 100644 --- a/README.rst +++ b/README.rst @@ -82,6 +82,16 @@ No PyPi release yet! pip install git+git://github.com/tgoodlet/tractor.git +Windows "gotchas" +***************** +`tractor` uses the stdlib's `multiprocessing` module internally which +*can* have some *gotchas* on Windows, namely the need for calling +`freeze_support()`_ inside the ``__main__`` context. See `#61`_ for the +deats. + +.. _freeze_support(): https://docs.python.org/3/library/multiprocessing.html#multiprocessing.freeze_support +.. _#61: https://github.com/tgoodlet/tractor/pull/61#issuecomment-470053512 + Examples -------- From c3daf731127129b7b1f4558e92b8afd8fd4004cd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Mar 2019 19:54:27 -0500 Subject: [PATCH 8/9] Document the mp start method more explicitly --- README.rst | 45 ++++++++++++++++++++++++++++----------------- tractor/__init__.py | 8 +++++--- tractor/_spawn.py | 11 +++++++---- 3 files changed, 40 insertions(+), 24 deletions(-) diff --git a/README.rst b/README.rst index 7c1c8c6f..df6f223c 100644 --- a/README.rst +++ b/README.rst @@ -26,12 +26,12 @@ An async-native "`actor model`_" built on trio_ and multiprocessing_. ``tractor`` lets you spawn ``trio`` *"actors"*: processes which each run a ``trio`` scheduler and task tree (also known as an `async sandwich`_). *Actors* communicate by exchanging asynchronous messages_ over channels_ and avoid sharing any state. This model allows for highly distributed software architecture -which works just as well on multiple cores as it does over many hosts. ``tractor`` is an actor-model-*like* -system in the sense that it adheres to the `3 axioms`_ but not does not (yet) fufill all "unrequirements_" in -practice. +which works just as well on multiple cores as it does over many hosts. -``tractor`` takes inspiration from pulsar_ and execnet_ but attempts to be more focussed on sophistication of -the lower level distributed architecture as well as have first class support for streaming using `async generators`_. +``tractor`` is an actor-model-*like* system in the sense that it adheres to the `3 axioms`_ but not does +not (yet) fufill all "unrequirements_" in practice. The API and design takes inspiration from pulsar_ and +execnet_ but attempts to be more focussed on sophistication of the lower level distributed architecture as +well as have first class support for streaming using `async generators`_. The first step to grok ``tractor`` is to get the basics of ``trio`` down. A great place to start is the `trio docs`_ and this `blog post`_. @@ -84,8 +84,8 @@ No PyPi release yet! Windows "gotchas" ***************** -`tractor` uses the stdlib's `multiprocessing` module internally which -*can* have some *gotchas* on Windows, namely the need for calling +`tractor` internally uses the stdlib's `multiprocessing` package which +*can* have some gotchas on Windows. Namely, the need for calling `freeze_support()`_ inside the ``__main__`` context. See `#61`_ for the deats. @@ -647,6 +647,9 @@ multiple tasks streaming responses concurrently: The context notion comes from the context_ in nanomsg_. +.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5 +.. _msgpack: https://en.wikipedia.org/wiki/MessagePack + Running actors standalone ************************* @@ -662,6 +665,16 @@ need to hop into a debugger. You just need to pass the existing tractor.run(main, arbiter_addr=('192.168.0.10', 1616)) +Choosing a ``multiprocessing`` *start method* +********************************************* +``tractor`` supports selection of the `multiprocessing start method`_ via +a ``start_method`` kwarg to ``tractor.run()``. Note that on Windows +*spawn* it the only supported method and on nix systems *forkserver* is +selected by default for speed. + +.. _multiprocessing start method: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods + + Enabling logging **************** Considering how complicated distributed software can become it helps to know @@ -692,6 +705,14 @@ Stuff I'd like to see ``tractor`` do real soon: - support for reactive programming primitives and native support for asyncitertools_ like libs - introduction of a `capability-based security`_ model +.. _supervisors: https://github.com/tgoodlet/tractor/issues/22 +.. _nanomsg: https://nanomsg.github.io/nng/index.html +.. _gossip protocol: https://en.wikipedia.org/wiki/Gossip_protocol +.. _celery: http://docs.celeryproject.org/en/latest/userguide/debugging.html +.. _asyncitertools: https://github.com/vodik/asyncitertools +.. _pdb++: https://github.com/antocuni/pdb +.. _capability-based security: https://en.wikipedia.org/wiki/Capability-based_security + Feel like saying hi? -------------------- @@ -700,14 +721,4 @@ This project is very much coupled to the ongoing development of community). If you want to help, have suggestions or just want to say hi, please feel free to ping me on the `trio gitter channel`_! - -.. _supervisors: https://github.com/tgoodlet/tractor/issues/22 -.. _nanomsg: https://nanomsg.github.io/nng/index.html -.. _context: https://nanomsg.github.io/nng/man/tip/nng_ctx.5 -.. _gossip protocol: https://en.wikipedia.org/wiki/Gossip_protocol .. _trio gitter channel: https://gitter.im/python-trio/general -.. _celery: http://docs.celeryproject.org/en/latest/userguide/debugging.html -.. _pdb++: https://github.com/antocuni/pdb -.. _msgpack: https://en.wikipedia.org/wiki/MessagePack -.. _asyncitertools: https://github.com/vodik/asyncitertools -.. _capability-based security: https://en.wikipedia.org/wiki/Capability-based_security diff --git a/tractor/__init__.py b/tractor/__init__.py index 96ac856c..5be1d5d5 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -11,7 +11,7 @@ import trio # type: ignore from trio import MultiError from .log import get_console_log, get_logger, get_loglevel -from ._ipc import _connect_chan, Channel, Context +from ._ipc import _connect_chan, Channel from ._actor import ( Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor ) @@ -93,14 +93,16 @@ def run( name: str = None, arbiter_addr: Tuple[str, int] = ( _default_arbiter_host, _default_arbiter_port), - spawn_method: str = 'forkserver', + # the `multiprocessing` start method: + # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods + start_method: str = 'forkserver', **kwargs: typing.Dict[str, typing.Any], ) -> Any: """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. """ - _spawn.try_set_start_method(spawn_method) + _spawn.try_set_start_method(start_method) return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 3d355152..859cc4e9 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -24,12 +24,15 @@ def try_set_start_method(name: str) -> mp.context.BaseContext: global _ctx allowed = mp.get_all_start_methods() - if name not in allowed: - name == 'spawn' - assert name in allowed - if name == 'forkserver': + if name not in allowed: + name == 'spawn' + elif name == 'fork': + raise ValueError( + "`fork` is unsupported due to incompatibility with `trio`" + ) + elif name == 'forkserver': _forkserver_hackzorz.override_stdlib() _ctx = mp.get_context(name) From b70f4eafcb5e75a2bbb126edbc9b6d887a38766e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Mar 2019 20:06:16 -0500 Subject: [PATCH 9/9] Flip tests to use `start_method` kwarg --- tests/conftest.py | 4 ++-- tests/test_cancellation.py | 4 ++-- tests/test_spawning.py | 4 ++-- tests/test_streaming.py | 4 ++-- tractor/testing/_tractor_test.py | 8 ++++---- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 22c77f83..1e46da3b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -31,8 +31,8 @@ def arb_addr(): def pytest_generate_tests(metafunc): - if 'spawn_method' in metafunc.fixturenames: + if 'start_method' in metafunc.fixturenames: from multiprocessing import get_all_start_methods methods = get_all_start_methods() methods.remove('fork') - metafunc.parametrize("spawn_method", methods, scope='module') + metafunc.parametrize("start_method", methods, scope='module') diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index b34c6022..e45fbac9 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -110,7 +110,7 @@ async def stream_forever(): @tractor_test -async def test_cancel_infinite_streamer(spawn_method): +async def test_cancel_infinite_streamer(start_method): # stream for at most 1 seconds with trio.move_on_after(1) as cancel_scope: @@ -139,7 +139,7 @@ async def test_cancel_infinite_streamer(spawn_method): ids=['one_actor', 'two_actors'], ) @tractor_test -async def test_some_cancels_all(num_actors_and_errs, spawn_method): +async def test_some_cancels_all(num_actors_and_errs, start_method): """Verify a subset of failed subactors causes all others in the nursery to be cancelled just like the strategy in trio. diff --git a/tests/test_spawning.py b/tests/test_spawning.py index 01071156..7508a39e 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -57,7 +57,7 @@ def movie_theatre_question(): @tractor_test -async def test_movie_theatre_convo(spawn_method): +async def test_movie_theatre_convo(start_method): """The main ``tractor`` routine. """ async with tractor.open_nursery() as n: @@ -83,7 +83,7 @@ def cellar_door(): @tractor_test -async def test_most_beautiful_word(spawn_method): +async def test_most_beautiful_word(start_method): """The main ``tractor`` routine. """ async with tractor.open_nursery() as n: diff --git a/tests/test_streaming.py b/tests/test_streaming.py index d77d5626..172bb7fa 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -62,13 +62,13 @@ async def stream_from_single_subactor(): # await nursery.cancel() -def test_stream_from_single_subactor(arb_addr, spawn_method): +def test_stream_from_single_subactor(arb_addr, start_method): """Verify streaming from a spawned async generator. """ tractor.run( stream_from_single_subactor, arbiter_addr=arb_addr, - spawn_method=spawn_method, + start_method=start_method, ) diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py index 341d446d..7d8289ed 100644 --- a/tractor/testing/_tractor_test.py +++ b/tractor/testing/_tractor_test.py @@ -28,7 +28,7 @@ def tractor_test(fn): *args, loglevel=None, arb_addr=None, - spawn_method='forkserver', + start_method='forkserver', **kwargs ): # __tracebackhide__ = True @@ -40,15 +40,15 @@ def tractor_test(fn): # allows test suites to define a 'loglevel' fixture # that activates the internal logging kwargs['loglevel'] = loglevel - if 'spawn_method' in inspect.signature(fn).parameters: + if 'start_method' in inspect.signature(fn).parameters: # allows test suites to define a 'loglevel' fixture # that activates the internal logging - kwargs['spawn_method'] = spawn_method + kwargs['start_method'] = start_method return run( partial(fn, *args, **kwargs), arbiter_addr=arb_addr, loglevel=loglevel, - spawn_method=spawn_method, + start_method=start_method, ) return wrapper