diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py new file mode 100644 index 0000000..01b3af2 --- /dev/null +++ b/tests/test_cancellation.py @@ -0,0 +1,116 @@ +""" +Cancellation and error propagation +""" +from itertools import repeat + +import pytest +import trio +import tractor + +from conftest import tractor_test + + +async def assert_err(): + assert 0 + + +def test_remote_error(arb_addr): + """Verify an error raises in a subactor is propagated to the parent. + """ + async def main(): + async with tractor.open_nursery() as nursery: + + portal = await nursery.run_in_actor('errorer', assert_err) + + # get result(s) from main task + try: + return await portal.result() + except tractor.RemoteActorError: + print("Look Maa that actor failed hard, hehh") + raise + except Exception: + pass + assert 0, "Remote error was not raised?" + + with pytest.raises(tractor.RemoteActorError): + # also raises + tractor.run(main, arbiter_addr=arb_addr) + + +def do_nothing(): + pass + + +def test_cancel_single_subactor(arb_addr): + + async def main(): + + async with tractor.open_nursery() as nursery: + + portal = await nursery.start_actor( + 'nothin', rpc_module_paths=[__name__], + ) + assert (await portal.run(__name__, 'do_nothing')) is None + + # would hang otherwise + await nursery.cancel() + + tractor.run(main, arbiter_addr=arb_addr) + + +async def stream_forever(): + for i in repeat("I can see these little future bubble things"): + # each yielded value is sent over the ``Channel`` to the + # parent actor + yield i + await trio.sleep(0.01) + + +@tractor_test +async def test_cancel_infinite_streamer(): + + # stream for at most 1 seconds + with trio.move_on_after(1) as cancel_scope: + async with tractor.open_nursery() as n: + portal = await n.start_actor( + f'donny', + rpc_module_paths=[__name__], + ) + + # this async for loop streams values from the above + # async generator running in a separate process + async for letter in await portal.run(__name__, 'stream_forever'): + print(letter) + + # we support trio's cancellation system + assert cancel_scope.cancelled_caught + assert n.cancelled + + +@tractor_test +async def test_one_cancels_all(): + """Verify one failed actor causes all others in the nursery + to be cancelled just like in trio. + + This is the first and only supervisory strategy at the moment. + """ + try: + async with tractor.open_nursery() as n: + real_actors = [] + for i in range(3): + real_actors.append(await n.start_actor( + f'actor_{i}', + rpc_module_paths=[__name__], + )) + + # start one actor that will fail immediately + await n.run_in_actor('extra', assert_err) + + # should error here with a ``RemoteActorError`` containing + # an ``AssertionError` + + except tractor.RemoteActorError: + assert n.cancelled is True + assert not n._children + else: + pytest.fail("Should have gotten a remote assertion error?") diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 7ed3f7f..a52363a 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -2,7 +2,6 @@ Actor model API testing """ import time -from itertools import repeat import pytest import trio @@ -48,8 +47,6 @@ def test_local_actor_async_func(arb_addr): statespace = {'doggy': 10, 'kitty': 4} -# NOTE: this func must be defined at module level in order for the -# interal pickling infra of the forkserver to work async def spawn(is_arbiter): namespaces = [__name__] @@ -139,91 +136,6 @@ def test_stream_from_single_subactor(arb_addr): tractor.run(stream_from_single_subactor, arbiter_addr=arb_addr) -async def assert_err(): - assert 0 - - -def test_remote_error(arb_addr): - """Verify an error raises in a subactor is propagated to the parent. - """ - async def main(): - async with tractor.open_nursery() as nursery: - - portal = await nursery.run_in_actor('errorer', assert_err) - - # get result(s) from main task - try: - return await portal.result() - except tractor.RemoteActorError: - print("Look Maa that actor failed hard, hehh") - raise - except Exception: - pass - assert 0, "Remote error was not raised?" - - with pytest.raises(tractor.RemoteActorError): - # also raises - tractor.run(main, arbiter_addr=arb_addr) - - -async def stream_forever(): - for i in repeat("I can see these little future bubble things"): - # each yielded value is sent over the ``Channel`` to the - # parent actor - yield i - await trio.sleep(0.01) - - -@tractor_test -async def test_cancel_infinite_streamer(): - - # stream for at most 1 seconds - with trio.move_on_after(1) as cancel_scope: - async with tractor.open_nursery() as n: - portal = await n.start_actor( - f'donny', - rpc_module_paths=[__name__], - ) - - # this async for loop streams values from the above - # async generator running in a separate process - async for letter in await portal.run(__name__, 'stream_forever'): - print(letter) - - # we support trio's cancellation system - assert cancel_scope.cancelled_caught - assert n.cancelled - - -@tractor_test -async def test_one_cancels_all(): - """Verify one failed actor causes all others in the nursery - to be cancelled just like in trio. - - This is the first and only supervisory strategy at the moment. - """ - try: - async with tractor.open_nursery() as n: - real_actors = [] - for i in range(3): - real_actors.append(await n.start_actor( - f'actor_{i}', - rpc_module_paths=[__name__], - )) - - # start one actor that will fail immediately - await n.run_in_actor('extra', assert_err) - - # should error here with a ``RemoteActorError`` containing - # an ``AssertionError` - - except tractor.RemoteActorError: - assert n.cancelled is True - assert not n._children - else: - pytest.fail("Should have gotten a remote assertion error?") - - def movie_theatre_question(): """A question asked in a dark theatre, in a tangent (errr, I mean different) process. @@ -271,27 +183,6 @@ async def test_most_beautiful_word(): print(await portal.result()) -def do_nothing(): - pass - - -def test_cancel_single_subactor(arb_addr): - - async def main(): - - async with tractor.open_nursery() as nursery: - - portal = await nursery.start_actor( - 'nothin', rpc_module_paths=[__name__], - ) - assert (await portal.run(__name__, 'do_nothing')) is None - - # would hang otherwise - await nursery.cancel() - - tractor.run(main, arbiter_addr=arb_addr) - - # this is the first 2 actors, streamer_1 and streamer_2 async def stream_data(seed): for i in range(seed):