Handle stale registrar entries; detect and delete

In cases where an actor's transport server task (by default handling new
TCP connections) terminates early but does not de-register from the
pertaining registry (aka the registrar) actor's address table, the
trying-to-connect client actor will get a connection error on that
address. In the case where client handles a (local) `OSError` (meaning
the target actor address is likely being contacted over `localhost`)
exception, make a further call to the registrar to delete the stale
entry and `yield None` gracefully indicating to calling code that no
`Portal` can be delivered to the target address.

This issue was originally discovered in `piker` where the `emsd`
(clearing engine) actor would sometimes crash on rapid client
re-connects and then leave a `pikerd` stale entry. With this fix new
clients will attempt connect via an endpoint which will re-spawn the
`emsd` when a `None` portal is delivered (via `maybe_spawn_em()`).
dereg_on_oserror
Tyler Goodlet 2023-08-28 11:26:36 -04:00
parent 1cf712cfac
commit d83d991f21
2 changed files with 55 additions and 15 deletions

View File

@ -31,6 +31,7 @@ from ._streaming import (
) )
from ._discovery import ( from ._discovery import (
get_arbiter, get_arbiter,
get_registrar,
find_actor, find_actor,
wait_for_actor, wait_for_actor,
query_actor, query_actor,
@ -77,6 +78,7 @@ __all__ = [
'find_actor', 'find_actor',
'query_actor', 'query_actor',
'get_arbiter', 'get_arbiter',
'get_registrar',
'is_root_process', 'is_root_process',
'msg', 'msg',
'open_actor_cluster', 'open_actor_cluster',

View File

@ -35,7 +35,7 @@ from ._state import current_actor, _runtime_vars
@acm @acm
async def get_arbiter( async def get_registrar(
host: str, host: str,
port: int, port: int,
@ -56,13 +56,16 @@ async def get_arbiter(
# (likely a re-entrant call from the arbiter actor) # (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor, Channel((host, port))) yield LocalPortal(actor, Channel((host, port)))
else: else:
async with _connect_chan(host, port) as chan: async with (
_connect_chan(host, port) as chan,
async with open_portal(chan) as arb_portal: open_portal(chan) as arb_portal,
):
yield arb_portal yield arb_portal
get_arbiter = get_registrar
@acm @acm
async def get_root( async def get_root(
**kwargs, **kwargs,
@ -101,7 +104,10 @@ async def query_actor(
# TODO: return portals to all available actors - for now just # TODO: return portals to all available actors - for now just
# the last one that registered # the last one that registered
if name == 'arbiter' and actor.is_arbiter: if (
name == 'arbiter'
and actor.is_arbiter
):
raise RuntimeError("The current actor is the arbiter") raise RuntimeError("The current actor is the arbiter")
yield sockaddr if sockaddr else None yield sockaddr if sockaddr else None
@ -112,7 +118,7 @@ async def find_actor(
name: str, name: str,
arbiter_sockaddr: tuple[str, int] | None = None arbiter_sockaddr: tuple[str, int] | None = None
) -> AsyncGenerator[Optional[Portal], None]: ) -> AsyncGenerator[Portal | None, None]:
''' '''
Ask the arbiter to find actor(s) by name. Ask the arbiter to find actor(s) by name.
@ -120,16 +126,48 @@ async def find_actor(
known to the arbiter. known to the arbiter.
''' '''
async with query_actor( actor = current_actor()
async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr
) as arb_portal:
sockaddr = await arb_portal.run_from_ns(
'self',
'find_actor',
name=name, name=name,
arbiter_sockaddr=arbiter_sockaddr, )
) as sockaddr:
# TODO: return portals to all available actors - for now just
# the last one that registered
if (
name == 'arbiter'
and actor.is_arbiter
):
raise RuntimeError("The current actor is the arbiter")
if sockaddr: if sockaddr:
try:
async with _connect_chan(*sockaddr) as chan: async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:
yield portal yield portal
else: return
# most likely we were unable to connect the
# transport and there is likely a stale entry in
# the registry actor's table, thus we need to
# instruct it to clear that stale entry and then
# more silently (pretend there was no reason but
# to) indicate that the target actor can't be
# contacted at that addr.
except OSError:
# NOTE: ensure we delete the stale entry from the
# registar actor.
uid: tuple[str, str] = await arb_portal.run_from_ns(
'self',
'delete_sockaddr',
sockaddr=sockaddr,
)
yield None yield None