From a5279f80a781b669ea03839f1b686d5c2ca92087 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Aug 2020 18:24:28 -0400 Subject: [PATCH] Actually reproduce the de-registration problem This truly reproduces #141. It turns out the problem only occurs when we're cancelled in the middle of consuming "infinite streams". Good news is this tests a lot of edge cases :) --- tests/test_discovery.py | 139 +++++++++++++++++++++++++++------------- 1 file changed, 96 insertions(+), 43 deletions(-) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 7d6f69b..6370fe2 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -5,6 +5,7 @@ import os import signal import platform from functools import partial +import itertools import pytest import tractor @@ -87,10 +88,35 @@ async def test_trynamic_trio(func, start_method): print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") +async def stream_forever(): + for i in itertools.count(): + yield i + await trio.sleep(0.01) + + +async def cancel(use_signal, delay=0): + # hold on there sally + await trio.sleep(delay) + + # trigger cancel + if use_signal: + if platform.system() == 'Windows': + pytest.skip("SIGINT not supported on windows") + os.kill(os.getpid(), signal.SIGINT) + else: + raise KeyboardInterrupt + + +async def stream_from(portal): + async for value in await portal.result(): + print(value) + + async def spawn_and_check_registry( arb_addr: tuple, use_signal: bool, remote_arbiter: bool = False, + with_streaming: bool = False, ) -> None: actor = tractor.current_actor() @@ -101,9 +127,7 @@ async def spawn_and_check_registry( if actor.is_arbiter: async def get_reg(): return actor._registry - extra = 1 # arbiter is local root actor - else: get_reg = partial(portal.run, 'self', 'get_registry') extra = 2 # local root actor + remote arbiter @@ -112,66 +136,95 @@ async def spawn_and_check_registry( registry = await get_reg() assert actor.uid in registry - try: - async with tractor.open_nursery() as n: - portals = {} - for i in range(3): - name = f'a{i}' - portals[name] = await n.run_in_actor( - name, trio.sleep_forever) + if with_streaming: + to_run = stream_forever + else: + to_run = trio.sleep_forever - # wait on last actor to come up - async with tractor.wait_for_actor(name): + async with trio.open_nursery() as trion: + try: + async with tractor.open_nursery() as n: + portals = {} + for i in range(3): + name = f'a{i}' + portals[name] = await n.run_in_actor(name, to_run) + + # wait on last actor to come up + async with tractor.wait_for_actor(name): + registry = await get_reg() + for uid in n._children: + assert uid in registry + + assert len(portals) + extra == len(registry) + + if with_streaming: + await trio.sleep(0.1) + + pts = list(portals.values()) + for p in pts[:-1]: + trion.start_soon(stream_from, p) + + # stream for 1 sec + trion.start_soon(cancel, use_signal, 1) + + last_p = pts[-1] + async for value in await last_p.result(): + print(value) + else: + await cancel(use_signal) + + finally: + with trio.CancelScope(shield=True): + await trio.sleep(0.5) + + # all subactors should have de-registered registry = await get_reg() - for uid in n._children: - assert uid in registry - - assert len(portals) + extra == len(registry) - - # trigger cancel - if use_signal: - if platform.system() == 'Windows': - pytest.skip("SIGINT not supported on windows") - os.kill(os.getpid(), signal.SIGINT) - else: - raise KeyboardInterrupt - finally: - # all subactors should have de-registered - await trio.sleep(0.5) - registry = await get_reg() - assert len(registry) == extra - assert actor.uid in registry + assert len(registry) == extra + assert actor.uid in registry @pytest.mark.parametrize('use_signal', [False, True]) +@pytest.mark.parametrize('with_streaming', [False, True]) def test_subactors_unregister_on_cancel( - start_method, - use_signal, - arb_addr -): - """Verify that cancelling a nursery results in all subactors - deregistering themselves with the arbiter. - """ - with pytest.raises(KeyboardInterrupt): - tractor.run(spawn_and_check_registry, arb_addr, use_signal) - - -@pytest.mark.parametrize('use_signal', [False, True]) -def test_subactors_unregister_on_cancel_remote_daemon( - daemon, start_method, use_signal, arb_addr, + with_streaming, ): """Verify that cancelling a nursery results in all subactors deregistering themselves with the arbiter. """ + with pytest.raises(KeyboardInterrupt): + tractor.run( + spawn_and_check_registry, + arb_addr, + use_signal, + False, + with_streaming, + arbiter_addr=arb_addr + ) + + +@pytest.mark.parametrize('use_signal', [False, True]) +@pytest.mark.parametrize('with_streaming', [False, True]) +def test_subactors_unregister_on_cancel_remote_daemon( + daemon, + start_method, + use_signal, + arb_addr, + with_streaming, +): + """Verify that cancelling a nursery results in all subactors + deregistering themselves with a **remote** (not in the local process + tree) arbiter. + """ with pytest.raises(KeyboardInterrupt): tractor.run( spawn_and_check_registry, arb_addr, use_signal, True, + with_streaming, # XXX: required to use remote daemon! arbiter_addr=arb_addr )