Add stale entry deleted from registrar test

By spawning an actor task that immediately shuts down the transport
server and then sleeps, verify that attempting to connect via the
`._discovery.find_actor()` helper delivers `None` for the `Portal`
value.

Relates to #184 and #216
multicast_revertable_streams
Tyler Goodlet 2023-08-28 12:20:12 -04:00 committed by goodboy
parent 0dfa6f4a8a
commit 528012f35f
1 changed files with 83 additions and 17 deletions

View File

@ -1,7 +1,7 @@
"""
Discovery subsys.
'''
Discovery subsystem via a "registrar" actor scenarios.
"""
'''
import os
import signal
import platform
@ -163,7 +163,10 @@ async def unpack_reg(
else:
msg = await actor_or_portal.run_from_ns('self', 'get_registry')
return {tuple(key.split('.')): val for key, val in msg.items()}
return {
tuple(key.split('.')): val
for key, val in msg.items()
}
async def spawn_and_check_registry(
@ -356,20 +359,24 @@ async def close_chans_before_nursery(
try:
get_reg = partial(unpack_reg, aportal)
async with tractor.open_nursery() as tn:
portal1 = await tn.start_actor(
name='consumer1', enable_modules=[__name__])
portal2 = await tn.start_actor(
'consumer2', enable_modules=[__name__])
async with tractor.open_nursery() as an:
portal1 = await an.start_actor(
name='consumer1',
enable_modules=[__name__],
)
portal2 = await an.start_actor(
'consumer2',
enable_modules=[__name__],
)
# TODO: compact this back as was in last commit once
# 3.9+, see https://github.com/goodboy/tractor/issues/207
async with portal1.open_stream_from(
async with (
portal1.open_stream_from(
stream_forever
) as agen1:
async with portal2.open_stream_from(
) as agen1,
portal2.open_stream_from(
stream_forever
) as agen2:
) as agen2,
):
async with (
collapse_eg(),
trio.open_nursery() as tn,
@ -392,6 +399,7 @@ async def close_chans_before_nursery(
# also kill off channels cuz why not
await agen1.aclose()
await agen2.aclose()
finally:
with trio.CancelScope(shield=True):
await trio.sleep(1)
@ -427,7 +435,7 @@ def test_close_channel_explicit(
@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter(
def test_close_channel_explicit_remote_registrar(
daemon: subprocess.Popen,
start_method: str,
use_signal: bool,
@ -448,3 +456,61 @@ def test_close_channel_explicit_remote_arbiter(
remote_arbiter=True,
),
)
@tractor.context
async def kill_transport(
ctx: tractor.Context,
) -> None:
await ctx.started()
actor: tractor.Actor = tractor.current_actor()
actor.ipc_server.cancel()
await trio.sleep_forever()
# @pytest.mark.parametrize('use_signal', [False, True])
def test_stale_entry_is_deleted(
debug_mode: bool,
daemon: subprocess.Popen,
start_method: str,
reg_addr: tuple,
):
'''
Ensure that when a stale entry is detected in the registrar's
table that the `find_actor()` API takes care of deleting the
stale entry and not delivering a bad portal.
'''
async def main():
name: str = 'transport_fails_actor'
_reg_ptl: tractor.Portal
an: tractor.ActorNursery
async with (
tractor.open_nursery(
debug_mode=debug_mode,
) as an,
tractor.get_registry(reg_addr) as _reg_ptl,
):
ptl: tractor.Portal = await an.start_actor(
name,
enable_modules=[__name__],
)
async with ptl.open_context(
kill_transport,
) as (first, ctx):
async with tractor.find_actor(name) as maybe_portal:
# because the transitive
# `._discovery.maybe_open_portal()` call should
# fail and implicitly call `.delete_sockaddr()`
assert maybe_portal is None
registry: dict = await unpack_reg(_reg_ptl)
assert ptl.chan.aid.uid not in registry
# should fail since we knocked out the IPC tpt XD
await ptl.cancel_actor()
await an.cancel()
trio.run(main)