Add `tractor.query_actor()` an addr looker-upper

Sometimes it's handy to just have a non-`Portal` yielding way
to figure out if a "service" actor is up, so add this discovery
helper for that. We'll prolly just leave it undocumented for
now until we figure out a longer-term/better discovery system.
name_query
Tyler Goodlet 2022-03-07 16:21:52 -05:00
parent 62983684d1
commit 80897a8f2b
2 changed files with 65 additions and 26 deletions

View File

@ -29,7 +29,12 @@ from ._streaming import (
stream, stream,
context, context,
) )
from ._discovery import get_arbiter, find_actor, wait_for_actor from ._discovery import (
get_arbiter,
find_actor,
wait_for_actor,
query_actor,
)
from ._supervise import open_nursery from ._supervise import open_nursery
from ._state import current_actor, is_root_process from ._state import current_actor, is_root_process
from ._exceptions import ( from ._exceptions import (
@ -46,11 +51,15 @@ from ._portal import Portal
__all__ = [ __all__ = [
'Channel', 'Channel',
'Context', 'Context',
'ModuleNotExposed',
'MultiError',
'RemoteActorError',
'ContextCancelled', 'ContextCancelled',
'ModuleNotExposed',
'MsgStream',
'MultiError',
'Portal',
'ReceiveMsgStream',
'RemoteActorError',
'breakpoint', 'breakpoint',
'context',
'current_actor', 'current_actor',
'find_actor', 'find_actor',
'get_arbiter', 'get_arbiter',
@ -59,14 +68,11 @@ __all__ = [
'open_actor_cluster', 'open_actor_cluster',
'open_nursery', 'open_nursery',
'open_root_actor', 'open_root_actor',
'Portal',
'post_mortem', 'post_mortem',
'query_actor',
'run', 'run',
'run_daemon', 'run_daemon',
'stream', 'stream',
'context',
'ReceiveMsgStream',
'MsgStream',
'to_asyncio', 'to_asyncio',
'wait_for_actor', 'wait_for_actor',
] ]

View File

@ -20,7 +20,7 @@ Actor discovery API.
""" """
import typing import typing
from typing import Tuple, Optional, Union from typing import Tuple, Optional, Union
from async_generator import asynccontextmanager from contextlib import asynccontextmanager as acm
from ._ipc import _connect_chan, Channel from ._ipc import _connect_chan, Channel
from ._portal import ( from ._portal import (
@ -31,7 +31,7 @@ from ._portal import (
from ._state import current_actor, _runtime_vars from ._state import current_actor, _runtime_vars
@asynccontextmanager @acm
async def get_arbiter( async def get_arbiter(
host: str, host: str,
@ -58,7 +58,7 @@ async def get_arbiter(
yield arb_portal yield arb_portal
@asynccontextmanager @acm
async def get_root( async def get_root(
**kwargs, **kwargs,
) -> typing.AsyncGenerator[Portal, None]: ) -> typing.AsyncGenerator[Portal, None]:
@ -71,28 +71,56 @@ async def get_root(
yield portal yield portal
@asynccontextmanager @acm
async def find_actor( async def query_actor(
name: str, name: str,
arbiter_sockaddr: Tuple[str, int] = None arbiter_sockaddr: Tuple[str, int] = None,
) -> typing.AsyncGenerator[Optional[Portal], None]:
"""Ask the arbiter to find actor(s) by name.
Returns a connected portal to the last registered matching actor ) -> tuple[str, int]:
known to the arbiter. '''
""" Simple address lookup for a given actor name.
Returns the (socket) address or ``None``.
'''
actor = current_actor() actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr
) as arb_portal:
sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name) sockaddr = await arb_portal.run_from_ns(
'self',
'find_actor',
name=name,
)
# TODO: return portals to all available actors - for now just # TODO: return portals to all available actors - for now just
# the last one that registered # the last one that registered
if name == 'arbiter' and actor.is_arbiter: if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter") raise RuntimeError("The current actor is the arbiter")
elif sockaddr: yield sockaddr if sockaddr else None
@acm
async def find_actor(
name: str,
arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Optional[Portal], None]:
'''
Ask the arbiter to find actor(s) by name.
Returns a connected portal to the last registered matching actor
known to the arbiter.
'''
async with query_actor(
name=name,
arbiter_sockaddr=arbiter_sockaddr,
) as sockaddr:
if sockaddr:
async with _connect_chan(*sockaddr) as chan: async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:
yield portal yield portal
@ -100,7 +128,7 @@ async def find_actor(
yield None yield None
@asynccontextmanager @acm
async def wait_for_actor( async def wait_for_actor(
name: str, name: str,
arbiter_sockaddr: Tuple[str, int] = None arbiter_sockaddr: Tuple[str, int] = None
@ -111,9 +139,14 @@ async def wait_for_actor(
""" """
actor = current_actor() actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr,
sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name) ) as arb_portal:
sockaddrs = await arb_portal.run_from_ns(
'self',
'wait_for_actor',
name=name,
)
sockaddr = sockaddrs[-1] sockaddr = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan: async with _connect_chan(*sockaddr) as chan: