From 528012f35f2e51b180fd3286ded1f1ecdf13009c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Aug 2023 12:20:12 -0400 Subject: [PATCH] Add stale entry deleted from registrar test By spawning an actor task that immediately shuts down the transport server and then sleeps, verify that attempting to connect via the `._discovery.find_actor()` helper delivers `None` for the `Portal` value. Relates to #184 and #216 --- tests/test_discovery.py | 100 +++++++++++++++++++++++++++++++++------- 1 file changed, 83 insertions(+), 17 deletions(-) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index b76e3706..6050011e 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -1,7 +1,7 @@ -""" -Discovery subsys. +''' +Discovery subsystem via a "registrar" actor scenarios. -""" +''' import os import signal import platform @@ -163,7 +163,10 @@ async def unpack_reg( else: msg = await actor_or_portal.run_from_ns('self', 'get_registry') - return {tuple(key.split('.')): val for key, val in msg.items()} + return { + tuple(key.split('.')): val + for key, val in msg.items() + } async def spawn_and_check_registry( @@ -356,20 +359,24 @@ async def close_chans_before_nursery( try: get_reg = partial(unpack_reg, aportal) - async with tractor.open_nursery() as tn: - portal1 = await tn.start_actor( - name='consumer1', enable_modules=[__name__]) - portal2 = await tn.start_actor( - 'consumer2', enable_modules=[__name__]) + async with tractor.open_nursery() as an: + portal1 = await an.start_actor( + name='consumer1', + enable_modules=[__name__], + ) + portal2 = await an.start_actor( + 'consumer2', + enable_modules=[__name__], + ) - # TODO: compact this back as was in last commit once - # 3.9+, see https://github.com/goodboy/tractor/issues/207 - async with portal1.open_stream_from( - stream_forever - ) as agen1: - async with portal2.open_stream_from( + async with ( + portal1.open_stream_from( stream_forever - ) as agen2: + ) as agen1, + portal2.open_stream_from( + stream_forever + ) as agen2, + ): async with ( collapse_eg(), trio.open_nursery() as tn, @@ -392,6 +399,7 @@ async def close_chans_before_nursery( # also kill off channels cuz why not await agen1.aclose() await agen2.aclose() + finally: with trio.CancelScope(shield=True): await trio.sleep(1) @@ -427,7 +435,7 @@ def test_close_channel_explicit( @pytest.mark.parametrize('use_signal', [False, True]) -def test_close_channel_explicit_remote_arbiter( +def test_close_channel_explicit_remote_registrar( daemon: subprocess.Popen, start_method: str, use_signal: bool, @@ -448,3 +456,61 @@ def test_close_channel_explicit_remote_arbiter( remote_arbiter=True, ), ) + + +@tractor.context +async def kill_transport( + ctx: tractor.Context, +) -> None: + + await ctx.started() + actor: tractor.Actor = tractor.current_actor() + actor.ipc_server.cancel() + await trio.sleep_forever() + + + +# @pytest.mark.parametrize('use_signal', [False, True]) +def test_stale_entry_is_deleted( + debug_mode: bool, + daemon: subprocess.Popen, + start_method: str, + reg_addr: tuple, +): + ''' + Ensure that when a stale entry is detected in the registrar's + table that the `find_actor()` API takes care of deleting the + stale entry and not delivering a bad portal. + + ''' + async def main(): + + name: str = 'transport_fails_actor' + _reg_ptl: tractor.Portal + an: tractor.ActorNursery + async with ( + tractor.open_nursery( + debug_mode=debug_mode, + ) as an, + tractor.get_registry(reg_addr) as _reg_ptl, + ): + ptl: tractor.Portal = await an.start_actor( + name, + enable_modules=[__name__], + ) + async with ptl.open_context( + kill_transport, + ) as (first, ctx): + async with tractor.find_actor(name) as maybe_portal: + # because the transitive + # `._discovery.maybe_open_portal()` call should + # fail and implicitly call `.delete_sockaddr()` + assert maybe_portal is None + registry: dict = await unpack_reg(_reg_ptl) + assert ptl.chan.aid.uid not in registry + + # should fail since we knocked out the IPC tpt XD + await ptl.cancel_actor() + await an.cancel() + + trio.run(main)