diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 6370fe2..96a3908 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -228,3 +228,59 @@ def test_subactors_unregister_on_cancel_remote_daemon( # XXX: required to use remote daemon! arbiter_addr=arb_addr ) + + +@pytest.mark.parametrize('use_signal', [False, True]) +def test_close_channel_explicit( + daemon, + start_method, + use_signal, + arb_addr, +): + """Verify that closing a stream explicitly **before** the containing + nursery tears down also results in subactor(s) deregistering from the + arbiter. + """ + async def streamer(agen): + async for item in agen: + print(item) + + async def main(): + async with tractor.get_arbiter(*arb_addr) as aportal: + try: + get_reg = partial(aportal.run, 'self', 'get_registry') + async with tractor.open_nursery() as tn: + portal1 = await tn.run_in_actor('consumer1', stream_forever) + agen1 = await portal1.result() + + portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__]) + agen2 = await portal2.run(__name__, 'stream_forever') + + async with trio.open_nursery() as n: + n.start_soon(streamer, agen1) + n.start_soon(cancel, use_signal, .5) + try: + await streamer(agen2) + finally: + # XXX: THIS IS THE KEY THING that happens + # **before** exiting the actor nursery block + # (i think?). + await agen1.aclose() + await agen2.aclose() + finally: + with trio.CancelScope(shield=True): + await trio.sleep(.5) + + # all subactors should have de-registered + registry = await get_reg() + assert portal1.channel.uid not in registry + assert portal2.channel.uid not in registry + assert len(registry) == 2 + + + with pytest.raises(KeyboardInterrupt): + tractor.run( + main, + # XXX: required to use remote daemon! + arbiter_addr=arb_addr + )