Add stale entry deleted from registrar test
By spawning an actor task that immediately shuts down the transport server and then sleeps, verify that attempting to connect via the `._discovery.find_actor()` helper delivers `None` for the `Portal` value. Relates to #184 and #216dereg_on_oserror
parent
8b82808fcd
commit
0c2fb98d5e
|
@ -1,6 +1,7 @@
|
||||||
"""
|
'''
|
||||||
Actor "discovery" testing
|
Discovery subsystem via a "registrar" actor scenarios.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
import platform
|
import platform
|
||||||
|
@ -149,7 +150,10 @@ async def unpack_reg(actor_or_portal):
|
||||||
else:
|
else:
|
||||||
msg = await actor_or_portal.run_from_ns('self', 'get_registry')
|
msg = await actor_or_portal.run_from_ns('self', 'get_registry')
|
||||||
|
|
||||||
return {tuple(key.split('.')): val for key, val in msg.items()}
|
return {
|
||||||
|
tuple(key.split('.')): val
|
||||||
|
for key, val in msg.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
async def spawn_and_check_registry(
|
async def spawn_and_check_registry(
|
||||||
|
@ -325,20 +329,24 @@ async def close_chans_before_nursery(
|
||||||
try:
|
try:
|
||||||
get_reg = partial(unpack_reg, aportal)
|
get_reg = partial(unpack_reg, aportal)
|
||||||
|
|
||||||
async with tractor.open_nursery() as tn:
|
async with tractor.open_nursery() as an:
|
||||||
portal1 = await tn.start_actor(
|
portal1 = await an.start_actor(
|
||||||
name='consumer1', enable_modules=[__name__])
|
name='consumer1',
|
||||||
portal2 = await tn.start_actor(
|
enable_modules=[__name__],
|
||||||
'consumer2', enable_modules=[__name__])
|
)
|
||||||
|
portal2 = await an.start_actor(
|
||||||
|
'consumer2',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
|
||||||
# TODO: compact this back as was in last commit once
|
async with (
|
||||||
# 3.9+, see https://github.com/goodboy/tractor/issues/207
|
portal1.open_stream_from(
|
||||||
async with portal1.open_stream_from(
|
|
||||||
stream_forever
|
|
||||||
) as agen1:
|
|
||||||
async with portal2.open_stream_from(
|
|
||||||
stream_forever
|
stream_forever
|
||||||
) as agen2:
|
) as agen1,
|
||||||
|
portal2.open_stream_from(
|
||||||
|
stream_forever
|
||||||
|
) as agen2,
|
||||||
|
):
|
||||||
async with (
|
async with (
|
||||||
collapse_eg(),
|
collapse_eg(),
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
|
@ -361,6 +369,7 @@ async def close_chans_before_nursery(
|
||||||
# also kill off channels cuz why not
|
# also kill off channels cuz why not
|
||||||
await agen1.aclose()
|
await agen1.aclose()
|
||||||
await agen2.aclose()
|
await agen2.aclose()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await trio.sleep(1)
|
await trio.sleep(1)
|
||||||
|
@ -378,10 +387,12 @@ def test_close_channel_explicit(
|
||||||
use_signal,
|
use_signal,
|
||||||
reg_addr,
|
reg_addr,
|
||||||
):
|
):
|
||||||
"""Verify that closing a stream explicitly and killing the actor's
|
'''
|
||||||
|
Verify that closing a stream explicitly and killing the actor's
|
||||||
"root nursery" **before** the containing nursery tears down also
|
"root nursery" **before** the containing nursery tears down also
|
||||||
results in subactor(s) deregistering from the arbiter.
|
results in subactor(s) deregistering from the arbiter.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
with pytest.raises(KeyboardInterrupt):
|
with pytest.raises(KeyboardInterrupt):
|
||||||
trio.run(
|
trio.run(
|
||||||
partial(
|
partial(
|
||||||
|
@ -394,16 +405,18 @@ def test_close_channel_explicit(
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('use_signal', [False, True])
|
@pytest.mark.parametrize('use_signal', [False, True])
|
||||||
def test_close_channel_explicit_remote_arbiter(
|
def test_close_channel_explicit_remote_registrar(
|
||||||
daemon: subprocess.Popen,
|
daemon: subprocess.Popen,
|
||||||
start_method,
|
start_method,
|
||||||
use_signal,
|
use_signal,
|
||||||
reg_addr,
|
reg_addr,
|
||||||
):
|
):
|
||||||
"""Verify that closing a stream explicitly and killing the actor's
|
'''
|
||||||
|
Verify that closing a stream explicitly and killing the actor's
|
||||||
"root nursery" **before** the containing nursery tears down also
|
"root nursery" **before** the containing nursery tears down also
|
||||||
results in subactor(s) deregistering from the arbiter.
|
results in subactor(s) deregistering from the arbiter.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
with pytest.raises(KeyboardInterrupt):
|
with pytest.raises(KeyboardInterrupt):
|
||||||
trio.run(
|
trio.run(
|
||||||
partial(
|
partial(
|
||||||
|
@ -413,3 +426,61 @@ def test_close_channel_explicit_remote_arbiter(
|
||||||
remote_arbiter=True,
|
remote_arbiter=True,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def kill_transport(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
await ctx.started()
|
||||||
|
actor: tractor.Actor = tractor.current_actor()
|
||||||
|
actor.ipc_server.cancel()
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# @pytest.mark.parametrize('use_signal', [False, True])
|
||||||
|
def test_stale_entry_is_deleted(
|
||||||
|
debug_mode: bool,
|
||||||
|
daemon: subprocess.Popen,
|
||||||
|
start_method: str,
|
||||||
|
reg_addr: tuple,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Ensure that when a stale entry is detected in the registrar's
|
||||||
|
table that the `find_actor()` API takes care of deleting the
|
||||||
|
stale entry and not delivering a bad portal.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async def main():
|
||||||
|
|
||||||
|
name: str = 'transport_fails_actor'
|
||||||
|
_reg_ptl: tractor.Portal
|
||||||
|
an: tractor.ActorNursery
|
||||||
|
async with (
|
||||||
|
tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
) as an,
|
||||||
|
tractor.get_registry(reg_addr) as _reg_ptl,
|
||||||
|
):
|
||||||
|
ptl: tractor.Portal = await an.start_actor(
|
||||||
|
name,
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
async with ptl.open_context(
|
||||||
|
kill_transport,
|
||||||
|
) as (first, ctx):
|
||||||
|
async with tractor.find_actor(name) as maybe_portal:
|
||||||
|
# because the transitive
|
||||||
|
# `._discovery.maybe_open_portal()` call should
|
||||||
|
# fail and implicitly call `.delete_sockaddr()`
|
||||||
|
assert maybe_portal is None
|
||||||
|
registry: dict = await unpack_reg(_reg_ptl)
|
||||||
|
assert ptl.chan.aid.uid not in registry
|
||||||
|
|
||||||
|
# should fail since we knocked out the IPC tpt XD
|
||||||
|
await ptl.cancel_actor()
|
||||||
|
await an.cancel()
|
||||||
|
|
||||||
|
trio.run(main)
|
||||||
|
|
Loading…
Reference in New Issue