forked from goodboy/tractor
1
0
Fork 0

Add stale entry deleted from registrar test

By spawning an actor task that immediately shuts down the transport
server and then sleeps, verify that attempting to connect via the
`._discovery.find_actor()` helper delivers `None` for the `Portal`
value.

Relates to #184 and #216
dereg_on_oserror
Tyler Goodlet 2023-08-28 12:20:12 -04:00
parent d83d991f21
commit 687852f368
2 changed files with 97 additions and 36 deletions

View File

@ -219,7 +219,8 @@ def daemon(
arb_addr: tuple[str, int], arb_addr: tuple[str, int],
): ):
''' '''
Run a daemon actor as a "remote arbiter". Run a daemon actor as a "remote registrar" and/or plain ol
separate actor (service) tree.
''' '''
if loglevel in ('trace', 'debug'): if loglevel in ('trace', 'debug'):

View File

@ -1,6 +1,7 @@
""" '''
Actor "discovery" testing Discovery subsystem via a "registrar" actor scenarios.
"""
'''
import os import os
import signal import signal
import platform import platform
@ -127,7 +128,10 @@ async def unpack_reg(actor_or_portal):
else: else:
msg = await actor_or_portal.run_from_ns('self', 'get_registry') msg = await actor_or_portal.run_from_ns('self', 'get_registry')
return {tuple(key.split('.')): val for key, val in msg.items()} return {
tuple(key.split('.')): val
for key, val in msg.items()
}
async def spawn_and_check_registry( async def spawn_and_check_registry(
@ -283,37 +287,41 @@ async def close_chans_before_nursery(
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
portal1 = await tn.start_actor( portal1 = await tn.start_actor(
name='consumer1', enable_modules=[__name__]) name='consumer1',
enable_modules=[__name__],
)
portal2 = await tn.start_actor( portal2 = await tn.start_actor(
'consumer2', enable_modules=[__name__]) 'consumer2',
enable_modules=[__name__],
)
# TODO: compact this back as was in last commit once async with (
# 3.9+, see https://github.com/goodboy/tractor/issues/207 portal1.open_stream_from(
async with portal1.open_stream_from(
stream_forever
) as agen1:
async with portal2.open_stream_from(
stream_forever stream_forever
) as agen2: ) as agen1,
async with trio.open_nursery() as n: portal2.open_stream_from(
n.start_soon(streamer, agen1) stream_forever
n.start_soon(cancel, use_signal, .5) ) as agen2,
try: ):
await streamer(agen2) async with trio.open_nursery() as n:
finally: n.start_soon(streamer, agen1)
# Kill the root nursery thus resulting in n.start_soon(cancel, use_signal, .5)
# normal arbiter channel ops to fail during try:
# teardown. It doesn't seem like this is await streamer(agen2)
# reliably triggered by an external SIGINT. finally:
# tractor.current_actor()._root_nursery.cancel_scope.cancel() # Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()
# XXX: THIS IS THE KEY THING that # XXX: THIS IS THE KEY THING that
# happens **before** exiting the # happens **before** exiting the
# actor nursery block # actor nursery block
# also kill off channels cuz why not # also kill off channels cuz why not
await agen1.aclose() await agen1.aclose()
await agen2.aclose() await agen2.aclose()
finally: finally:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await trio.sleep(1) await trio.sleep(1)
@ -331,10 +339,12 @@ def test_close_channel_explicit(
use_signal, use_signal,
arb_addr, arb_addr,
): ):
"""Verify that closing a stream explicitly and killing the actor's '''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also "root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter. results in subactor(s) deregistering from the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
trio.run( trio.run(
partial( partial(
@ -347,16 +357,18 @@ def test_close_channel_explicit(
@pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter( def test_close_channel_explicit_remote_registrar(
daemon, daemon,
start_method, start_method,
use_signal, use_signal,
arb_addr, arb_addr,
): ):
"""Verify that closing a stream explicitly and killing the actor's '''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also "root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter. results in subactor(s) deregistering from the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
trio.run( trio.run(
partial( partial(
@ -366,3 +378,51 @@ def test_close_channel_explicit_remote_arbiter(
remote_arbiter=True, remote_arbiter=True,
), ),
) )
@tractor.context
async def kill_transport(
ctx: tractor.Context,
) -> None:
await ctx.started()
actor: tractor.Actor = tractor.current_actor()
actor.cancel_server()
await trio.sleep_forever()
# @pytest.mark.parametrize('use_signal', [False, True])
def test_stale_entry_is_deleted(
daemon,
start_method,
arb_addr,
):
'''
Ensure that when a stale entry is detected in the registrar's table
that the `find_actor()` API takes care of deleting the stale entry
and not delivering a bad portal.
'''
async def main():
name: str = 'transport_fails_actor'
regport: tractor.Portal
tn: tractor.ActorNursery
async with (
tractor.open_nursery() as tn,
tractor.get_registrar(*arb_addr) as regport,
):
ptl: tractor.Portal = await tn.start_actor(
name,
enable_modules=[__name__],
)
async with ptl.open_context(
kill_transport,
) as (first, ctx):
async with tractor.find_actor(name) as maybe_portal:
assert maybe_portal is None
await ptl.cancel_actor()
trio.run(main)