From 0c2fb98d5eb9247cae0db0f7b6800e645c717a60 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Aug 2023 12:20:12 -0400 Subject: [PATCH] Add stale entry deleted from registrar test By spawning an actor task that immediately shuts down the transport server and then sleeps, verify that attempting to connect via the `._discovery.find_actor()` helper delivers `None` for the `Portal` value. Relates to #184 and #216 --- tests/test_discovery.py | 113 ++++++++++++++++++++++++++++++++-------- 1 file changed, 92 insertions(+), 21 deletions(-) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 453b1aa3..7154f1d1 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -1,6 +1,7 @@ -""" -Actor "discovery" testing -""" +''' +Discovery subsystem via a "registrar" actor scenarios. + +''' import os import signal import platform @@ -149,7 +150,10 @@ async def unpack_reg(actor_or_portal): else: msg = await actor_or_portal.run_from_ns('self', 'get_registry') - return {tuple(key.split('.')): val for key, val in msg.items()} + return { + tuple(key.split('.')): val + for key, val in msg.items() + } async def spawn_and_check_registry( @@ -325,20 +329,24 @@ async def close_chans_before_nursery( try: get_reg = partial(unpack_reg, aportal) - async with tractor.open_nursery() as tn: - portal1 = await tn.start_actor( - name='consumer1', enable_modules=[__name__]) - portal2 = await tn.start_actor( - 'consumer2', enable_modules=[__name__]) + async with tractor.open_nursery() as an: + portal1 = await an.start_actor( + name='consumer1', + enable_modules=[__name__], + ) + portal2 = await an.start_actor( + 'consumer2', + enable_modules=[__name__], + ) - # TODO: compact this back as was in last commit once - # 3.9+, see https://github.com/goodboy/tractor/issues/207 - async with portal1.open_stream_from( - stream_forever - ) as agen1: - async with portal2.open_stream_from( + async with ( + portal1.open_stream_from( stream_forever - ) as agen2: + ) as agen1, + portal2.open_stream_from( + stream_forever + ) as agen2, + ): async with ( collapse_eg(), trio.open_nursery() as tn, @@ -361,6 +369,7 @@ async def close_chans_before_nursery( # also kill off channels cuz why not await agen1.aclose() await agen2.aclose() + finally: with trio.CancelScope(shield=True): await trio.sleep(1) @@ -378,10 +387,12 @@ def test_close_channel_explicit( use_signal, reg_addr, ): - """Verify that closing a stream explicitly and killing the actor's + ''' + Verify that closing a stream explicitly and killing the actor's "root nursery" **before** the containing nursery tears down also results in subactor(s) deregistering from the arbiter. - """ + + ''' with pytest.raises(KeyboardInterrupt): trio.run( partial( @@ -394,16 +405,18 @@ def test_close_channel_explicit( @pytest.mark.parametrize('use_signal', [False, True]) -def test_close_channel_explicit_remote_arbiter( +def test_close_channel_explicit_remote_registrar( daemon: subprocess.Popen, start_method, use_signal, reg_addr, ): - """Verify that closing a stream explicitly and killing the actor's + ''' + Verify that closing a stream explicitly and killing the actor's "root nursery" **before** the containing nursery tears down also results in subactor(s) deregistering from the arbiter. - """ + + ''' with pytest.raises(KeyboardInterrupt): trio.run( partial( @@ -413,3 +426,61 @@ def test_close_channel_explicit_remote_arbiter( remote_arbiter=True, ), ) + + +@tractor.context +async def kill_transport( + ctx: tractor.Context, +) -> None: + + await ctx.started() + actor: tractor.Actor = tractor.current_actor() + actor.ipc_server.cancel() + await trio.sleep_forever() + + + +# @pytest.mark.parametrize('use_signal', [False, True]) +def test_stale_entry_is_deleted( + debug_mode: bool, + daemon: subprocess.Popen, + start_method: str, + reg_addr: tuple, +): + ''' + Ensure that when a stale entry is detected in the registrar's + table that the `find_actor()` API takes care of deleting the + stale entry and not delivering a bad portal. + + ''' + async def main(): + + name: str = 'transport_fails_actor' + _reg_ptl: tractor.Portal + an: tractor.ActorNursery + async with ( + tractor.open_nursery( + debug_mode=debug_mode, + ) as an, + tractor.get_registry(reg_addr) as _reg_ptl, + ): + ptl: tractor.Portal = await an.start_actor( + name, + enable_modules=[__name__], + ) + async with ptl.open_context( + kill_transport, + ) as (first, ctx): + async with tractor.find_actor(name) as maybe_portal: + # because the transitive + # `._discovery.maybe_open_portal()` call should + # fail and implicitly call `.delete_sockaddr()` + assert maybe_portal is None + registry: dict = await unpack_reg(_reg_ptl) + assert ptl.chan.aid.uid not in registry + + # should fail since we knocked out the IPC tpt XD + await ptl.cancel_actor() + await an.cancel() + + trio.run(main)