diff --git a/305.misc.rst b/305.misc.rst new file mode 100644 index 0000000..1d5df5b --- /dev/null +++ b/305.misc.rst @@ -0,0 +1,7 @@ +Add ``tractor.query_actor()`` an addr looker-upper which doesn't deliver +a ``Portal`` instance and instead just a socket address ``tuple``. + +Sometimes it's handy to just have a simple way to figure out if +a "service" actor is up, so add this discovery helper for that. We'll +prolly just leave it undocumented for now until we figure out +a longer-term/better discovery system. diff --git a/tractor/__init__.py b/tractor/__init__.py index 6fe1020..85d759c 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -29,7 +29,12 @@ from ._streaming import ( stream, context, ) -from ._discovery import get_arbiter, find_actor, wait_for_actor +from ._discovery import ( + get_arbiter, + find_actor, + wait_for_actor, + query_actor, +) from ._supervise import open_nursery from ._state import current_actor, is_root_process from ._exceptions import ( @@ -46,11 +51,15 @@ from ._portal import Portal __all__ = [ 'Channel', 'Context', - 'ModuleNotExposed', - 'MultiError', - 'RemoteActorError', 'ContextCancelled', + 'ModuleNotExposed', + 'MsgStream', + 'MultiError', + 'Portal', + 'ReceiveMsgStream', + 'RemoteActorError', 'breakpoint', + 'context', 'current_actor', 'find_actor', 'get_arbiter', @@ -59,14 +68,11 @@ __all__ = [ 'open_actor_cluster', 'open_nursery', 'open_root_actor', - 'Portal', 'post_mortem', + 'query_actor', 'run', 'run_daemon', 'stream', - 'context', - 'ReceiveMsgStream', - 'MsgStream', 'to_asyncio', 'wait_for_actor', ] diff --git a/tractor/_discovery.py b/tractor/_discovery.py index b7f6fce..25951b3 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -18,9 +18,8 @@ Actor discovery API. """ -import typing -from typing import Tuple, Optional, Union -from async_generator import asynccontextmanager +from typing import Tuple, Optional, Union, AsyncGenerator +from contextlib import asynccontextmanager as acm from ._ipc import _connect_chan, Channel from ._portal import ( @@ -31,13 +30,13 @@ from ._portal import ( from ._state import current_actor, _runtime_vars -@asynccontextmanager +@acm async def get_arbiter( host: str, port: int, -) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: +) -> AsyncGenerator[Union[Portal, LocalPortal], None]: '''Return a portal instance connected to a local or remote arbiter. ''' @@ -58,10 +57,10 @@ async def get_arbiter( yield arb_portal -@asynccontextmanager +@acm async def get_root( **kwargs, -) -> typing.AsyncGenerator[Portal, None]: +) -> AsyncGenerator[Portal, None]: host, port = _runtime_vars['_root_mailbox'] assert host is not None @@ -71,28 +70,56 @@ async def get_root( yield portal -@asynccontextmanager -async def find_actor( +@acm +async def query_actor( name: str, - arbiter_sockaddr: Tuple[str, int] = None -) -> typing.AsyncGenerator[Optional[Portal], None]: - """Ask the arbiter to find actor(s) by name. + arbiter_sockaddr: Optional[tuple[str, int]] = None, - Returns a connected portal to the last registered matching actor - known to the arbiter. - """ +) -> AsyncGenerator[tuple[str, int], None]: + ''' + Simple address lookup for a given actor name. + + Returns the (socket) address or ``None``. + + ''' actor = current_actor() - async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + async with get_arbiter( + *arbiter_sockaddr or actor._arb_addr + ) as arb_portal: - sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name) + sockaddr = await arb_portal.run_from_ns( + 'self', + 'find_actor', + name=name, + ) # TODO: return portals to all available actors - for now just # the last one that registered if name == 'arbiter' and actor.is_arbiter: raise RuntimeError("The current actor is the arbiter") - elif sockaddr: + yield sockaddr if sockaddr else None + +@acm +async def find_actor( + name: str, + arbiter_sockaddr: Tuple[str, int] = None + +) -> AsyncGenerator[Optional[Portal], None]: + ''' + Ask the arbiter to find actor(s) by name. + + Returns a connected portal to the last registered matching actor + known to the arbiter. + + ''' + async with query_actor( + name=name, + arbiter_sockaddr=arbiter_sockaddr, + ) as sockaddr: + + if sockaddr: async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: yield portal @@ -100,20 +127,25 @@ async def find_actor( yield None -@asynccontextmanager +@acm async def wait_for_actor( name: str, arbiter_sockaddr: Tuple[str, int] = None -) -> typing.AsyncGenerator[Portal, None]: +) -> AsyncGenerator[Portal, None]: """Wait on an actor to register with the arbiter. A portal to the first registered actor is returned. """ actor = current_actor() - async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: - - sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name) + async with get_arbiter( + *arbiter_sockaddr or actor._arb_addr, + ) as arb_portal: + sockaddrs = await arb_portal.run_from_ns( + 'self', + 'wait_for_actor', + name=name, + ) sockaddr = sockaddrs[-1] async with _connect_chan(*sockaddr) as chan: