diff --git a/tractor/_discovery.py b/tractor/_discovery.py index a332ab73..401f6fa7 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -60,7 +60,7 @@ log = get_logger(__name__) async def get_registry( addr: UnwrappedAddress|None = None, ) -> AsyncGenerator[ - Portal | LocalPortal | None, + Portal|LocalPortal|None, None, ]: ''' @@ -150,7 +150,7 @@ async def query_actor( regaddr: UnwrappedAddress|None = None, ) -> AsyncGenerator[ - UnwrappedAddress|None, + tuple[UnwrappedAddress|None, Portal|None], None, ]: ''' @@ -164,7 +164,8 @@ async def query_actor( actor: Actor = current_actor() if ( name == 'registrar' - and actor.is_registrar + and + actor.is_registrar ): raise RuntimeError( 'The current actor IS the registry!?' @@ -172,7 +173,7 @@ async def query_actor( maybe_peers: list[Channel]|None = get_peer_by_name(name) if maybe_peers: - yield maybe_peers[0].raddr + yield maybe_peers[0].raddr, None return reg_portal: Portal @@ -185,8 +186,7 @@ async def query_actor( 'find_actor', name=name, ) - yield addr - + yield addr, reg_portal @acm async def maybe_open_portal( @@ -196,15 +196,36 @@ async def maybe_open_portal( async with query_actor( name=name, regaddr=addr, - ) as addr: - pass + ) as (addr, reg_portal): + if not addr: + yield None + return - if addr: - async with _connect_chan(addr) as chan: - async with open_portal(chan) as portal: - yield portal - else: - yield None + try: + async with _connect_chan(addr) as chan: + async with open_portal(chan) as portal: + yield portal + + # most likely we were unable to connect the + # transport and there is likely a stale entry in + # the registry actor's table, thus we need to + # instruct it to clear that stale entry and then + # more silently (pretend there was no reason but + # to) indicate that the target actor can't be + # contacted at that addr. + except OSError: + # NOTE: ensure we delete the stale entry from the + # registar actor. + uid: tuple[str, str] = await reg_portal.run_from_ns( + 'self', + 'delete_sockaddr', + sockaddr=addr.unwrap(), + ) + log.error( + f'Deleted stale registry entry !\n' + f'addr: {addr!r}\n' + f'uid: {uid!r}\n' + ) @acm @@ -272,7 +293,7 @@ async def find_actor( if not any(portals): if raise_on_none: raise RuntimeError( - f'No actor "{name}" found registered @ {registry_addrs}' + f'No actor {name!r} found registered @ {registry_addrs!r}' ) yield None return @@ -288,6 +309,7 @@ async def find_actor( yield portals + @acm async def wait_for_actor( name: str,