diff --git a/README.rst b/README.rst
index 5819f20..7b6992c 100644
--- a/README.rst
+++ b/README.rst
@@ -10,7 +10,7 @@ An async-native "`actor model`_" built on trio_ and multiprocessing_.
 
 .. _actor model: https://en.wikipedia.org/wiki/Actor_model
 .. _trio: https://github.com/python-trio/trio
-.. _multiprocessing: https://docs.python.org/3/library/multiprocessing.html
+.. _multiprocessing: https://en.wikipedia.org/wiki/Multiprocessing
 .. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles
 .. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
 .. _always propagate: https://trio.readthedocs.io/en/latest/design.html#exceptions-always-propagate
@@ -21,17 +21,31 @@ An async-native "`actor model`_" built on trio_ and multiprocessing_.
 .. _chaos engineering: http://principlesofchaos.org/
 
 
-``tractor`` is an attempt to bring trionic_ `structured concurrency`_ to distributed multi-core Python.
+``tractor`` is an attempt to bring trionic_ `structured concurrency`_ to
+distributed multi-core Python.
 
-``tractor`` lets you spawn ``trio`` *"actors"*: processes which each run a ``trio`` scheduler and task
-tree (also known as an `async sandwich`_). *Actors* communicate by exchanging asynchronous messages_ over
-channels_ and avoid sharing any state. This model allows for highly distributed software architecture
-which works just as well on multiple cores as it does over many hosts.
+``tractor`` lets you spawn ``trio`` *"actors"*: processes which each run
+a ``trio``-scheduled task tree (also known as an `async sandwich`_).
+*Actors* communicate by exchanging asynchronous messages_ and avoid
+sharing any state. This model allows for highly distributed software
+architecture which works just as well on multiple cores as it does over
+many hosts.
 
-``tractor`` is an actor-model-*like* system in the sense that it adheres to the `3 axioms`_ but does
-not (yet) fufill all "unrequirements_" in practice. The API and design takes inspiration from pulsar_ and
-execnet_ but attempts to be more focussed on sophistication of the lower level distributed architecture as
-well as have first class support for streaming using `async generators`_.
+``tractor`` is an actor-model-*like* system in the sense that it adheres
+to the `3 axioms`_ but does not (yet) fulfil all "unrequirements_" in
+practice. It is an experiment in applying `structured concurrency`_
+constraints on a parallel processing system where multiple Python
+processes exist over many hosts but no process can outlive its parent.
+In Erlang parlance, it is an architecture where every process has
+a mandatory supervisor enforced by the type system. The API design is
+almost exclusively inspired by trio_'s concepts and primitives (though
+we often lag a little). As a distributed computing system ``tractor``
+attempts to place sophistication at the correct layer such that
+concurrency primitives are powerful yet simple, making it easy to build
+complex systems (you can build a "worker pool" architecture but it's
+definitely not required). There is first class support for inter-actor
+streaming using `async generators`_ and ongoing work toward a functional
+reactive style for IPC.
 
 The first step to grok ``tractor`` is to get the basics of ``trio`` down.
 A great place to start is the `trio docs`_ and this `blog post`_.
@@ -56,7 +70,7 @@ Its tenets non-comprehensively include:
 
 - strict adherence to the `concept-in-progress`_ of *structured concurrency*
 - no spawning of processes *willy-nilly*; causality_ is paramount!
-- (remote) errors `always propagate`_ back to the parent / caller
+- (remote) errors `always propagate`_ back to the parent supervisor
 - verbatim support for ``trio``'s cancellation_ system
 - `shared nothing architecture`_
 - no use of *proxy* objects or shared references between processes
diff --git a/setup.py b/setup.py
index b6d60d1..190070e 100755
--- a/setup.py
+++ b/setup.py
@@ -32,7 +32,7 @@ setup(
     maintainer='Tyler Goodlet',
     maintainer_email='jgbt@protonmail.com',
     url='https://github.com/goodboy/tractor',
-    platforms=['linux'],
+    platforms=['linux', 'windows'],
     packages=[
         'tractor',
         'tractor.testing',
@@ -53,7 +53,8 @@ setup(
         "Programming Language :: Python :: Implementation :: CPython",
         "Programming Language :: Python :: Implementation :: PyPy",
         "Programming Language :: Python :: 3 :: Only",
-        "Programming Language :: Python :: 3.6",
+        "Programming Language :: Python :: 3.7",
+        "Programming Language :: Python :: 3.8",
         "Intended Audience :: Science/Research",
         "Intended Audience :: Developers",
         "Topic :: System :: Distributed Computing",
diff --git a/tests/conftest.py b/tests/conftest.py
index 2da68f7..fd8427f 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -35,5 +35,7 @@ def pytest_generate_tests(metafunc):
     from multiprocessing import get_all_start_methods
     methods = get_all_start_methods()
     if 'fork' in methods:  # fork not available on windows, so check before removing
+        # XXX: the fork method is in general incompatible with
+        # trio's global scheduler state
        methods.remove('fork')
     metafunc.parametrize("start_method", methods, scope='module')
diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py
index e45fbac..040d8b6 100644
--- a/tests/test_cancellation.py
+++ b/tests/test_cancellation.py
@@ -1,6 +1,7 @@
 """
 Cancellation and error propagation
 """
+import platform
 from itertools import repeat
 
 import pytest
@@ -10,10 +11,20 @@ import tractor
 from conftest import tractor_test
 
 
-async def assert_err():
+async def assert_err(delay=0):
+    await trio.sleep(delay)
     assert 0
 
 
+async def sleep_forever():
+    await trio.sleep(float('inf'))
+
+
+async def do_nuthin():
+    # just nick the scheduler
+    await trio.sleep(0)
+
+
 @pytest.mark.parametrize(
     'args_err',
     [
@@ -77,6 +88,32 @@ def test_multierror(arb_addr):
         tractor.run(main, arbiter_addr=arb_addr)
 
 
+@pytest.mark.parametrize('delay', (0, 0.5))
+@pytest.mark.parametrize(
+    'num_subactors', range(25, 26),
+)
+def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
+    """Verify we raise a ``trio.MultiError`` out of a nursery where
+    more than one actor errors, and also with a delay before failure
+    to test failure during ongoing spawning.
+    """
+    async def main():
+        async with tractor.open_nursery() as nursery:
+            for i in range(num_subactors):
+                await nursery.run_in_actor(
+                    f'errorer{i}', assert_err, delay=delay)
+
+    with pytest.raises(trio.MultiError) as exc_info:
+        tractor.run(main, arbiter_addr=arb_addr)
+
+    assert exc_info.type == tractor.MultiError
+    err = exc_info.value
+    assert len(err.exceptions) == num_subactors
+    for exc in err.exceptions:
+        assert isinstance(exc, tractor.RemoteActorError)
+        assert exc.type == AssertionError
+
+
 def do_nothing():
     pass
 
@@ -133,10 +170,31 @@ async def test_cancel_infinite_streamer(start_method):
 @pytest.mark.parametrize(
     'num_actors_and_errs',
     [
-        (1, tractor.RemoteActorError, AssertionError),
-        (2, tractor.MultiError, AssertionError)
+        # daemon actors sit idle while single task actors error out
+        (1, tractor.RemoteActorError, AssertionError, (assert_err, {}), None),
+        (2, tractor.MultiError, AssertionError, (assert_err, {}), None),
+        (3, tractor.MultiError, AssertionError, (assert_err, {}), None),
+
+        # 1 daemon actor errors out while single task actors sleep forever
+        (3, tractor.RemoteActorError, AssertionError, (sleep_forever, {}),
+         (assert_err, {}, True)),
+        # daemon actors error out after a brief delay while single task
+        # actors complete quickly
+        (3, tractor.RemoteActorError, AssertionError,
+         (do_nuthin, {}), (assert_err, {'delay': 1}, True)),
+        # daemon actors complete quickly while single task
+        # actors error after a brief delay
+        (3, tractor.MultiError, AssertionError,
+         (assert_err, {'delay': 1}), (do_nuthin, {}, False)),
+    ],
+    ids=[
+        '1_run_in_actor_fails',
+        '2_run_in_actors_fail',
+        '3_run_in_actors_fail',
+        '1_daemon_actors_fail',
+        '1_daemon_actors_fail_all_run_in_actors_dun_quick',
+        'no_daemon_actors_fail_all_run_in_actors_sleep_then_fail',
     ],
-    ids=['one_actor', 'two_actors'],
 )
 @tractor_test
 async def test_some_cancels_all(num_actors_and_errs, start_method):
@@ -145,25 +203,50 @@ async def test_some_cancels_all(num_actors_and_errs, start_method):
 
     This is the first and only supervisory strategy at the moment.
     """
-    num, first_err, err_type = num_actors_and_errs
+    num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
     try:
         async with tractor.open_nursery() as n:
-            real_actors = []
-            for i in range(3):
-                real_actors.append(await n.start_actor(
-                    f'actor_{i}',
+
+            # spawn the same number of daemon actors which should be cancelled
+            dactor_portals = []
+            for i in range(num_actors):
+                dactor_portals.append(await n.start_actor(
+                    f'daemon_{i}',
                     rpc_module_paths=[__name__],
                 ))
 
-            for i in range(num):
+            func, kwargs = ria_func
+            riactor_portals = []
+            for i in range(num_actors):
                 # start actor(s) that will fail immediately
-                await n.run_in_actor(f'extra_{i}', assert_err)
+                riactor_portals.append(
+                    await n.run_in_actor(f'actor_{i}', func, **kwargs))
+
+            if da_func:
+                func, kwargs, expect_error = da_func
+                for portal in dactor_portals:
+                    # if this function fails then we should error here
+                    # and the nursery should tear down all other actors
+                    try:
+                        await portal.run(__name__, func.__name__, **kwargs)
+                    except tractor.RemoteActorError as err:
+                        assert err.type == err_type
+                        # we only expect this first error to propagate
+                        # (all other daemons are cancelled before they
+                        # can be scheduled)
+                        num_actors = 1
+                        # reraise so nursery teardown is triggered
+                        raise
+                    else:
+                        if expect_error:
+                            pytest.fail(
+                                "Daemon call should fail at checkpoint?")
 
     # should error here with a ``RemoteActorError`` or ``MultiError``
     except first_err as err:
         if isinstance(err, tractor.MultiError):
-            assert len(err.exceptions) == num
+            assert len(err.exceptions) == num_actors
             for exc in err.exceptions:
                 if isinstance(exc, tractor.RemoteActorError):
                     assert exc.type == err_type
@@ -176,3 +259,42 @@ async def test_some_cancels_all(num_actors_and_errs, start_method):
         assert not n._children
     else:
         pytest.fail("Should have gotten a remote assertion error?")
+
+
+async def spawn_and_error(num) -> None:
+    name = tractor.current_actor().name
+    async with tractor.open_nursery() as nursery:
+        for i in range(num):
+            await nursery.run_in_actor(
+                f'{name}_errorer_{i}', assert_err
+            )
+
+
+@tractor_test
+async def test_nested_multierrors(loglevel, start_method):
+    """Test that failed actor sets are wrapped in `trio.MultiError`s.
+    This test goes only 2 nurseries deep but we should eventually have tests
+    for arbitrary n-depth actor trees.
+    """
+    if platform.system() == 'Windows':
+        # Windows CI seems to be particularly fragile on Python 3.8..
+        num_subactors = 2
+    else:
+        # XXX: any more than this and the forkserver will
+        # start bailing hard...gotta look into it
+        num_subactors = 4
+
+    try:
+        async with tractor.open_nursery() as nursery:
+
+            for i in range(num_subactors):
+                await nursery.run_in_actor(
+                    f'spawner_{i}',
+                    spawn_and_error,
+                    num=num_subactors,
+                )
+    except trio.MultiError as err:
+        assert len(err.exceptions) == num_subactors
+        for subexc in err.exceptions:
+            assert isinstance(subexc, tractor.RemoteActorError)
+            assert subexc.type is trio.MultiError
diff --git a/tests/test_discovery.py b/tests/test_discovery.py
index 2db8c68..f458494 100644
--- a/tests/test_discovery.py
+++ b/tests/test_discovery.py
@@ -60,7 +60,7 @@ async def say_hello_use_wait(other_actor):
 
 @tractor_test
 @pytest.mark.parametrize('func', [say_hello, say_hello_use_wait])
-async def test_trynamic_trio(func):
+async def test_trynamic_trio(func, start_method):
     """Main tractor entry point, the "master" process (for now
     acts as the "director").
     """
diff --git a/tractor/_actor.py b/tractor/_actor.py
index 3f2a89c..c75647e 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -118,7 +118,7 @@ async def _invoke(
             with cancel_scope as cs:
                 task_status.started(cs)
                 await chan.send({'return': await coro, 'cid': cid})
-    except Exception as err:
+    except (Exception, trio.MultiError) as err:
         # always ship errors back to caller
         log.exception("Actor errored:")
         err_msg = pack_error(err)
@@ -352,7 +352,8 @@ class Actor:
         return cid, recv_chan
 
     async def _process_messages(
-        self, chan: Channel,
+        self,
+        chan: Channel,
         treat_as_gen: bool = False,
         shield: bool = False,
         task_status=trio.TASK_STATUS_IGNORED,
@@ -461,7 +462,7 @@ class Actor:
         except trio.ClosedResourceError:
             log.error(f"{chan} form {chan.uid} broke")
 
-        except Exception as err:
+        except (Exception, trio.MultiError) as err:
             # ship any "internal" exception (i.e. one from internal machinery
             # not from an rpc task) to parent
             log.exception("Actor errored:")
@@ -472,7 +473,7 @@ class Actor:
             # above to trigger an error at consuming portal "checkpoints"
         except trio.Cancelled:
             # debugging only
-            log.debug("Msg loop was cancelled")
+            log.debug(f"Msg loop was cancelled for {chan}")
             raise
         finally:
             log.debug(
diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py
index 0466a73..7ffe4cb 100644
--- a/tractor/_exceptions.py
+++ b/tractor/_exceptions.py
@@ -5,6 +5,8 @@ import importlib
 import builtins
 import traceback
 
+import trio
+
 
 _this_mod = importlib.import_module(__name__)
 
@@ -14,7 +16,7 @@ class RemoteActorError(Exception):
     "Remote actor exception bundled locally"
     def __init__(self, message, type_str, **msgdata):
         super().__init__(message)
-        for ns in [builtins, _this_mod]:
+        for ns in [builtins, _this_mod, trio]:
             try:
                 self.type = getattr(ns, type_str)
                 break
diff --git a/tractor/_trionics.py b/tractor/_trionics.py
index 7ebe37a..6954367 100644
--- a/tractor/_trionics.py
+++ b/tractor/_trionics.py
@@ -87,6 +87,7 @@ class ActorNursery:
         event, chan = await self._actor.wait_for_peer(actor.uid)
         portal = Portal(chan)
         self._children[actor.uid] = (actor, proc, portal)
+        return portal
 
     async def run_in_actor(
@@ -174,12 +175,19 @@ class ActorNursery:
                 result = await exhaust_portal(portal, actor)
                 if isinstance(result, Exception):
                     errors.append(result)
-                log.info(f"Cancelling {portal.channel.uid} gracefully")
+                    log.warning(
+                        f"Cancelling {portal.channel.uid} after error {result}"
+                    )
+                else:
+                    log.info(f"Cancelling {portal.channel.uid} gracefully")
+
+                # cancel the process now that we have a final result
                 await portal.cancel_actor()
 
-            if cs.cancelled_caught:
-                log.warning(
-                    "Result waiter was cancelled, process may have died")
+            # XXX: lol, this will never get run without a shield above..
+            # if cs.cancelled_caught:
+            #     log.warning(
+            #         "Result waiter was cancelled, process may have died")
 
         async def wait_for_proc(
             proc: mp.Process,
@@ -194,11 +202,12 @@ class ActorNursery:
             # please god don't hang
             proc.join()
             log.debug(f"Joined {proc}")
+            # indicate we are no longer managing this subactor
             self._children.pop(actor.uid)
 
             # proc terminated, cancel result waiter that may have
-            # been spawned in tandem
-            if cancel_scope:
+            # been spawned in tandem if not done already
+            if cancel_scope:  # and not portal._cancelled:
                 log.warning(
                     f"Cancelling existing result waiter task for {actor.uid}")
                 cancel_scope.cancel()
@@ -222,11 +231,12 @@ class ActorNursery:
 
         if errors:
             if not self.cancelled:
-                # halt here and expect to be called again once the nursery
-                # has been cancelled externally (ex. from within __aexit__()
-                # if an error is captured from ``wait()`` then ``cancel()``
-                # is called immediately after which in turn calls ``wait()``
-                # again.)
+                # bubble up error(s) here and expect to be called again
+                # once the nursery has been cancelled externally (ex.
+                # from within __aexit__() if an error is caught around
+                # ``self.wait()`` then ``self.cancel()`` is called
+                # immediately, in the default supervisor strategy, after
+                # which in turn ``self.wait()`` is called again.)
                 raise trio.MultiError(errors)
 
         # wait on all `start_actor()` subactors to complete
@@ -259,7 +269,7 @@ class ActorNursery:
         # os.kill(proc.pid, signal.SIGINT)
 
         log.debug(f"Cancelling nursery")
-        with trio.fail_after(3):
+        with trio.move_on_after(3) as cs:
             async with trio.open_nursery() as n:
                 for subactor, proc, portal in self._children.values():
                     if hard_kill:
@@ -272,6 +282,10 @@ class ActorNursery:
                         await event.wait()
                         # channel/portal should now be up
                         _, _, portal = self._children[subactor.uid]
+
+                        # XXX: should be impossible to get here
+                        # unless method was called from within
+                        # a shielded cancel scope.
                         if portal is None:
                             # cancelled while waiting on the event
                             # to arrive
@@ -281,10 +295,18 @@ class ActorNursery:
                     else:
                         # there's no other choice left
                         do_hard_kill(proc)
-                # spawn cancel tasks
+                    # spawn cancel tasks for each sub-actor
                     assert portal
                     n.start_soon(portal.cancel_actor)
 
+        # if we cancelled the cancel (we hung cancelling remote actors)
+        # then hard kill all sub-processes
+        if cs.cancelled_caught:
+            log.error(f"Failed to gracefully cancel {self}, hard killing!")
+            async with trio.open_nursery() as n:
+                for subactor, proc, portal in self._children.values():
+                    n.start_soon(do_hard_kill, proc)
+
         # mark ourselves as having (tried to have) cancelled all subactors
         self.cancelled = True
         await self.wait()
@@ -292,6 +314,9 @@ class ActorNursery:
     async def __aexit__(self, etype, value, tb):
         """Wait on all subactor's main routines to complete.
         """
+        # XXX: this is effectively the (for now) lone
+        # cancellation/supervisor strategy (one-cancels-all)
+        # which exactly mimics trio's behaviour
         if etype is not None:
             try:
                 # XXX: hypothetically an error could be raised and then
@@ -313,16 +338,16 @@ class ActorNursery:
                     raise trio.MultiError(merr.exceptions + [value])
                 raise
         else:
-            # XXX: this is effectively the (for now) lone
-            # cancellation/supervisor strategy which exactly
-            # mimicks trio's behaviour
             log.debug(f"Waiting on subactors {self._children} to complete")
             try:
                 await self.wait()
             except (Exception, trio.MultiError) as err:
-                log.warning(f"Nursery caught {err}, cancelling")
-                await self.cancel()
+                log.warning(f"Nursery cancelling due to {err}")
+                if self._children:
+                    with trio.CancelScope(shield=True):
+                        await self.cancel()
                 raise
+            log.debug(f"Nursery teardown complete")
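
For reference, the one-cancels-all flow that the new tests above exercise boils down to the minimal sketch below. It is illustrative only and not part of the patch: the test name is made up, the bare ``tractor.run(main)`` call assumes the default arbiter address rather than the ``arb_addr`` fixture, and ``assert_err`` simply mirrors the helper added in ``tests/test_cancellation.py``.

import trio
import tractor


async def assert_err(delay=0):
    # same shape as the helper in tests/test_cancellation.py: optionally
    # sleep, then fail with an AssertionError
    await trio.sleep(delay)
    assert 0


def test_one_cancels_all_sketch():
    async def main():
        async with tractor.open_nursery() as nursery:
            for i in range(3):
                # each subactor runs ``assert_err``; the first remote
                # failure cancels all of its siblings (one-cancels-all)
                await nursery.run_in_actor(f'errorer{i}', assert_err)

    try:
        tractor.run(main)
    except trio.MultiError as err:
        # several subactors errored before cancellation landed; each
        # failure arrives wrapped in a ``RemoteActorError`` carrying
        # the original exception type
        for exc in err.exceptions:
            assert isinstance(exc, tractor.RemoteActorError)
            assert exc.type == AssertionError
    except tractor.RemoteActorError as err:
        # only one subactor managed to error before the rest were
        # cancelled, so a single remote error propagates instead
        assert err.type == AssertionError

Either way the nursery block does not exit until every subactor has been cancelled and reaped, which is the teardown behaviour the shielded ``self.cancel()`` call added to ``ActorNursery.__aexit__`` in ``tractor/_trionics.py`` is protecting.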