From 80897a8f2baa075449c1aa0ae81403a5c1bb256f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Mar 2022 16:21:52 -0500 Subject: [PATCH] Add `tractor.query_actor()` an addr looker-upper Sometimes it's handy to just have a non-`Portal` yielding way to figure out if a "service" actor is up, so add this discovery helper for that. We'll prolly just leave it undocumented for now until we figure out a longer-term/better discovery system. --- tractor/__init__.py | 22 +++++++++----- tractor/_discovery.py | 69 ++++++++++++++++++++++++++++++++----------- 2 files changed, 65 insertions(+), 26 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 6fe1020..85d759c 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -29,7 +29,12 @@ from ._streaming import ( stream, context, ) -from ._discovery import get_arbiter, find_actor, wait_for_actor +from ._discovery import ( + get_arbiter, + find_actor, + wait_for_actor, + query_actor, +) from ._supervise import open_nursery from ._state import current_actor, is_root_process from ._exceptions import ( @@ -46,11 +51,15 @@ from ._portal import Portal __all__ = [ 'Channel', 'Context', - 'ModuleNotExposed', - 'MultiError', - 'RemoteActorError', 'ContextCancelled', + 'ModuleNotExposed', + 'MsgStream', + 'MultiError', + 'Portal', + 'ReceiveMsgStream', + 'RemoteActorError', 'breakpoint', + 'context', 'current_actor', 'find_actor', 'get_arbiter', @@ -59,14 +68,11 @@ __all__ = [ 'open_actor_cluster', 'open_nursery', 'open_root_actor', - 'Portal', 'post_mortem', + 'query_actor', 'run', 'run_daemon', 'stream', - 'context', - 'ReceiveMsgStream', - 'MsgStream', 'to_asyncio', 'wait_for_actor', ] diff --git a/tractor/_discovery.py b/tractor/_discovery.py index b7f6fce..ef049fe 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -20,7 +20,7 @@ Actor discovery API. """ import typing from typing import Tuple, Optional, Union -from async_generator import asynccontextmanager +from contextlib import asynccontextmanager as acm from ._ipc import _connect_chan, Channel from ._portal import ( @@ -31,7 +31,7 @@ from ._portal import ( from ._state import current_actor, _runtime_vars -@asynccontextmanager +@acm async def get_arbiter( host: str, @@ -58,7 +58,7 @@ async def get_arbiter( yield arb_portal -@asynccontextmanager +@acm async def get_root( **kwargs, ) -> typing.AsyncGenerator[Portal, None]: @@ -71,28 +71,56 @@ async def get_root( yield portal -@asynccontextmanager -async def find_actor( +@acm +async def query_actor( name: str, - arbiter_sockaddr: Tuple[str, int] = None -) -> typing.AsyncGenerator[Optional[Portal], None]: - """Ask the arbiter to find actor(s) by name. + arbiter_sockaddr: Tuple[str, int] = None, - Returns a connected portal to the last registered matching actor - known to the arbiter. - """ +) -> tuple[str, int]: + ''' + Simple address lookup for a given actor name. + + Returns the (socket) address or ``None``. + + ''' actor = current_actor() - async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + async with get_arbiter( + *arbiter_sockaddr or actor._arb_addr + ) as arb_portal: - sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name) + sockaddr = await arb_portal.run_from_ns( + 'self', + 'find_actor', + name=name, + ) # TODO: return portals to all available actors - for now just # the last one that registered if name == 'arbiter' and actor.is_arbiter: raise RuntimeError("The current actor is the arbiter") - elif sockaddr: + yield sockaddr if sockaddr else None + +@acm +async def find_actor( + name: str, + arbiter_sockaddr: Tuple[str, int] = None + +) -> typing.AsyncGenerator[Optional[Portal], None]: + ''' + Ask the arbiter to find actor(s) by name. + + Returns a connected portal to the last registered matching actor + known to the arbiter. + + ''' + async with query_actor( + name=name, + arbiter_sockaddr=arbiter_sockaddr, + ) as sockaddr: + + if sockaddr: async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: yield portal @@ -100,7 +128,7 @@ async def find_actor( yield None -@asynccontextmanager +@acm async def wait_for_actor( name: str, arbiter_sockaddr: Tuple[str, int] = None @@ -111,9 +139,14 @@ async def wait_for_actor( """ actor = current_actor() - async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: - - sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name) + async with get_arbiter( + *arbiter_sockaddr or actor._arb_addr, + ) as arb_portal: + sockaddrs = await arb_portal.run_from_ns( + 'self', + 'wait_for_actor', + name=name, + ) sockaddr = sockaddrs[-1] async with _connect_chan(*sockaddr) as chan: