forked from goodboy/tractor
1
0
Fork 0

Move discovery functions to their own module

stream_functions
Tyler Goodlet 2019-03-24 11:37:11 -04:00
parent 2aa6ffce60
commit 4ee35038fb
3 changed files with 84 additions and 75 deletions

View File

@ -12,9 +12,8 @@ from trio import MultiError
from . import log from . import log
from ._ipc import _connect_chan, Channel, Context from ._ipc import _connect_chan, Channel, Context
from ._actor import ( from ._discovery import get_arbiter, find_actor, wait_for_actor
Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor from ._actor import Actor, _start_actor, Arbiter
)
from ._trionics import open_nursery from ._trionics import open_nursery
from ._state import current_actor from ._state import current_actor
from ._exceptions import RemoteActorError, ModuleNotExposed from ._exceptions import RemoteActorError, ModuleNotExposed

View File

@ -8,25 +8,21 @@ import importlib
import inspect import inspect
import uuid import uuid
import typing import typing
from typing import Dict, List, Tuple, Any, Optional, Union from typing import Dict, List, Tuple, Any, Optional
import trio # type: ignore import trio # type: ignore
from async_generator import asynccontextmanager, aclosing from async_generator import aclosing
from ._ipc import Channel, _connect_chan, Context from ._ipc import Channel, Context
from .log import get_console_log, get_logger from .log import get_console_log, get_logger
from ._exceptions import ( from ._exceptions import (
pack_error, pack_error,
unpack_error, unpack_error,
ModuleNotExposed ModuleNotExposed
) )
from ._portal import ( from ._discovery import get_arbiter
Portal, from ._portal import Portal
open_portal,
LocalPortal,
)
from . import _state from . import _state
from ._state import current_actor
log = get_logger('tractor') log = get_logger('tractor')
@ -869,66 +865,3 @@ async def _start_actor(
log.info("Completed async main") log.info("Completed async main")
return result return result
@asynccontextmanager
async def get_arbiter(
host: str, port: int
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
"""Return a portal instance connected to a local or remote
arbiter.
"""
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
if actor.is_arbiter:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor)
else:
async with _connect_chan(host, port) as chan:
async with open_portal(chan) as arb_portal:
yield arb_portal
@asynccontextmanager
async def find_actor(
name: str, arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Optional[Portal], None]:
"""Ask the arbiter to find actor(s) by name.
Returns a connected portal to the last registered matching actor
known to the arbiter.
"""
actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddr = await arb_portal.run('self', 'find_actor', name=name)
# TODO: return portals to all available actors - for now just
# the last one that registered
if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter")
elif sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
@asynccontextmanager
async def wait_for_actor(
name: str,
arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Portal, None]:
"""Wait on an actor to register with the arbiter.
A portal to the first registered actor is returned.
"""
actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name)
sockaddr = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal

View File

@ -0,0 +1,77 @@
"""
Actor discovery API.
"""
import typing
from typing import Tuple, Optional, Union
from async_generator import asynccontextmanager
from ._ipc import _connect_chan
from ._portal import (
Portal,
open_portal,
LocalPortal,
)
from ._state import current_actor
@asynccontextmanager
async def get_arbiter(
host: str, port: int
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
"""Return a portal instance connected to a local or remote
arbiter.
"""
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
if actor.is_arbiter:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor)
else:
async with _connect_chan(host, port) as chan:
async with open_portal(chan) as arb_portal:
yield arb_portal
@asynccontextmanager
async def find_actor(
name: str, arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Optional[Portal], None]:
"""Ask the arbiter to find actor(s) by name.
Returns a connected portal to the last registered matching actor
known to the arbiter.
"""
actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddr = await arb_portal.run('self', 'find_actor', name=name)
# TODO: return portals to all available actors - for now just
# the last one that registered
if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter")
elif sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
@asynccontextmanager
async def wait_for_actor(
name: str,
arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Portal, None]:
"""Wait on an actor to register with the arbiter.
A portal to the first registered actor is returned.
"""
actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name)
sockaddr = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal