From c089c8d0d6914174f5cd74506e59f388d500a7a4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Aug 2023 11:26:36 -0400 Subject: [PATCH] Handle stale registrar entries; detect and delete In cases where an actor's transport server task (by default handling new TCP connections) terminates early but does not de-register from the pertaining registry (aka the registrar) actor's address table, the trying-to-connect client actor will get a connection error on that address. In the case where client handles a (local) `OSError` (meaning the target actor address is likely being contacted over `localhost`) exception, make a further call to the registrar to delete the stale entry and `yield None` gracefully indicating to calling code that no `Portal` can be delivered to the target address. This issue was originally discovered in `piker` where the `emsd` (clearing engine) actor would sometimes crash on rapid client re-connects and then leave a `pikerd` stale entry. With this fix new clients will attempt connect via an endpoint which will re-spawn the `emsd` when a `None` portal is delivered (via `maybe_spawn_em()`). --- tractor/_discovery.py | 52 ++++++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index a332ab73..401f6fa7 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -60,7 +60,7 @@ log = get_logger(__name__) async def get_registry( addr: UnwrappedAddress|None = None, ) -> AsyncGenerator[ - Portal | LocalPortal | None, + Portal|LocalPortal|None, None, ]: ''' @@ -150,7 +150,7 @@ async def query_actor( regaddr: UnwrappedAddress|None = None, ) -> AsyncGenerator[ - UnwrappedAddress|None, + tuple[UnwrappedAddress|None, Portal|None], None, ]: ''' @@ -164,7 +164,8 @@ async def query_actor( actor: Actor = current_actor() if ( name == 'registrar' - and actor.is_registrar + and + actor.is_registrar ): raise RuntimeError( 'The current actor IS the registry!?' @@ -172,7 +173,7 @@ async def query_actor( maybe_peers: list[Channel]|None = get_peer_by_name(name) if maybe_peers: - yield maybe_peers[0].raddr + yield maybe_peers[0].raddr, None return reg_portal: Portal @@ -185,8 +186,7 @@ async def query_actor( 'find_actor', name=name, ) - yield addr - + yield addr, reg_portal @acm async def maybe_open_portal( @@ -196,15 +196,36 @@ async def maybe_open_portal( async with query_actor( name=name, regaddr=addr, - ) as addr: - pass + ) as (addr, reg_portal): + if not addr: + yield None + return - if addr: - async with _connect_chan(addr) as chan: - async with open_portal(chan) as portal: - yield portal - else: - yield None + try: + async with _connect_chan(addr) as chan: + async with open_portal(chan) as portal: + yield portal + + # most likely we were unable to connect the + # transport and there is likely a stale entry in + # the registry actor's table, thus we need to + # instruct it to clear that stale entry and then + # more silently (pretend there was no reason but + # to) indicate that the target actor can't be + # contacted at that addr. + except OSError: + # NOTE: ensure we delete the stale entry from the + # registar actor. + uid: tuple[str, str] = await reg_portal.run_from_ns( + 'self', + 'delete_sockaddr', + sockaddr=addr.unwrap(), + ) + log.error( + f'Deleted stale registry entry !\n' + f'addr: {addr!r}\n' + f'uid: {uid!r}\n' + ) @acm @@ -272,7 +293,7 @@ async def find_actor( if not any(portals): if raise_on_none: raise RuntimeError( - f'No actor "{name}" found registered @ {registry_addrs}' + f'No actor {name!r} found registered @ {registry_addrs!r}' ) yield None return @@ -288,6 +309,7 @@ async def find_actor( yield portals + @acm async def wait_for_actor( name: str,