diff --git a/tests/conftest.py b/tests/conftest.py index 3363cf5..c14cd0d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -219,7 +219,8 @@ def daemon( arb_addr: tuple[str, int], ): ''' - Run a daemon actor as a "remote arbiter". + Run a daemon actor as a "remote registrar" and/or plain ol + separate actor (service) tree. ''' if loglevel in ('trace', 'debug'): diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 8ba4ebe..95cd985 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -1,6 +1,7 @@ -""" -Actor "discovery" testing -""" +''' +Discovery subsystem via a "registrar" actor scenarios. + +''' import os import signal import platform @@ -127,7 +128,10 @@ async def unpack_reg(actor_or_portal): else: msg = await actor_or_portal.run_from_ns('self', 'get_registry') - return {tuple(key.split('.')): val for key, val in msg.items()} + return { + tuple(key.split('.')): val + for key, val in msg.items() + } async def spawn_and_check_registry( @@ -283,37 +287,41 @@ async def close_chans_before_nursery( async with tractor.open_nursery() as tn: portal1 = await tn.start_actor( - name='consumer1', enable_modules=[__name__]) + name='consumer1', + enable_modules=[__name__], + ) portal2 = await tn.start_actor( - 'consumer2', enable_modules=[__name__]) + 'consumer2', + enable_modules=[__name__], + ) - # TODO: compact this back as was in last commit once - # 3.9+, see https://github.com/goodboy/tractor/issues/207 - async with portal1.open_stream_from( - stream_forever - ) as agen1: - async with portal2.open_stream_from( + async with ( + portal1.open_stream_from( stream_forever - ) as agen2: - async with trio.open_nursery() as n: - n.start_soon(streamer, agen1) - n.start_soon(cancel, use_signal, .5) - try: - await streamer(agen2) - finally: - # Kill the root nursery thus resulting in - # normal arbiter channel ops to fail during - # teardown. It doesn't seem like this is - # reliably triggered by an external SIGINT. - # tractor.current_actor()._root_nursery.cancel_scope.cancel() + ) as agen1, + portal2.open_stream_from( + stream_forever + ) as agen2, + ): + async with trio.open_nursery() as n: + n.start_soon(streamer, agen1) + n.start_soon(cancel, use_signal, .5) + try: + await streamer(agen2) + finally: + # Kill the root nursery thus resulting in + # normal arbiter channel ops to fail during + # teardown. It doesn't seem like this is + # reliably triggered by an external SIGINT. + # tractor.current_actor()._root_nursery.cancel_scope.cancel() - # XXX: THIS IS THE KEY THING that - # happens **before** exiting the - # actor nursery block + # XXX: THIS IS THE KEY THING that + # happens **before** exiting the + # actor nursery block - # also kill off channels cuz why not - await agen1.aclose() - await agen2.aclose() + # also kill off channels cuz why not + await agen1.aclose() + await agen2.aclose() finally: with trio.CancelScope(shield=True): await trio.sleep(1) @@ -331,10 +339,12 @@ def test_close_channel_explicit( use_signal, arb_addr, ): - """Verify that closing a stream explicitly and killing the actor's + ''' + Verify that closing a stream explicitly and killing the actor's "root nursery" **before** the containing nursery tears down also results in subactor(s) deregistering from the arbiter. - """ + + ''' with pytest.raises(KeyboardInterrupt): trio.run( partial( @@ -347,16 +357,18 @@ def test_close_channel_explicit( @pytest.mark.parametrize('use_signal', [False, True]) -def test_close_channel_explicit_remote_arbiter( +def test_close_channel_explicit_remote_registrar( daemon, start_method, use_signal, arb_addr, ): - """Verify that closing a stream explicitly and killing the actor's + ''' + Verify that closing a stream explicitly and killing the actor's "root nursery" **before** the containing nursery tears down also results in subactor(s) deregistering from the arbiter. - """ + + ''' with pytest.raises(KeyboardInterrupt): trio.run( partial( @@ -366,3 +378,51 @@ def test_close_channel_explicit_remote_arbiter( remote_arbiter=True, ), ) + + +@tractor.context +async def kill_transport( + ctx: tractor.Context, +) -> None: + + await ctx.started() + actor: tractor.Actor = tractor.current_actor() + actor.cancel_server() + await trio.sleep_forever() + + + +# @pytest.mark.parametrize('use_signal', [False, True]) +def test_stale_entry_is_deleted( + daemon, + start_method, + arb_addr, +): + ''' + Ensure that when a stale entry is detected in the registrar's table + that the `find_actor()` API takes care of deleting the stale entry + and not delivering a bad portal. + + ''' + async def main(): + + name: str = 'transport_fails_actor' + regport: tractor.Portal + tn: tractor.ActorNursery + async with ( + tractor.open_nursery() as tn, + tractor.get_registrar(*arb_addr) as regport, + ): + ptl: tractor.Portal = await tn.start_actor( + name, + enable_modules=[__name__], + ) + async with ptl.open_context( + kill_transport, + ) as (first, ctx): + async with tractor.find_actor(name) as maybe_portal: + assert maybe_portal is None + + await ptl.cancel_actor() + + trio.run(main)