From 3f0c644768d9cf76d7f115c332e93c39d7a39f30 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 12 Aug 2018 23:59:19 -0400 Subject: [PATCH] Add `tractor.wait_for_actor()` helper Allows for waiting on another actor (by name) to register with the arbiter. This makes synchronized actor spawning and consecutive task coordination easier to accomplish from within sub-actors. Resolves #31 --- tractor/__init__.py | 11 +++++-- tractor/_actor.py | 75 ++++++++++++++++++++++++++++++++++++--------- tractor/_state.py | 2 ++ 3 files changed, 71 insertions(+), 17 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 515f8dc..f5146ad 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -9,7 +9,7 @@ import trio from .log import get_console_log, get_logger, get_loglevel from ._ipc import _connect_chan, Channel from ._actor import ( - Actor, _start_actor, Arbiter, get_arbiter, find_actor + Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor ) from ._trionics import open_nursery from ._state import current_actor @@ -17,8 +17,13 @@ from ._portal import RemoteActorError __all__ = [ - 'current_actor', 'find_actor', 'get_arbiter', 'open_nursery', - 'RemoteActorError', 'Channel', + 'current_actor', + 'find_actor', + 'get_arbiter', + 'wait_for_actor', + 'open_nursery', + 'RemoteActorError', + 'Channel', ] diff --git a/tractor/_actor.py b/tractor/_actor.py index 9dfe73d..679a5a3 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -14,8 +14,13 @@ from async_generator import asynccontextmanager, aclosing from ._ipc import Channel, _connect_chan from .log import get_console_log, get_logger -from ._portal import (Portal, open_portal, _do_handshake, LocalPortal, - maybe_open_nursery) +from ._portal import ( + Portal, + open_portal, + _do_handshake, + LocalPortal, + maybe_open_nursery +) from . import _state from ._state import current_actor @@ -293,12 +298,12 @@ class Actor: cid = msg.get('cid') if cid: if cid == 'internal': # internal actor error - # import pdb; pdb.set_trace() raise InternalActorError( f"{chan.uid}\n" + msg['error']) # deliver response to local caller/waiter await self._push_result(chan.uid, cid, msg) + log.debug( f"Waiting on next msg for {chan} from {chan.uid}") continue @@ -591,16 +596,44 @@ class Arbiter(Actor): def __init__(self, *args, **kwargs): self._registry = defaultdict(list) + self._waiters = {} super().__init__(*args, **kwargs) def find_actor(self, name): - for uid, actor in self._registry.items(): + for uid, sockaddr in self._registry.items(): if name in uid: - print('found it!') - return actor + return sockaddr + + async def wait_for_actor(self, name): + """Wait for a particular actor to register. + + This is a blocking call if no actor by the provided name is currently + registered. + """ + sockaddrs = [] + + for (aname, _), sockaddr in self._registry.items(): + if name == aname: + sockaddrs.append(sockaddr) + + if not sockaddrs: + waiter = trio.Event() + self._waiters.setdefault(name, []).append(waiter) + await waiter.wait() + for uid in self._waiters[name]: + sockaddrs.append(self._registry[uid]) + + return sockaddrs def register_actor(self, uid, sockaddr): - self._registry[uid].append(sockaddr) + name, uuid = uid + self._registry[uid] = sockaddr + + # pop and signal all waiter events + events = self._waiters.pop(name, ()) + self._waiters.setdefault(name, []).append(uid) + for event in events: + event.set() def unregister_actor(self, uid): self._registry.pop(uid, None) @@ -633,7 +666,7 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): result = await main() # XXX: If spawned with a dedicated "main function", # the actor is cancelled when this context is complete - # given that there are no more active peer channels connected to it. + # given that there are no more active peer channels connected actor.cancel_server() # block on actor to complete @@ -675,17 +708,31 @@ async def find_actor( known to the arbiter. """ actor = current_actor() - if not actor: - raise RuntimeError("No actor instance has been defined yet?") - async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: - sockaddrs = await arb_portal.run('self', 'find_actor', name=name) + sockaddr = await arb_portal.run('self', 'find_actor', name=name) # TODO: return portals to all available actors - for now just # the last one that registered - if sockaddrs: - sockaddr = sockaddrs[-1] + if sockaddr: async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: yield portal else: yield None + + +@asynccontextmanager +async def wait_for_actor( + name, + arbiter_sockaddr=None, +): + """Wait on an actor to register with the arbiter. + + A portal to the first actor which registered is be returned. + """ + actor = current_actor() + async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name) + sockaddr = sockaddrs[-1] + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal diff --git a/tractor/_state.py b/tractor/_state.py index a31b1cc..767bb27 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -7,4 +7,6 @@ _current_actor = None def current_actor() -> 'Actor': """Get the process-local actor instance. """ + if not _current_actor: + raise RuntimeError("No actor instance has been defined yet?") return _current_actor