forked from goodboy/tractor
1
0
Fork 0

Actually reproduce the de-registration problem

This truly reproduces #141. It turns out the problem only occurs when
we're cancelled in the middle of consuming "infinite streams".
Good news is this tests a lot of edge cases :)
ensure_deregister
Tyler Goodlet 2020-08-03 18:24:28 -04:00
parent 699bfd1857
commit a5279f80a7
1 changed files with 96 additions and 43 deletions

View File

@ -5,6 +5,7 @@ import os
import signal import signal
import platform import platform
from functools import partial from functools import partial
import itertools
import pytest import pytest
import tractor import tractor
@ -87,10 +88,35 @@ async def test_trynamic_trio(func, start_method):
print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...")
async def stream_forever():
for i in itertools.count():
yield i
await trio.sleep(0.01)
async def cancel(use_signal, delay=0):
# hold on there sally
await trio.sleep(delay)
# trigger cancel
if use_signal:
if platform.system() == 'Windows':
pytest.skip("SIGINT not supported on windows")
os.kill(os.getpid(), signal.SIGINT)
else:
raise KeyboardInterrupt
async def stream_from(portal):
async for value in await portal.result():
print(value)
async def spawn_and_check_registry( async def spawn_and_check_registry(
arb_addr: tuple, arb_addr: tuple,
use_signal: bool, use_signal: bool,
remote_arbiter: bool = False, remote_arbiter: bool = False,
with_streaming: bool = False,
) -> None: ) -> None:
actor = tractor.current_actor() actor = tractor.current_actor()
@ -101,9 +127,7 @@ async def spawn_and_check_registry(
if actor.is_arbiter: if actor.is_arbiter:
async def get_reg(): async def get_reg():
return actor._registry return actor._registry
extra = 1 # arbiter is local root actor extra = 1 # arbiter is local root actor
else: else:
get_reg = partial(portal.run, 'self', 'get_registry') get_reg = partial(portal.run, 'self', 'get_registry')
extra = 2 # local root actor + remote arbiter extra = 2 # local root actor + remote arbiter
@ -112,13 +136,18 @@ async def spawn_and_check_registry(
registry = await get_reg() registry = await get_reg()
assert actor.uid in registry assert actor.uid in registry
if with_streaming:
to_run = stream_forever
else:
to_run = trio.sleep_forever
async with trio.open_nursery() as trion:
try: try:
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portals = {} portals = {}
for i in range(3): for i in range(3):
name = f'a{i}' name = f'a{i}'
portals[name] = await n.run_in_actor( portals[name] = await n.run_in_actor(name, to_run)
name, trio.sleep_forever)
# wait on last actor to come up # wait on last actor to come up
async with tractor.wait_for_actor(name): async with tractor.wait_for_actor(name):
@ -128,50 +157,74 @@ async def spawn_and_check_registry(
assert len(portals) + extra == len(registry) assert len(portals) + extra == len(registry)
# trigger cancel if with_streaming:
if use_signal: await trio.sleep(0.1)
if platform.system() == 'Windows':
pytest.skip("SIGINT not supported on windows") pts = list(portals.values())
os.kill(os.getpid(), signal.SIGINT) for p in pts[:-1]:
trion.start_soon(stream_from, p)
# stream for 1 sec
trion.start_soon(cancel, use_signal, 1)
last_p = pts[-1]
async for value in await last_p.result():
print(value)
else: else:
raise KeyboardInterrupt await cancel(use_signal)
finally: finally:
# all subactors should have de-registered with trio.CancelScope(shield=True):
await trio.sleep(0.5) await trio.sleep(0.5)
# all subactors should have de-registered
registry = await get_reg() registry = await get_reg()
assert len(registry) == extra assert len(registry) == extra
assert actor.uid in registry assert actor.uid in registry
@pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel( def test_subactors_unregister_on_cancel(
start_method,
use_signal,
arb_addr
):
"""Verify that cancelling a nursery results in all subactors
deregistering themselves with the arbiter.
"""
with pytest.raises(KeyboardInterrupt):
tractor.run(spawn_and_check_registry, arb_addr, use_signal)
@pytest.mark.parametrize('use_signal', [False, True])
def test_subactors_unregister_on_cancel_remote_daemon(
daemon,
start_method, start_method,
use_signal, use_signal,
arb_addr, arb_addr,
with_streaming,
): ):
"""Verify that cancelling a nursery results in all subactors """Verify that cancelling a nursery results in all subactors
deregistering themselves with the arbiter. deregistering themselves with the arbiter.
""" """
with pytest.raises(KeyboardInterrupt):
tractor.run(
spawn_and_check_registry,
arb_addr,
use_signal,
False,
with_streaming,
arbiter_addr=arb_addr
)
@pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel_remote_daemon(
daemon,
start_method,
use_signal,
arb_addr,
with_streaming,
):
"""Verify that cancelling a nursery results in all subactors
deregistering themselves with a **remote** (not in the local process
tree) arbiter.
"""
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
tractor.run( tractor.run(
spawn_and_check_registry, spawn_and_check_registry,
arb_addr, arb_addr,
use_signal, use_signal,
True, True,
with_streaming,
# XXX: required to use remote daemon! # XXX: required to use remote daemon!
arbiter_addr=arb_addr arbiter_addr=arb_addr
) )