forked from goodboy/tractor
Add test for dereg failure on manual stream close
There was code from the last de-registration fix PR that I had commented (to do with shielding arbiter dereg steps in `Actor._async_main()`) because the block didn't seem to make a difference under infinite streaming tests. Turns out it **for sure** is needed under certain conditions (likely if the actor's root nursery is cancelled prior to actor nursery exit). This was an attempt to simulate the failure mode if you manually close the stream **before** cancelling the containing **actor**. More tests to come I guess.dereg_on_channel_aclose
parent
8da45eedf4
commit
d2d8860dad
|
@ -228,3 +228,59 @@ def test_subactors_unregister_on_cancel_remote_daemon(
|
||||||
# XXX: required to use remote daemon!
|
# XXX: required to use remote daemon!
|
||||||
arbiter_addr=arb_addr
|
arbiter_addr=arb_addr
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('use_signal', [False, True])
|
||||||
|
def test_close_channel_explicit(
|
||||||
|
daemon,
|
||||||
|
start_method,
|
||||||
|
use_signal,
|
||||||
|
arb_addr,
|
||||||
|
):
|
||||||
|
"""Verify that closing a stream explicitly **before** the containing
|
||||||
|
nursery tears down also results in subactor(s) deregistering from the
|
||||||
|
arbiter.
|
||||||
|
"""
|
||||||
|
async def streamer(agen):
|
||||||
|
async for item in agen:
|
||||||
|
print(item)
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with tractor.get_arbiter(*arb_addr) as aportal:
|
||||||
|
try:
|
||||||
|
get_reg = partial(aportal.run, 'self', 'get_registry')
|
||||||
|
async with tractor.open_nursery() as tn:
|
||||||
|
portal1 = await tn.run_in_actor('consumer1', stream_forever)
|
||||||
|
agen1 = await portal1.result()
|
||||||
|
|
||||||
|
portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__])
|
||||||
|
agen2 = await portal2.run(__name__, 'stream_forever')
|
||||||
|
|
||||||
|
async with trio.open_nursery() as n:
|
||||||
|
n.start_soon(streamer, agen1)
|
||||||
|
n.start_soon(cancel, use_signal, .5)
|
||||||
|
try:
|
||||||
|
await streamer(agen2)
|
||||||
|
finally:
|
||||||
|
# XXX: THIS IS THE KEY THING that happens
|
||||||
|
# **before** exiting the actor nursery block
|
||||||
|
# (i think?).
|
||||||
|
await agen1.aclose()
|
||||||
|
await agen2.aclose()
|
||||||
|
finally:
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await trio.sleep(.5)
|
||||||
|
|
||||||
|
# all subactors should have de-registered
|
||||||
|
registry = await get_reg()
|
||||||
|
assert portal1.channel.uid not in registry
|
||||||
|
assert portal2.channel.uid not in registry
|
||||||
|
assert len(registry) == 2
|
||||||
|
|
||||||
|
|
||||||
|
with pytest.raises(KeyboardInterrupt):
|
||||||
|
tractor.run(
|
||||||
|
main,
|
||||||
|
# XXX: required to use remote daemon!
|
||||||
|
arbiter_addr=arb_addr
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in New Issue