forked from goodboy/tractor
Add close channel test with remote arbiter
parent
c821690834
commit
acd5b80f4c
|
@ -199,7 +199,7 @@ def test_subactors_unregister_on_cancel(
|
||||||
spawn_and_check_registry,
|
spawn_and_check_registry,
|
||||||
arb_addr,
|
arb_addr,
|
||||||
use_signal,
|
use_signal,
|
||||||
False,
|
False, # remote arbiter
|
||||||
with_streaming,
|
with_streaming,
|
||||||
arbiter_addr=arb_addr
|
arbiter_addr=arb_addr
|
||||||
)
|
)
|
||||||
|
@ -223,15 +223,94 @@ def test_subactors_unregister_on_cancel_remote_daemon(
|
||||||
spawn_and_check_registry,
|
spawn_and_check_registry,
|
||||||
arb_addr,
|
arb_addr,
|
||||||
use_signal,
|
use_signal,
|
||||||
True,
|
True, # remote arbiter
|
||||||
with_streaming,
|
with_streaming,
|
||||||
# XXX: required to use remote daemon!
|
# XXX: required to use remote daemon!
|
||||||
arbiter_addr=arb_addr
|
arbiter_addr=arb_addr
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def streamer(agen):
|
||||||
|
async for item in agen:
|
||||||
|
print(item)
|
||||||
|
|
||||||
|
|
||||||
|
async def close_chans_before_nursery(
|
||||||
|
arb_addr: tuple,
|
||||||
|
use_signal: bool,
|
||||||
|
remote_arbiter: bool = False,
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
# logic for how many actors should still be
|
||||||
|
# in the registry at teardown.
|
||||||
|
if remote_arbiter:
|
||||||
|
entries_at_end = 2
|
||||||
|
else:
|
||||||
|
entries_at_end = 1
|
||||||
|
|
||||||
|
async with tractor.get_arbiter(*arb_addr) as aportal:
|
||||||
|
try:
|
||||||
|
get_reg = partial(aportal.run, 'self', 'get_registry')
|
||||||
|
|
||||||
|
async with tractor.open_nursery() as tn:
|
||||||
|
portal1 = await tn.run_in_actor('consumer1', stream_forever)
|
||||||
|
agen1 = await portal1.result()
|
||||||
|
|
||||||
|
portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__])
|
||||||
|
agen2 = await portal2.run(__name__, 'stream_forever')
|
||||||
|
|
||||||
|
async with trio.open_nursery() as n:
|
||||||
|
n.start_soon(streamer, agen1)
|
||||||
|
n.start_soon(cancel, use_signal, .5)
|
||||||
|
try:
|
||||||
|
await streamer(agen2)
|
||||||
|
finally:
|
||||||
|
# Kill the root nursery thus resulting in
|
||||||
|
# normal arbiter channel ops to fail during
|
||||||
|
# teardown. It doesn't seem like this is
|
||||||
|
# reliably triggered by an external SIGINT.
|
||||||
|
# tractor.current_actor()._root_nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
|
# XXX: THIS IS THE KEY THING that happens
|
||||||
|
# **before** exiting the actor nursery block
|
||||||
|
|
||||||
|
# also kill off channels cuz why not
|
||||||
|
await agen1.aclose()
|
||||||
|
await agen2.aclose()
|
||||||
|
finally:
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await trio.sleep(.5)
|
||||||
|
|
||||||
|
# all subactors should have de-registered
|
||||||
|
registry = await get_reg()
|
||||||
|
assert portal1.channel.uid not in registry
|
||||||
|
assert portal2.channel.uid not in registry
|
||||||
|
assert len(registry) == entries_at_end
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('use_signal', [False, True])
|
@pytest.mark.parametrize('use_signal', [False, True])
|
||||||
def test_close_channel_explicit(
|
def test_close_channel_explicit(
|
||||||
|
start_method,
|
||||||
|
use_signal,
|
||||||
|
arb_addr,
|
||||||
|
):
|
||||||
|
"""Verify that closing a stream explicitly and killing the actor's
|
||||||
|
"root nursery" **before** the containing nursery tears down also
|
||||||
|
results in subactor(s) deregistering from the arbiter.
|
||||||
|
"""
|
||||||
|
with pytest.raises(KeyboardInterrupt):
|
||||||
|
tractor.run(
|
||||||
|
close_chans_before_nursery,
|
||||||
|
arb_addr,
|
||||||
|
use_signal,
|
||||||
|
False,
|
||||||
|
# XXX: required to use remote daemon!
|
||||||
|
arbiter_addr=arb_addr
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('use_signal', [False, True])
|
||||||
|
def test_close_channel_explicit_remote_arbiter(
|
||||||
daemon,
|
daemon,
|
||||||
start_method,
|
start_method,
|
||||||
use_signal,
|
use_signal,
|
||||||
|
@ -241,53 +320,12 @@ def test_close_channel_explicit(
|
||||||
"root nursery" **before** the containing nursery tears down also
|
"root nursery" **before** the containing nursery tears down also
|
||||||
results in subactor(s) deregistering from the arbiter.
|
results in subactor(s) deregistering from the arbiter.
|
||||||
"""
|
"""
|
||||||
async def streamer(agen):
|
|
||||||
async for item in agen:
|
|
||||||
print(item)
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
async with tractor.get_arbiter(*arb_addr) as aportal:
|
|
||||||
try:
|
|
||||||
get_reg = partial(aportal.run, 'self', 'get_registry')
|
|
||||||
async with tractor.open_nursery() as tn:
|
|
||||||
portal1 = await tn.run_in_actor('consumer1', stream_forever)
|
|
||||||
agen1 = await portal1.result()
|
|
||||||
|
|
||||||
portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__])
|
|
||||||
agen2 = await portal2.run(__name__, 'stream_forever')
|
|
||||||
|
|
||||||
async with trio.open_nursery() as n:
|
|
||||||
n.start_soon(streamer, agen1)
|
|
||||||
n.start_soon(cancel, use_signal, .5)
|
|
||||||
try:
|
|
||||||
await streamer(agen2)
|
|
||||||
finally:
|
|
||||||
# XXX: THIS IS THE KEY THING that happens
|
|
||||||
# **before** exiting the actor nursery block
|
|
||||||
|
|
||||||
# Kill the root nursery thus resulting in
|
|
||||||
# normal arbiter channel ops to fail during
|
|
||||||
# teardown. It doesn't seem like this is
|
|
||||||
# reliably triggered by an external SIGINT.
|
|
||||||
tractor.current_actor()._root_nursery.cancel_scope.cancel()
|
|
||||||
|
|
||||||
# also kill off channels cuz why not
|
|
||||||
await agen1.aclose()
|
|
||||||
await agen2.aclose()
|
|
||||||
finally:
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
await trio.sleep(.5)
|
|
||||||
|
|
||||||
# all subactors should have de-registered
|
|
||||||
registry = await get_reg()
|
|
||||||
assert portal1.channel.uid not in registry
|
|
||||||
assert portal2.channel.uid not in registry
|
|
||||||
assert len(registry) == 2
|
|
||||||
|
|
||||||
|
|
||||||
with pytest.raises(KeyboardInterrupt):
|
with pytest.raises(KeyboardInterrupt):
|
||||||
tractor.run(
|
tractor.run(
|
||||||
main,
|
close_chans_before_nursery,
|
||||||
|
arb_addr,
|
||||||
|
use_signal,
|
||||||
|
True,
|
||||||
# XXX: required to use remote daemon!
|
# XXX: required to use remote daemon!
|
||||||
arbiter_addr=arb_addr
|
arbiter_addr=arb_addr
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue