diff --git a/tests/test_discovery.py b/tests/test_discovery.py index d182f4c..f180b60 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -199,7 +199,7 @@ def test_subactors_unregister_on_cancel( spawn_and_check_registry, arb_addr, use_signal, - False, + False, # remote arbiter with_streaming, arbiter_addr=arb_addr ) @@ -223,15 +223,94 @@ def test_subactors_unregister_on_cancel_remote_daemon( spawn_and_check_registry, arb_addr, use_signal, - True, + True, # remote arbiter with_streaming, # XXX: required to use remote daemon! arbiter_addr=arb_addr ) +async def streamer(agen): + async for item in agen: + print(item) + + +async def close_chans_before_nursery( + arb_addr: tuple, + use_signal: bool, + remote_arbiter: bool = False, +) -> None: + + # logic for how many actors should still be + # in the registry at teardown. + if remote_arbiter: + entries_at_end = 2 + else: + entries_at_end = 1 + + async with tractor.get_arbiter(*arb_addr) as aportal: + try: + get_reg = partial(aportal.run, 'self', 'get_registry') + + async with tractor.open_nursery() as tn: + portal1 = await tn.run_in_actor('consumer1', stream_forever) + agen1 = await portal1.result() + + portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__]) + agen2 = await portal2.run(__name__, 'stream_forever') + + async with trio.open_nursery() as n: + n.start_soon(streamer, agen1) + n.start_soon(cancel, use_signal, .5) + try: + await streamer(agen2) + finally: + # Kill the root nursery thus resulting in + # normal arbiter channel ops to fail during + # teardown. It doesn't seem like this is + # reliably triggered by an external SIGINT. + # tractor.current_actor()._root_nursery.cancel_scope.cancel() + + # XXX: THIS IS THE KEY THING that happens + # **before** exiting the actor nursery block + + # also kill off channels cuz why not + await agen1.aclose() + await agen2.aclose() + finally: + with trio.CancelScope(shield=True): + await trio.sleep(.5) + + # all subactors should have de-registered + registry = await get_reg() + assert portal1.channel.uid not in registry + assert portal2.channel.uid not in registry + assert len(registry) == entries_at_end + + @pytest.mark.parametrize('use_signal', [False, True]) def test_close_channel_explicit( + start_method, + use_signal, + arb_addr, +): + """Verify that closing a stream explicitly and killing the actor's + "root nursery" **before** the containing nursery tears down also + results in subactor(s) deregistering from the arbiter. + """ + with pytest.raises(KeyboardInterrupt): + tractor.run( + close_chans_before_nursery, + arb_addr, + use_signal, + False, + # XXX: required to use remote daemon! + arbiter_addr=arb_addr + ) + + +@pytest.mark.parametrize('use_signal', [False, True]) +def test_close_channel_explicit_remote_arbiter( daemon, start_method, use_signal, @@ -241,53 +320,12 @@ def test_close_channel_explicit( "root nursery" **before** the containing nursery tears down also results in subactor(s) deregistering from the arbiter. """ - async def streamer(agen): - async for item in agen: - print(item) - - async def main(): - async with tractor.get_arbiter(*arb_addr) as aportal: - try: - get_reg = partial(aportal.run, 'self', 'get_registry') - async with tractor.open_nursery() as tn: - portal1 = await tn.run_in_actor('consumer1', stream_forever) - agen1 = await portal1.result() - - portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__]) - agen2 = await portal2.run(__name__, 'stream_forever') - - async with trio.open_nursery() as n: - n.start_soon(streamer, agen1) - n.start_soon(cancel, use_signal, .5) - try: - await streamer(agen2) - finally: - # XXX: THIS IS THE KEY THING that happens - # **before** exiting the actor nursery block - - # Kill the root nursery thus resulting in - # normal arbiter channel ops to fail during - # teardown. It doesn't seem like this is - # reliably triggered by an external SIGINT. - tractor.current_actor()._root_nursery.cancel_scope.cancel() - - # also kill off channels cuz why not - await agen1.aclose() - await agen2.aclose() - finally: - with trio.CancelScope(shield=True): - await trio.sleep(.5) - - # all subactors should have de-registered - registry = await get_reg() - assert portal1.channel.uid not in registry - assert portal2.channel.uid not in registry - assert len(registry) == 2 - - with pytest.raises(KeyboardInterrupt): tractor.run( - main, + close_chans_before_nursery, + arb_addr, + use_signal, + True, # XXX: required to use remote daemon! arbiter_addr=arb_addr )