Compare commits

..

7 Commits

Author SHA1 Message Date
Tyler Goodlet c9fda3ff2c Rename `.delete_sockaddr()` -> `.delete_addr()` 2025-09-30 01:09:16 -04:00
Tyler Goodlet 11acdf8625 Always no-raise try-to-pop registry addrs 2025-09-29 23:14:25 -04:00
Tyler Goodlet 0c2fb98d5e Add stale entry deleted from registrar test
By spawning an actor task that immediately shuts down the transport
server and then sleeps, verify that attempting to connect via the
`._discovery.find_actor()` helper delivers `None` for the `Portal`
value.

Relates to #184 and #216
2025-09-29 23:10:36 -04:00
Tyler Goodlet 8b82808fcd Don't unwrap an unwrapped addr, just warn on delete XD 2025-09-29 23:10:27 -04:00
Tyler Goodlet c93fbcc9f7 Ensure `._registry` values are hashable, since `bidict`! 2025-09-29 23:09:50 -04:00
Tyler Goodlet c089c8d0d6 Handle stale registrar entries; detect and delete
In cases where an actor's transport server task (by default handling new
TCP connections) terminates early but does not de-register from the
pertaining registry (aka the registrar) actor's address table, the
trying-to-connect client actor will get a connection error on that
address. In the case where the client handles a (local) `OSError`
exception (meaning the target actor address is likely being contacted
over `localhost`), make a further call to the registrar to delete the
stale entry and `yield None`, gracefully indicating to calling code that
no `Portal` can be delivered to the target address.

This issue was originally discovered in `piker` where the `emsd`
(clearing engine) actor would sometimes crash on rapid client
re-connects and then leave a `pikerd` stale entry. With this fix, new
clients will attempt to connect via an endpoint which will re-spawn the
`emsd` when a `None` portal is delivered (via `maybe_spawn_em()`).
2025-09-29 21:56:57 -04:00
Tyler Goodlet 9f5b112d19 Add `Arbiter.delete_sockaddr()` to remove addrs
Since stale addrs can be leaked when the actor transport server task
crashes but doesn't (successfully) unregister from the registrar, we
need a remote way to remove such entries; hence this new (registrar)
method.

To implement this make use of the `bidict` lib for the `._registry`
table thus making it super simple to do reverse uuid lookups from an
input socket-address.
2025-09-29 20:34:25 -04:00
3 changed files with 157 additions and 40 deletions

View File

@ -1,6 +1,7 @@
"""
Actor "discovery" testing
"""
'''
Discovery subsystem via a "registrar" actor scenarios.
'''
import os
import signal
import platform
@ -149,7 +150,10 @@ async def unpack_reg(actor_or_portal):
else:
msg = await actor_or_portal.run_from_ns('self', 'get_registry')
return {tuple(key.split('.')): val for key, val in msg.items()}
return {
tuple(key.split('.')): val
for key, val in msg.items()
}
async def spawn_and_check_registry(
@ -325,20 +329,24 @@ async def close_chans_before_nursery(
try:
get_reg = partial(unpack_reg, aportal)
async with tractor.open_nursery() as tn:
portal1 = await tn.start_actor(
name='consumer1', enable_modules=[__name__])
portal2 = await tn.start_actor(
'consumer2', enable_modules=[__name__])
async with tractor.open_nursery() as an:
portal1 = await an.start_actor(
name='consumer1',
enable_modules=[__name__],
)
portal2 = await an.start_actor(
'consumer2',
enable_modules=[__name__],
)
# TODO: compact this back as was in last commit once
# 3.9+, see https://github.com/goodboy/tractor/issues/207
async with portal1.open_stream_from(
stream_forever
) as agen1:
async with portal2.open_stream_from(
async with (
portal1.open_stream_from(
stream_forever
) as agen2:
) as agen1,
portal2.open_stream_from(
stream_forever
) as agen2,
):
async with (
collapse_eg(),
trio.open_nursery() as tn,
@ -361,6 +369,7 @@ async def close_chans_before_nursery(
# also kill off channels cuz why not
await agen1.aclose()
await agen2.aclose()
finally:
with trio.CancelScope(shield=True):
await trio.sleep(1)
@ -378,10 +387,12 @@ def test_close_channel_explicit(
use_signal,
reg_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
'''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
@ -394,16 +405,18 @@ def test_close_channel_explicit(
@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter(
def test_close_channel_explicit_remote_registrar(
daemon: subprocess.Popen,
start_method,
use_signal,
reg_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
'''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
@ -413,3 +426,61 @@ def test_close_channel_explicit_remote_arbiter(
remote_arbiter=True,
),
)
@tractor.context
async def kill_transport(
    ctx: tractor.Context,
) -> None:
    '''
    Signal "started" to the parent side of the context, cancel this
    actor's IPC server and then sleep forever.

    '''
    await ctx.started()

    # knock out the transport server of whichever actor is running
    # this task, then park the task.
    tractor.current_actor().ipc_server.cancel()
    await trio.sleep_forever()
# @pytest.mark.parametrize('use_signal', [False, True])
def test_stale_entry_is_deleted(
debug_mode: bool,
daemon: subprocess.Popen,
start_method: str,
reg_addr: tuple,
):
'''
Ensure that when a stale entry is detected in the registrar's
table, the `find_actor()` API takes care of deleting the stale
entry and does not deliver a bad portal.

The subactor is made unreachable by running `kill_transport()` in
it, which cancels that actor's IPC server.
'''
async def main():
# name the subactor is started under and later looked up by.
name: str = 'transport_fails_actor'
_reg_ptl: tractor.Portal
an: tractor.ActorNursery
# open an actor nursery alongside a portal to the registrar
# listening at `reg_addr`.
async with (
tractor.open_nursery(
debug_mode=debug_mode,
) as an,
tractor.get_registry(reg_addr) as _reg_ptl,
):
ptl: tractor.Portal = await an.start_actor(
name,
enable_modules=[__name__],
)
# run `kill_transport()` in the subactor; it signals started and
# then cancels its own IPC server.
async with ptl.open_context(
kill_transport,
) as (first, ctx):
async with tractor.find_actor(name) as maybe_portal:
# because the transitive
# `._discovery.maybe_open_portal()` call should
# fail and implicitly call `.delete_addr()`
assert maybe_portal is None
# re-read the registrar's table and verify the subactor's uid
# is no longer present.
registry: dict = await unpack_reg(_reg_ptl)
assert ptl.chan.aid.uid not in registry
# should fail since we knocked out the IPC tpt XD
await ptl.cancel_actor()
await an.cancel()
trio.run(main)

View File

@ -60,7 +60,7 @@ log = get_logger(__name__)
async def get_registry(
addr: UnwrappedAddress|None = None,
) -> AsyncGenerator[
Portal | LocalPortal | None,
Portal|LocalPortal|None,
None,
]:
'''
@ -150,7 +150,7 @@ async def query_actor(
regaddr: UnwrappedAddress|None = None,
) -> AsyncGenerator[
UnwrappedAddress|None,
tuple[UnwrappedAddress|None, Portal|None],
None,
]:
'''
@ -164,7 +164,8 @@ async def query_actor(
actor: Actor = current_actor()
if (
name == 'registrar'
and actor.is_registrar
and
actor.is_registrar
):
raise RuntimeError(
'The current actor IS the registry!?'
@ -172,7 +173,7 @@ async def query_actor(
maybe_peers: list[Channel]|None = get_peer_by_name(name)
if maybe_peers:
yield maybe_peers[0].raddr
yield maybe_peers[0].raddr, None
return
reg_portal: Portal
@ -185,8 +186,7 @@ async def query_actor(
'find_actor',
name=name,
)
yield addr
yield addr, reg_portal
@acm
async def maybe_open_portal(
@ -196,15 +196,37 @@ async def maybe_open_portal(
async with query_actor(
name=name,
regaddr=addr,
) as addr:
pass
) as (addr, reg_portal):
if not addr:
yield None
return
if addr:
async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
try:
async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal:
yield portal
# most likely we were unable to connect the
# transport and there is likely a stale entry in
# the registry actor's table, thus we need to
# instruct it to clear that stale entry and then
# more silently (pretend there was no reason but
# to) indicate that the target actor can't be
# contacted at that addr.
except OSError:
# NOTE: ensure we delete the stale entry from the
# registrar actor.
uid: tuple[str, str] = await reg_portal.run_from_ns(
'self',
'delete_addr',
addr=addr,
)
log.warning(
f'Deleted stale registry entry !\n'
f'addr: {addr!r}\n'
f'uid: {uid!r}\n'
)
yield None
@acm
@ -272,7 +294,7 @@ async def find_actor(
if not any(portals):
if raise_on_none:
raise RuntimeError(
f'No actor "{name}" found registered @ {registry_addrs}'
f'No actor {name!r} found registered @ {registry_addrs!r}'
)
yield None
return

View File

@ -68,6 +68,7 @@ import textwrap
from types import ModuleType
import warnings
from bidict import bidict
import trio
from trio._core import _run as trio_runtime
from trio import (
@ -1879,10 +1880,10 @@ class Arbiter(Actor):
**kwargs,
) -> None:
self._registry: dict[
self._registry: bidict[
tuple[str, str],
UnwrappedAddress,
] = {}
] = bidict({})
self._waiters: dict[
str,
# either an event to sync to receiving an actor uid (which
@ -1971,7 +1972,8 @@ class Arbiter(Actor):
# should never be 0-dynamic-os-alloc
await debug.pause()
self._registry[uid] = addr
# XXX NOTE, value must also be hashable.
self._registry[uid] = tuple(addr)
# pop and signal all waiter events
events = self._waiters.pop(name, [])
@ -1988,4 +1990,26 @@ class Arbiter(Actor):
uid = (str(uid[0]), str(uid[1]))
entry: tuple = self._registry.pop(uid, None)
if entry is None:
log.warning(f'Request to de-register {uid} failed?')
log.warning(
f'Request to de-register {uid!r} failed?'
)
async def delete_addr(
    self,
    addr: tuple[str, int|str],
) -> tuple[str, str]|None:
    '''
    Remove the registry entry whose registered address is `addr`.

    Do a reverse (address -> uid) lookup on the `._registry`
    `bidict` and pop the matching entry, never raising when no
    such entry exists.

    Returns the uid the address was registered under, or `None`
    when no entry matched. A warning is logged in both cases.

    '''
    # XXX the reverse lookup hashes its key, so a `list` input
    # would raise `TypeError`; coerce it to a `tuple`, matching
    # how values are stored at registration time.
    # NOTE(review): presumably addrs can arrive as lists after
    # IPC (de)serialization -- confirm against the msg codec.
    key = tuple(addr) if isinstance(addr, list) else addr

    uid: tuple[str, str]|None = self._registry.inverse.pop(
        key,
        None,
    )
    if uid:
        report: str = 'Deleting registry-entry for,\n'
    else:
        report: str = 'No registry entry for,\n'

    log.warning(
        report
        +
        f'{addr!r}@{uid!r}'
    )
    return uid