Handle stale registrar entries; detect and delete

In cases where an actor's transport server task (by default handling new
TCP connections) terminates early but does not de-register from the
corresponding registry (aka the registrar) actor's address table, the
client actor trying to connect will get a connection error on that
address. When the client catches a (local) `OSError` (meaning the
target actor address is likely being contacted over `localhost`), make
a further call to the registrar to delete the stale entry and
`yield None`, gracefully indicating to calling code that no `Portal`
can be delivered for the target address.

This issue was originally discovered in `piker`, where the `emsd`
(clearing engine) actor would sometimes crash on rapid client
re-connects and then leave behind a stale `pikerd` entry. With this fix,
new clients will attempt to connect via an endpoint which will re-spawn
`emsd` when a `None` portal is delivered (via `maybe_spawn_em()`).
dereg_on_oserror
Tyler Goodlet 2023-08-28 11:26:36 -04:00
parent 9f5b112d19
commit c089c8d0d6
1 changed files with 37 additions and 15 deletions

View File

@ -60,7 +60,7 @@ log = get_logger(__name__)
async def get_registry( async def get_registry(
addr: UnwrappedAddress|None = None, addr: UnwrappedAddress|None = None,
) -> AsyncGenerator[ ) -> AsyncGenerator[
Portal | LocalPortal | None, Portal|LocalPortal|None,
None, None,
]: ]:
''' '''
@ -150,7 +150,7 @@ async def query_actor(
regaddr: UnwrappedAddress|None = None, regaddr: UnwrappedAddress|None = None,
) -> AsyncGenerator[ ) -> AsyncGenerator[
UnwrappedAddress|None, tuple[UnwrappedAddress|None, Portal|None],
None, None,
]: ]:
''' '''
@ -164,7 +164,8 @@ async def query_actor(
actor: Actor = current_actor() actor: Actor = current_actor()
if ( if (
name == 'registrar' name == 'registrar'
and actor.is_registrar and
actor.is_registrar
): ):
raise RuntimeError( raise RuntimeError(
'The current actor IS the registry!?' 'The current actor IS the registry!?'
@ -172,7 +173,7 @@ async def query_actor(
maybe_peers: list[Channel]|None = get_peer_by_name(name) maybe_peers: list[Channel]|None = get_peer_by_name(name)
if maybe_peers: if maybe_peers:
yield maybe_peers[0].raddr yield maybe_peers[0].raddr, None
return return
reg_portal: Portal reg_portal: Portal
@ -185,8 +186,7 @@ async def query_actor(
'find_actor', 'find_actor',
name=name, name=name,
) )
yield addr yield addr, reg_portal
@acm @acm
async def maybe_open_portal( async def maybe_open_portal(
@ -196,15 +196,36 @@ async def maybe_open_portal(
async with query_actor( async with query_actor(
name=name, name=name,
regaddr=addr, regaddr=addr,
) as addr: ) as (addr, reg_portal):
pass if not addr:
yield None
return
if addr: try:
async with _connect_chan(addr) as chan: async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:
yield portal yield portal
else:
yield None # most likely we were unable to connect the
# transport and there is likely a stale entry in
# the registry actor's table, thus we need to
# instruct it to clear that stale entry and then
# more silently (pretend there was no reason but
# to) indicate that the target actor can't be
# contacted at that addr.
except OSError:
# NOTE: ensure we delete the stale entry from the
# registrar actor.
uid: tuple[str, str] = await reg_portal.run_from_ns(
'self',
'delete_sockaddr',
sockaddr=addr.unwrap(),
)
log.error(
f'Deleted stale registry entry !\n'
f'addr: {addr!r}\n'
f'uid: {uid!r}\n'
)
@acm @acm
@ -272,7 +293,7 @@ async def find_actor(
if not any(portals): if not any(portals):
if raise_on_none: if raise_on_none:
raise RuntimeError( raise RuntimeError(
f'No actor "{name}" found registered @ {registry_addrs}' f'No actor {name!r} found registered @ {registry_addrs!r}'
) )
yield None yield None
return return
@ -288,6 +309,7 @@ async def find_actor(
yield portals yield portals
@acm @acm
async def wait_for_actor( async def wait_for_actor(
name: str, name: str,