diff --git a/tractor/__init__.py b/tractor/__init__.py index 2f6e7d7..d14717b 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -12,9 +12,8 @@ from trio import MultiError from . import log from ._ipc import _connect_chan, Channel, Context -from ._actor import ( - Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor -) +from ._discovery import get_arbiter, find_actor, wait_for_actor +from ._actor import Actor, _start_actor, Arbiter from ._trionics import open_nursery from ._state import current_actor from ._exceptions import RemoteActorError, ModuleNotExposed diff --git a/tractor/_actor.py b/tractor/_actor.py index 4fa3808..496c68a 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -8,25 +8,21 @@ import importlib import inspect import uuid import typing -from typing import Dict, List, Tuple, Any, Optional, Union +from typing import Dict, List, Tuple, Any, Optional import trio # type: ignore -from async_generator import asynccontextmanager, aclosing +from async_generator import aclosing -from ._ipc import Channel, _connect_chan, Context +from ._ipc import Channel, Context from .log import get_console_log, get_logger from ._exceptions import ( pack_error, unpack_error, ModuleNotExposed ) -from ._portal import ( - Portal, - open_portal, - LocalPortal, -) +from ._discovery import get_arbiter +from ._portal import Portal from . import _state -from ._state import current_actor log = get_logger('tractor') @@ -869,66 +865,3 @@ async def _start_actor( log.info("Completed async main") return result - - -@asynccontextmanager -async def get_arbiter( - host: str, port: int -) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: - """Return a portal instance connected to a local or remote - arbiter. - """ - actor = current_actor() - if not actor: - raise RuntimeError("No actor instance has been defined yet?") - - if actor.is_arbiter: - # we're already the arbiter - # (likely a re-entrant call from the arbiter actor) - yield LocalPortal(actor) - else: - async with _connect_chan(host, port) as chan: - async with open_portal(chan) as arb_portal: - yield arb_portal - - -@asynccontextmanager -async def find_actor( - name: str, arbiter_sockaddr: Tuple[str, int] = None -) -> typing.AsyncGenerator[Optional[Portal], None]: - """Ask the arbiter to find actor(s) by name. - - Returns a connected portal to the last registered matching actor - known to the arbiter. - """ - actor = current_actor() - async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: - sockaddr = await arb_portal.run('self', 'find_actor', name=name) - # TODO: return portals to all available actors - for now just - # the last one that registered - if name == 'arbiter' and actor.is_arbiter: - raise RuntimeError("The current actor is the arbiter") - elif sockaddr: - async with _connect_chan(*sockaddr) as chan: - async with open_portal(chan) as portal: - yield portal - else: - yield None - - -@asynccontextmanager -async def wait_for_actor( - name: str, - arbiter_sockaddr: Tuple[str, int] = None -) -> typing.AsyncGenerator[Portal, None]: - """Wait on an actor to register with the arbiter. - - A portal to the first registered actor is returned. - """ - actor = current_actor() - async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: - sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name) - sockaddr = sockaddrs[-1] - async with _connect_chan(*sockaddr) as chan: - async with open_portal(chan) as portal: - yield portal diff --git a/tractor/_discovery.py b/tractor/_discovery.py new file mode 100644 index 0000000..ae2e3be --- /dev/null +++ b/tractor/_discovery.py @@ -0,0 +1,77 @@ +""" +Actor discovery API. +""" +import typing +from typing import Tuple, Optional, Union +from async_generator import asynccontextmanager + +from ._ipc import _connect_chan +from ._portal import ( + Portal, + open_portal, + LocalPortal, +) +from ._state import current_actor + + +@asynccontextmanager +async def get_arbiter( + host: str, port: int +) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: + """Return a portal instance connected to a local or remote + arbiter. + """ + actor = current_actor() + if not actor: + raise RuntimeError("No actor instance has been defined yet?") + + if actor.is_arbiter: + # we're already the arbiter + # (likely a re-entrant call from the arbiter actor) + yield LocalPortal(actor) + else: + async with _connect_chan(host, port) as chan: + async with open_portal(chan) as arb_portal: + yield arb_portal + + +@asynccontextmanager +async def find_actor( + name: str, arbiter_sockaddr: Tuple[str, int] = None +) -> typing.AsyncGenerator[Optional[Portal], None]: + """Ask the arbiter to find actor(s) by name. + + Returns a connected portal to the last registered matching actor + known to the arbiter. + """ + actor = current_actor() + async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + sockaddr = await arb_portal.run('self', 'find_actor', name=name) + # TODO: return portals to all available actors - for now just + # the last one that registered + if name == 'arbiter' and actor.is_arbiter: + raise RuntimeError("The current actor is the arbiter") + elif sockaddr: + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal + else: + yield None + + +@asynccontextmanager +async def wait_for_actor( + name: str, + arbiter_sockaddr: Tuple[str, int] = None +) -> typing.AsyncGenerator[Portal, None]: + """Wait on an actor to register with the arbiter. + + A portal to the first registered actor is returned. + """ + actor = current_actor() + async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name) + sockaddr = sockaddrs[-1] + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal