From d83d991f21d33df641d880232ef1625724caa2eb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Aug 2023 11:26:36 -0400 Subject: [PATCH] Handle stale registrar entries; detect and delete In cases where an actor's transport server task (by default handling new TCP connections) terminates early but does not de-register from the pertinent registry (aka the registrar) actor's address table, the trying-to-connect client actor will get a connection error on that address. In the case where the client handles a (local) `OSError` (meaning the target actor address is likely being contacted over `localhost`) exception, make a further call to the registrar to delete the stale entry and `yield None`, gracefully indicating to calling code that no `Portal` can be delivered to the target address. This issue was originally discovered in `piker` where the `emsd` (clearing engine) actor would sometimes crash on rapid client re-connects and then leave a `pikerd` stale entry. With this fix, new clients will attempt to connect via an endpoint which will re-spawn the `emsd` when a `None` portal is delivered (via `maybe_spawn_em()`). 
--- tractor/__init__.py | 2 ++ tractor/_discovery.py | 68 +++++++++++++++++++++++++++++++++---------- 2 files changed, 55 insertions(+), 15 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index c653ec0..eba0b45 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -31,6 +31,7 @@ from ._streaming import ( ) from ._discovery import ( get_arbiter, + get_registrar, find_actor, wait_for_actor, query_actor, @@ -77,6 +78,7 @@ __all__ = [ 'find_actor', 'query_actor', 'get_arbiter', + 'get_registrar', 'is_root_process', 'msg', 'open_actor_cluster', diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 03775ac..d23118d 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -35,7 +35,7 @@ from ._state import current_actor, _runtime_vars @acm -async def get_arbiter( +async def get_registrar( host: str, port: int, @@ -56,11 +56,14 @@ async def get_arbiter( # (likely a re-entrant call from the arbiter actor) yield LocalPortal(actor, Channel((host, port))) else: - async with _connect_chan(host, port) as chan: + async with ( + _connect_chan(host, port) as chan, + open_portal(chan) as arb_portal, + ): + yield arb_portal - async with open_portal(chan) as arb_portal: - yield arb_portal +get_arbiter = get_registrar @acm @@ -101,7 +104,10 @@ async def query_actor( # TODO: return portals to all available actors - for now just # the last one that registered - if name == 'arbiter' and actor.is_arbiter: + if ( + name == 'arbiter' + and actor.is_arbiter + ): raise RuntimeError("The current actor is the arbiter") yield sockaddr if sockaddr else None @@ -112,7 +118,7 @@ async def find_actor( name: str, arbiter_sockaddr: tuple[str, int] | None = None -) -> AsyncGenerator[Optional[Portal], None]: +) -> AsyncGenerator[Portal | None, None]: ''' Ask the arbiter to find actor(s) by name. @@ -120,17 +126,49 @@ async def find_actor( known to the arbiter. 
''' - async with query_actor( - name=name, - arbiter_sockaddr=arbiter_sockaddr, - ) as sockaddr: + actor = current_actor() + async with get_arbiter( + *arbiter_sockaddr or actor._arb_addr + ) as arb_portal: + + sockaddr = await arb_portal.run_from_ns( + 'self', + 'find_actor', + name=name, + ) + + # TODO: return portals to all available actors - for now just + # the last one that registered + if ( + name == 'arbiter' + and actor.is_arbiter + ): + raise RuntimeError("The current actor is the arbiter") if sockaddr: - async with _connect_chan(*sockaddr) as chan: - async with open_portal(chan) as portal: - yield portal - else: - yield None + try: + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal + return + + # most likely we were unable to connect the + # transport and there is likely a stale entry in + # the registry actor's table, thus we need to + # instruct it to clear that stale entry and then + # more silently (pretend there was no reason but + # to) indicate that the target actor can't be + # contacted at that addr. + except OSError: + # NOTE: ensure we delete the stale entry from the + # registar actor. + uid: tuple[str, str] = await arb_portal.run_from_ns( + 'self', + 'delete_sockaddr', + sockaddr=sockaddr, + ) + + yield None @acm