Add `tractor.wait_for_actor()` helper
Allows for waiting on another actor (by name) to register with the arbiter. This makes synchronized actor spawning and consecutive task coordination easier to accomplish from within sub-actors. Resolves #31wait_for_actor
parent
8e027ff571
commit
3f0c644768
|
@ -9,7 +9,7 @@ import trio
|
||||||
from .log import get_console_log, get_logger, get_loglevel
|
from .log import get_console_log, get_logger, get_loglevel
|
||||||
from ._ipc import _connect_chan, Channel
|
from ._ipc import _connect_chan, Channel
|
||||||
from ._actor import (
|
from ._actor import (
|
||||||
Actor, _start_actor, Arbiter, get_arbiter, find_actor
|
Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor
|
||||||
)
|
)
|
||||||
from ._trionics import open_nursery
|
from ._trionics import open_nursery
|
||||||
from ._state import current_actor
|
from ._state import current_actor
|
||||||
|
@ -17,8 +17,13 @@ from ._portal import RemoteActorError
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
'current_actor', 'find_actor', 'get_arbiter', 'open_nursery',
|
'current_actor',
|
||||||
'RemoteActorError', 'Channel',
|
'find_actor',
|
||||||
|
'get_arbiter',
|
||||||
|
'wait_for_actor',
|
||||||
|
'open_nursery',
|
||||||
|
'RemoteActorError',
|
||||||
|
'Channel',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -14,8 +14,13 @@ from async_generator import asynccontextmanager, aclosing
|
||||||
|
|
||||||
from ._ipc import Channel, _connect_chan
|
from ._ipc import Channel, _connect_chan
|
||||||
from .log import get_console_log, get_logger
|
from .log import get_console_log, get_logger
|
||||||
from ._portal import (Portal, open_portal, _do_handshake, LocalPortal,
|
from ._portal import (
|
||||||
maybe_open_nursery)
|
Portal,
|
||||||
|
open_portal,
|
||||||
|
_do_handshake,
|
||||||
|
LocalPortal,
|
||||||
|
maybe_open_nursery
|
||||||
|
)
|
||||||
from . import _state
|
from . import _state
|
||||||
from ._state import current_actor
|
from ._state import current_actor
|
||||||
|
|
||||||
|
@ -293,12 +298,12 @@ class Actor:
|
||||||
cid = msg.get('cid')
|
cid = msg.get('cid')
|
||||||
if cid:
|
if cid:
|
||||||
if cid == 'internal': # internal actor error
|
if cid == 'internal': # internal actor error
|
||||||
# import pdb; pdb.set_trace()
|
|
||||||
raise InternalActorError(
|
raise InternalActorError(
|
||||||
f"{chan.uid}\n" + msg['error'])
|
f"{chan.uid}\n" + msg['error'])
|
||||||
|
|
||||||
# deliver response to local caller/waiter
|
# deliver response to local caller/waiter
|
||||||
await self._push_result(chan.uid, cid, msg)
|
await self._push_result(chan.uid, cid, msg)
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Waiting on next msg for {chan} from {chan.uid}")
|
f"Waiting on next msg for {chan} from {chan.uid}")
|
||||||
continue
|
continue
|
||||||
|
@ -591,16 +596,44 @@ class Arbiter(Actor):
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
self._registry = defaultdict(list)
|
self._registry = defaultdict(list)
|
||||||
|
self._waiters = {}
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
def find_actor(self, name):
|
def find_actor(self, name):
|
||||||
for uid, actor in self._registry.items():
|
for uid, sockaddr in self._registry.items():
|
||||||
if name in uid:
|
if name in uid:
|
||||||
print('found it!')
|
return sockaddr
|
||||||
return actor
|
|
||||||
|
async def wait_for_actor(self, name):
|
||||||
|
"""Wait for a particular actor to register.
|
||||||
|
|
||||||
|
This is a blocking call if no actor by the provided name is currently
|
||||||
|
registered.
|
||||||
|
"""
|
||||||
|
sockaddrs = []
|
||||||
|
|
||||||
|
for (aname, _), sockaddr in self._registry.items():
|
||||||
|
if name == aname:
|
||||||
|
sockaddrs.append(sockaddr)
|
||||||
|
|
||||||
|
if not sockaddrs:
|
||||||
|
waiter = trio.Event()
|
||||||
|
self._waiters.setdefault(name, []).append(waiter)
|
||||||
|
await waiter.wait()
|
||||||
|
for uid in self._waiters[name]:
|
||||||
|
sockaddrs.append(self._registry[uid])
|
||||||
|
|
||||||
|
return sockaddrs
|
||||||
|
|
||||||
def register_actor(self, uid, sockaddr):
|
def register_actor(self, uid, sockaddr):
|
||||||
self._registry[uid].append(sockaddr)
|
name, uuid = uid
|
||||||
|
self._registry[uid] = sockaddr
|
||||||
|
|
||||||
|
# pop and signal all waiter events
|
||||||
|
events = self._waiters.pop(name, ())
|
||||||
|
self._waiters.setdefault(name, []).append(uid)
|
||||||
|
for event in events:
|
||||||
|
event.set()
|
||||||
|
|
||||||
def unregister_actor(self, uid):
|
def unregister_actor(self, uid):
|
||||||
self._registry.pop(uid, None)
|
self._registry.pop(uid, None)
|
||||||
|
@ -633,7 +666,7 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
|
||||||
result = await main()
|
result = await main()
|
||||||
# XXX: If spawned with a dedicated "main function",
|
# XXX: If spawned with a dedicated "main function",
|
||||||
# the actor is cancelled when this context is complete
|
# the actor is cancelled when this context is complete
|
||||||
# given that there are no more active peer channels connected to it.
|
# given that there are no more active peer channels connected
|
||||||
actor.cancel_server()
|
actor.cancel_server()
|
||||||
|
|
||||||
# block on actor to complete
|
# block on actor to complete
|
||||||
|
@ -675,17 +708,31 @@ async def find_actor(
|
||||||
known to the arbiter.
|
known to the arbiter.
|
||||||
"""
|
"""
|
||||||
actor = current_actor()
|
actor = current_actor()
|
||||||
if not actor:
|
|
||||||
raise RuntimeError("No actor instance has been defined yet?")
|
|
||||||
|
|
||||||
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
|
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
|
||||||
sockaddrs = await arb_portal.run('self', 'find_actor', name=name)
|
sockaddr = await arb_portal.run('self', 'find_actor', name=name)
|
||||||
# TODO: return portals to all available actors - for now just
|
# TODO: return portals to all available actors - for now just
|
||||||
# the last one that registered
|
# the last one that registered
|
||||||
if sockaddrs:
|
if sockaddr:
|
||||||
sockaddr = sockaddrs[-1]
|
|
||||||
async with _connect_chan(*sockaddr) as chan:
|
async with _connect_chan(*sockaddr) as chan:
|
||||||
async with open_portal(chan) as portal:
|
async with open_portal(chan) as portal:
|
||||||
yield portal
|
yield portal
|
||||||
else:
|
else:
|
||||||
yield None
|
yield None
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def wait_for_actor(
|
||||||
|
name,
|
||||||
|
arbiter_sockaddr=None,
|
||||||
|
):
|
||||||
|
"""Wait on an actor to register with the arbiter.
|
||||||
|
|
||||||
|
A portal to the first actor which registered is be returned.
|
||||||
|
"""
|
||||||
|
actor = current_actor()
|
||||||
|
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
|
||||||
|
sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name)
|
||||||
|
sockaddr = sockaddrs[-1]
|
||||||
|
async with _connect_chan(*sockaddr) as chan:
|
||||||
|
async with open_portal(chan) as portal:
|
||||||
|
yield portal
|
||||||
|
|
|
@ -7,4 +7,6 @@ _current_actor = None
|
||||||
def current_actor() -> 'Actor':
|
def current_actor() -> 'Actor':
|
||||||
"""Get the process-local actor instance.
|
"""Get the process-local actor instance.
|
||||||
"""
|
"""
|
||||||
|
if not _current_actor:
|
||||||
|
raise RuntimeError("No actor instance has been defined yet?")
|
||||||
return _current_actor
|
return _current_actor
|
||||||
|
|
Loading…
Reference in New Issue