From d2d8860dad779d639f7ea35b3555f92c548e9871 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 6 Aug 2020 23:51:23 -0400 Subject: [PATCH] Add test for dereg failure on manual stream close There was code from the last de-registration fix PR that I had commented (to do with shielding arbiter dereg steps in `Actor._async_main()`) because the block didn't seem to make a difference under infinite streaming tests. Turns out it **for sure** is needed under certain conditions (likely if the actor's root nursery is cancelled prior to actor nursery exit). This was an attempt to simulate the failure mode if you manually close the stream **before** cancelling the containing **actor**. More tests to come I guess. --- tests/test_discovery.py | 56 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 6370fe2..96a3908 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -228,3 +228,59 @@ def test_subactors_unregister_on_cancel_remote_daemon( # XXX: required to use remote daemon! arbiter_addr=arb_addr ) + + +@pytest.mark.parametrize('use_signal', [False, True]) +def test_close_channel_explicit( + daemon, + start_method, + use_signal, + arb_addr, +): + """Verify that closing a stream explicitly **before** the containing + nursery tears down also results in subactor(s) deregistering from the + arbiter. + """ + async def streamer(agen): + async for item in agen: + print(item) + + async def main(): + async with tractor.get_arbiter(*arb_addr) as aportal: + try: + get_reg = partial(aportal.run, 'self', 'get_registry') + async with tractor.open_nursery() as tn: + portal1 = await tn.run_in_actor('consumer1', stream_forever) + agen1 = await portal1.result() + + portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__]) + agen2 = await portal2.run(__name__, 'stream_forever') + + async with trio.open_nursery() as n: + n.start_soon(streamer, agen1) + n.start_soon(cancel, use_signal, .5) + try: + await streamer(agen2) + finally: + # XXX: THIS IS THE KEY THING that happens + # **before** exiting the actor nursery block + # (i think?). + await agen1.aclose() + await agen2.aclose() + finally: + with trio.CancelScope(shield=True): + await trio.sleep(.5) + + # all subactors should have de-registered + registry = await get_reg() + assert portal1.channel.uid not in registry + assert portal2.channel.uid not in registry + assert len(registry) == 2 + + + with pytest.raises(KeyboardInterrupt): + tractor.run( + main, + # XXX: required to use remote daemon! + arbiter_addr=arb_addr + )