Merge pull request #366 from goodboy/dereg_on_oserror

Make `find_actor()` delete stale sockaddr entries from registrar on `OSError`
multicast_revertable_streams
Bd 2026-03-25 03:27:27 -04:00 committed by GitHub
commit cc3bfac741
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 180 additions and 39 deletions

View File

@ -1,7 +1,7 @@
""" '''
Discovery subsys. Discovery subsystem via a "registrar" actor scenarios.
""" '''
import os import os
import signal import signal
import platform import platform
@ -163,7 +163,10 @@ async def unpack_reg(
else: else:
msg = await actor_or_portal.run_from_ns('self', 'get_registry') msg = await actor_or_portal.run_from_ns('self', 'get_registry')
return {tuple(key.split('.')): val for key, val in msg.items()} return {
tuple(key.split('.')): val
for key, val in msg.items()
}
async def spawn_and_check_registry( async def spawn_and_check_registry(
@ -356,20 +359,24 @@ async def close_chans_before_nursery(
try: try:
get_reg = partial(unpack_reg, aportal) get_reg = partial(unpack_reg, aportal)
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as an:
portal1 = await tn.start_actor( portal1 = await an.start_actor(
name='consumer1', enable_modules=[__name__]) name='consumer1',
portal2 = await tn.start_actor( enable_modules=[__name__],
'consumer2', enable_modules=[__name__]) )
portal2 = await an.start_actor(
'consumer2',
enable_modules=[__name__],
)
# TODO: compact this back as was in last commit once async with (
# 3.9+, see https://github.com/goodboy/tractor/issues/207 portal1.open_stream_from(
async with portal1.open_stream_from(
stream_forever stream_forever
) as agen1: ) as agen1,
async with portal2.open_stream_from( portal2.open_stream_from(
stream_forever stream_forever
) as agen2: ) as agen2,
):
async with ( async with (
collapse_eg(), collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
@ -392,6 +399,7 @@ async def close_chans_before_nursery(
# also kill off channels cuz why not # also kill off channels cuz why not
await agen1.aclose() await agen1.aclose()
await agen2.aclose() await agen2.aclose()
finally: finally:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await trio.sleep(1) await trio.sleep(1)
@ -427,7 +435,7 @@ def test_close_channel_explicit(
@pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter( def test_close_channel_explicit_remote_registrar(
daemon: subprocess.Popen, daemon: subprocess.Popen,
start_method: str, start_method: str,
use_signal: bool, use_signal: bool,
@ -448,3 +456,65 @@ def test_close_channel_explicit_remote_arbiter(
remote_arbiter=True, remote_arbiter=True,
), ),
) )
@tractor.context
async def kill_transport(
ctx: tractor.Context,
) -> None:
await ctx.started()
actor: tractor.Actor = tractor.current_actor()
actor.ipc_server.cancel()
await trio.sleep_forever()
# @pytest.mark.parametrize('use_signal', [False, True])
def test_stale_entry_is_deleted(
debug_mode: bool,
daemon: subprocess.Popen,
start_method: str,
reg_addr: tuple,
):
'''
Ensure that when a stale entry is detected in the registrar's
table that the `find_actor()` API takes care of deleting the
stale entry and not delivering a bad portal.
'''
async def main():
name: str = 'transport_fails_actor'
_reg_ptl: tractor.Portal
an: tractor.ActorNursery
async with (
tractor.open_nursery(
debug_mode=debug_mode,
registry_addrs=[reg_addr],
) as an,
tractor.get_registry(reg_addr) as _reg_ptl,
):
ptl: tractor.Portal = await an.start_actor(
name,
enable_modules=[__name__],
)
async with ptl.open_context(
kill_transport,
) as (first, ctx):
async with tractor.find_actor(
name,
registry_addrs=[reg_addr],
) as maybe_portal:
# because the transitive
# `._discovery.maybe_open_portal()` call should
# fail and implicitly call `.delete_addr()`
assert maybe_portal is None
registry: dict = await unpack_reg(_reg_ptl)
assert ptl.chan.aid.uid not in registry
# should fail since we knocked out the IPC tpt XD
await ptl.cancel_actor()
await an.cancel()
trio.run(main)

View File

@ -60,7 +60,7 @@ log = get_logger()
async def get_registry( async def get_registry(
addr: UnwrappedAddress|None = None, addr: UnwrappedAddress|None = None,
) -> AsyncGenerator[ ) -> AsyncGenerator[
Portal | LocalPortal | None, Portal|LocalPortal|None,
None, None,
]: ]:
''' '''
@ -153,21 +153,27 @@ async def query_actor(
regaddr: UnwrappedAddress|None = None, regaddr: UnwrappedAddress|None = None,
) -> AsyncGenerator[ ) -> AsyncGenerator[
UnwrappedAddress|None, tuple[UnwrappedAddress|None, Portal|LocalPortal|None],
None, None,
]: ]:
''' '''
Lookup a transport address (by actor name) via querying a registrar Lookup a transport address (by actor name) via querying a registrar
listening @ `regaddr`. listening @ `regaddr`.
Returns the transport protocol (socket) address or `None` if no Yields a `tuple` of `(addr, reg_portal)` where,
entry under that name exists. - `addr` is the transport protocol (socket) address or `None` if
no entry under that name exists,
- `reg_portal` is the `Portal` (or `LocalPortal` when the
current actor is the registrar) used for the lookup (or
`None` when the peer was found locally via
`get_peer_by_name()`).
''' '''
actor: Actor = current_actor() actor: Actor = current_actor()
if ( if (
name == 'registrar' name == 'registrar'
and actor.is_registrar and
actor.is_registrar
): ):
raise RuntimeError( raise RuntimeError(
'The current actor IS the registry!?' 'The current actor IS the registry!?'
@ -175,10 +181,10 @@ async def query_actor(
maybe_peers: list[Channel]|None = get_peer_by_name(name) maybe_peers: list[Channel]|None = get_peer_by_name(name)
if maybe_peers: if maybe_peers:
yield maybe_peers[0].raddr yield maybe_peers[0].raddr, None
return return
reg_portal: Portal reg_portal: Portal|LocalPortal
regaddr: Address = wrap_address(regaddr) or actor.reg_addrs[0] regaddr: Address = wrap_address(regaddr) or actor.reg_addrs[0]
async with get_registry(regaddr) as reg_portal: async with get_registry(regaddr) as reg_portal:
# TODO: return portals to all available actors - for now # TODO: return portals to all available actors - for now
@ -188,8 +194,7 @@ async def query_actor(
'find_actor', 'find_actor',
name=name, name=name,
) )
yield addr yield addr, reg_portal
@acm @acm
async def maybe_open_portal( async def maybe_open_portal(
@ -204,14 +209,48 @@ async def maybe_open_portal(
async with query_actor( async with query_actor(
name=name, name=name,
regaddr=addr, regaddr=addr,
) as addr: ) as (addr, reg_portal):
pass if not addr:
yield None
return
if addr: try:
async with _connect_chan(addr) as chan: async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:
yield portal yield portal
# most likely we were unable to connect the
# transport and there is likely a stale entry in
# the registry actor's table, thus we need to
# instruct it to clear that stale entry and then
# more silently (pretend there was no reason but
# to) indicate that the target actor can't be
# contacted at that addr.
except OSError:
# NOTE: ensure we delete the stale entry
# from the registrar actor when available.
if reg_portal is not None:
uid: tuple[str, str]|None = await reg_portal.run_from_ns(
'self',
'delete_addr',
addr=addr,
)
if uid:
log.warning(
f'Deleted stale registry entry !\n'
f'addr: {addr!r}\n'
f'uid: {uid!r}\n'
)
else: else:
log.warning(
f'No registry entry found for addr: {addr!r}'
)
else:
log.warning(
f'Connection to {addr!r} failed'
f' and no registry portal available'
f' to delete stale entry.'
)
yield None yield None
@ -280,7 +319,7 @@ async def find_actor(
if not any(portals): if not any(portals):
if raise_on_none: if raise_on_none:
raise RuntimeError( raise RuntimeError(
f'No actor "{name}" found registered @ {registry_addrs}' f'No actor {name!r} found registered @ {registry_addrs!r}'
) )
yield None yield None
return return

View File

@ -68,6 +68,7 @@ import textwrap
from types import ModuleType from types import ModuleType
import warnings import warnings
from bidict import bidict
import trio import trio
from trio._core import _run as trio_runtime from trio._core import _run as trio_runtime
from trio import ( from trio import (
@ -1920,10 +1921,10 @@ class Arbiter(Actor):
**kwargs, **kwargs,
) -> None: ) -> None:
self._registry: dict[ self._registry: bidict[
tuple[str, str], tuple[str, str],
UnwrappedAddress, UnwrappedAddress,
] = {} ] = bidict({})
self._waiters: dict[ self._waiters: dict[
str, str,
# either an event to sync to receiving an actor uid (which # either an event to sync to receiving an actor uid (which
@ -2012,7 +2013,13 @@ class Arbiter(Actor):
# should never be 0-dynamic-os-alloc # should never be 0-dynamic-os-alloc
await debug.pause() await debug.pause()
self._registry[uid] = addr # XXX NOTE, value must also be hashable AND since
# `._registry` is a `bidict` values must be unique; use
# `.forceput()` to replace any prior (stale) entries
# that might map a different uid to the same addr (e.g.
# after an unclean shutdown or actor-restart reusing
# the same address).
self._registry.forceput(uid, tuple(addr))
# pop and signal all waiter events # pop and signal all waiter events
events = self._waiters.pop(name, []) events = self._waiters.pop(name, [])
@ -2029,4 +2036,29 @@ class Arbiter(Actor):
uid = (str(uid[0]), str(uid[1])) uid = (str(uid[0]), str(uid[1]))
entry: tuple = self._registry.pop(uid, None) entry: tuple = self._registry.pop(uid, None)
if entry is None: if entry is None:
log.warning(f'Request to de-register {uid} failed?') log.warning(
f'Request to de-register {uid!r} failed?'
)
async def delete_addr(
self,
addr: tuple[str, int|str]|list[str|int],
) -> tuple[str, str]|None:
# NOTE: `addr` arrives as a `list` over IPC
# (msgpack deserializes tuples -> lists) so
# coerce to `tuple` for the bidict hash lookup.
uid: tuple[str, str]|None = self._registry.inverse.pop(
tuple(addr),
None,
)
if uid:
report: str = 'Deleting registry-entry for,\n'
else:
report: str = 'No registry entry for,\n'
log.warning(
report
+
f'{addr!r}@{uid!r}'
)
return uid