Merge pull request #33 from tgoodlet/wait_for_actor

Add wait_for_actor() helper
attrs_it_up
goodboy 2018-08-17 23:31:57 -04:00 committed by GitHub
commit 1264cae218
9 changed files with 191 additions and 85 deletions
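
In short, a subactor can now block until a peer by a given name has registered with the arbiter instead of sleeping and hoping. A minimal usage sketch distilled from the README and test changes below (the `hi()` function and actor names are illustrative, not part of this changeset):

    import tractor


    async def hi():
        return "hi"


    async def say_hello(other_actor):
        # block until `other_actor` has registered with the arbiter,
        # then open a portal to it and invoke `hi()` remotely
        async with tractor.wait_for_actor(other_actor) as portal:
            return await portal.run(__name__, 'hi')


    async def main():
        async with tractor.open_nursery() as n:
            donny = await n.run_in_actor('donny', say_hello, other_actor='gretchen')
            gretchen = await n.run_in_actor('gretchen', say_hello, other_actor='donny')
            print(await donny.result())
            print(await gretchen.result())


    if __name__ == '__main__':
        tractor.run(main)

Unlike the old `trio.sleep()` dance, neither side has to guess how long the other takes to spawn.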

View File

@@ -7,7 +7,7 @@ python:
 install:
   - cd $TRAVIS_BUILD_DIR
-  - pip install . -r requirements-test.txt
+  - pip install -U . -r requirements-test.txt
 script:
   - pytest tests/ --no-print-logs

View File

@@ -94,8 +94,7 @@ the hip new film we're shooting:
    async def say_hello(other_actor):
-       await trio.sleep(0.4)  # wait for other actor to spawn
-       async with tractor.find_actor(other_actor) as portal:
+       async with tractor.wait_for_actor(other_actor) as portal:
            return await portal.run(_this_module, 'hi')
@@ -118,7 +117,6 @@ the hip new film we're shooting:
            )
            print(await gretchen.result())
            print(await donny.result())
-           await donny.cancel_actor()
            print("CUTTTT CUUTT CUT!!! Donny!! You're supposed to say...")

View File

@@ -1,6 +1,7 @@
 """
 Actor "discovery" testing
 """
+import pytest
 import tractor
 import trio
@@ -49,8 +50,15 @@ async def say_hello(other_actor):
         return await portal.run(__name__, 'hi')
 
 
+async def say_hello_use_wait(other_actor):
+    async with tractor.wait_for_actor(other_actor) as portal:
+        result = await portal.run(__name__, 'hi')
+        return result
+
+
 @tractor_test
-async def test_trynamic_trio():
+@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait])
+async def test_trynamic_trio(func):
     """Main tractor entry point, the "master" process (for now
     acts as the "director").
     """
@@ -59,15 +67,14 @@ async def test_trynamic_trio():
         donny = await n.run_in_actor(
             'donny',
-            say_hello,
+            func,
             other_actor='gretchen',
         )
         gretchen = await n.run_in_actor(
             'gretchen',
-            say_hello,
+            func,
             other_actor='donny',
         )
         print(await gretchen.result())
         print(await donny.result())
-        await donny.cancel_actor()
         print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...")

View File

@@ -387,7 +387,7 @@ def test_a_quadruple_example(arb_addr):
     """This also serves as a kind of "we'd like to eventually be this
     fast test".
     """
-    results = tractor.run(cancel_after, 2.1, arbiter_addr=arb_addr)
+    results = tractor.run(cancel_after, 2.2, arbiter_addr=arb_addr)
     assert results

View File

@@ -9,7 +9,7 @@ import trio
 from .log import get_console_log, get_logger, get_loglevel
 from ._ipc import _connect_chan, Channel
 from ._actor import (
-    Actor, _start_actor, Arbiter, get_arbiter, find_actor
+    Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor
 )
 from ._trionics import open_nursery
 from ._state import current_actor
@@ -17,8 +17,13 @@ from ._portal import RemoteActorError
 __all__ = [
-    'current_actor', 'find_actor', 'get_arbiter', 'open_nursery',
-    'RemoteActorError', 'Channel',
+    'current_actor',
+    'find_actor',
+    'get_arbiter',
+    'wait_for_actor',
+    'open_nursery',
+    'RemoteActorError',
+    'Channel',
 ]

View File

@@ -14,8 +14,12 @@ from async_generator import asynccontextmanager, aclosing
 from ._ipc import Channel, _connect_chan
 from .log import get_console_log, get_logger
-from ._portal import (Portal, open_portal, _do_handshake, LocalPortal,
-                      maybe_open_nursery)
+from ._portal import (
+    Portal,
+    open_portal,
+    _do_handshake,
+    LocalPortal,
+)
 from . import _state
 from ._state import current_actor
@@ -134,7 +138,6 @@ class Actor:
         rpc_module_paths: [str] = [],
         statespace: dict = {},
         uid: str = None,
-        allow_rpc: bool = True,
         loglevel: str = None,
         arbiter_addr: (str, int) = None,
     ):
@@ -145,7 +148,6 @@ class Actor:
         # TODO: consider making this a dynamically defined
         # @dataclass once we get py3.7
         self.statespace = statespace
-        self._allow_rpc = allow_rpc
         self.loglevel = loglevel
         self._arb_addr = arbiter_addr
@@ -185,6 +187,14 @@ class Actor:
         for path in self.rpc_module_paths:
             self._mods[path] = importlib.import_module(path)
 
+        # XXX: triggers an internal error which can cause a hanging
+        # problem (without the recently added .throw()) on teardown
+        # (root nursery tears down thus killing all channels before
+        # sending cancels to subactors during actor nursery teardown
+        # - has to do with await main() in MainProcess)
+        # if self.name == 'gretchen':
+        #     self._mods.pop('test_discovery')
+
     async def _stream_handler(
         self,
         stream: trio.SocketStream,
@@ -292,11 +302,6 @@ class Actor:
                 log.debug(f"Received msg {msg} from {chan.uid}")
                 cid = msg.get('cid')
                 if cid:
-                    if cid == 'internal':  # internal actor error
-                        # import pdb; pdb.set_trace()
-                        raise InternalActorError(
-                            f"{chan.uid}\n" + msg['error'])
-
                     # deliver response to local caller/waiter
                     await self._push_result(chan.uid, cid, msg)
                     log.debug(
@@ -304,7 +309,15 @@ class Actor:
                     continue
 
                 # process command request
-                ns, funcname, kwargs, actorid, cid = msg['cmd']
+                try:
+                    ns, funcname, kwargs, actorid, cid = msg['cmd']
+                except KeyError:
+                    # push any non-rpc-response error to all local consumers
+                    # and mark the channel as errored
+                    chan._exc = err = msg['error']
+                    for cid in self._actors2calls[chan.uid]:
+                        await self._push_result(chan.uid, cid, msg)
+                    raise InternalActorError(f"{chan.uid}\n" + err)
 
                 log.debug(
                     f"Processing request from {actorid}\n"
@@ -345,19 +358,16 @@ class Actor:
             else:  # channel disconnect
                 log.debug(f"{chan} from {chan.uid} disconnected")
 
-        except InternalActorError:
-            # ship internal errors upwards
-            if self._parent_chan:
-                await self._parent_chan.send(
-                    {'error': traceback.format_exc(), 'cid': 'internal'})
-            raise
         except trio.ClosedResourceError:
             log.error(f"{chan} form {chan.uid} broke")
         except Exception:
-            # ship exception (from above code) to peer as an internal error
-            await chan.send(
-                {'error': traceback.format_exc(), 'cid': 'internal'})
-            raise
+            # ship exception (from above code) to parent
+            log.exception("Actor errored:")
+            if self._parent_chan:
+                await self._parent_chan.send({'error': traceback.format_exc()})
+            raise
+
+            # if this is the `MainProcess` we expect the error broadcasting
+            # above to trigger an error at consuming portal "checkpoints"
         finally:
             log.debug(f"Exiting msg loop for {chan} from {chan.uid}")
@@ -384,7 +394,7 @@ class Actor:
         accept_addr,
         arbiter_addr=None,
         parent_addr=None,
-        nursery=None,
+        _main_coro=None,
         task_status=trio.TASK_STATUS_IGNORED,
     ):
         """Start the channel server, maybe connect back to the parent, and
@@ -393,12 +403,19 @@ class Actor:
         A "root-most" (or "top-level") nursery for this actor is opened here
         and when cancelled effectively cancels the actor.
         """
+        # if this is the `MainProcess` then we get a ref to the main
+        # task's coroutine object for tossing errors into
+        self._main_coro = _main_coro
+
         arbiter_addr = arbiter_addr or self._arb_addr
         registered_with_arbiter = False
         try:
-            async with maybe_open_nursery(nursery) as nursery:
+            async with trio.open_nursery() as nursery:
                 self._root_nursery = nursery
 
+                # load allowed RPC module
+                self.load_namespaces()
+
                 # Startup up channel server
                 host, port = accept_addr
                 await nursery.start(partial(
@@ -429,6 +446,10 @@ class Actor:
                         await self.cancel()
                         self._parent_chan = None
 
+                    # handle new connection back to parent
+                    nursery.start_soon(
+                        self._process_messages, self._parent_chan)
+
                 # register with the arbiter if we're told its addr
                 log.debug(f"Registering {self} for role `{self.name}`")
                 async with get_arbiter(*arbiter_addr) as arb_portal:
@@ -438,14 +459,6 @@ class Actor:
                     registered_with_arbiter = True
                 task_status.started()
 
-                # handle new connection back to parent optionally
-                # begin responding to RPC
-                if self._allow_rpc:
-                    self.load_namespaces()
-                    if self._parent_chan:
-                        nursery.start_soon(
-                            self._process_messages, self._parent_chan)
-
                 log.debug("Waiting on root nursery to complete")
                 # blocks here as expected if no nursery was provided until
                 # the channel server is killed (i.e. this actor is
@@ -454,7 +467,8 @@ class Actor:
             if self._parent_chan:
                 try:
                     await self._parent_chan.send(
-                        {'error': traceback.format_exc(), 'cid': 'internal'})
+                        # {'error': traceback.format_exc(), 'cid': 'internal'})
+                        {'error': traceback.format_exc()})
                 except trio.ClosedResourceError:
                     log.error(
                         f"Failed to ship error to parent "
@@ -463,7 +477,8 @@ class Actor:
             if not registered_with_arbiter:
                 log.exception(
-                    f"Failed to register with arbiter @ {arbiter_addr}")
+                    f"Actor errored and failed to register with arbiter "
+                    f"@ {arbiter_addr}")
             else:
                 raise
         finally:
@@ -591,16 +606,44 @@ class Arbiter(Actor):
     def __init__(self, *args, **kwargs):
         self._registry = defaultdict(list)
+        self._waiters = {}
         super().__init__(*args, **kwargs)
 
     def find_actor(self, name):
-        for uid, actor in self._registry.items():
+        for uid, sockaddr in self._registry.items():
             if name in uid:
-                print('found it!')
-                return actor
+                return sockaddr
+
+    async def wait_for_actor(self, name):
+        """Wait for a particular actor to register.
+        This is a blocking call if no actor by the provided name is currently
+        registered.
+        """
+        sockaddrs = []
+
+        for (aname, _), sockaddr in self._registry.items():
+            if name == aname:
+                sockaddrs.append(sockaddr)
+
+        if not sockaddrs:
+            waiter = trio.Event()
+            self._waiters.setdefault(name, []).append(waiter)
+            await waiter.wait()
+            for uid in self._waiters[name]:
+                sockaddrs.append(self._registry[uid])
+
+        return sockaddrs
 
     def register_actor(self, uid, sockaddr):
-        self._registry[uid].append(sockaddr)
+        name, uuid = uid
+        self._registry[uid] = sockaddr
+
+        # pop and signal all waiter events
+        events = self._waiters.pop(name, ())
+        self._waiters.setdefault(name, []).append(uid)
+        for event in events:
+            event.set()
 
     def unregister_actor(self, uid):
         self._registry.pop(uid, None)
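
The registry/waiter handshake above is the stock trio pattern of parking a task on an `Event` and firing it from the writer side. Stripped of the tractor specifics it looks roughly like this (a standalone sketch with made-up names, not code from this diff):

    import trio

    registry = {}   # name -> sockaddr
    waiters = {}    # name -> [trio.Event, ...]


    async def wait_for(name):
        if name not in registry:
            ev = trio.Event()
            waiters.setdefault(name, []).append(ev)
            await ev.wait()      # parked here until register() fires the event
        print(f"{name} is at {registry[name]}")


    def register(name, sockaddr):
        registry[name] = sockaddr
        for ev in waiters.pop(name, ()):
            ev.set()             # wake every task waiting on this name


    async def main():
        async with trio.open_nursery() as n:
            n.start_soon(wait_for, 'gretchen')
            await trio.sleep(0.1)
            register('gretchen', ('127.0.0.1', 1616))


    trio.run(main)

One quirk in the real method: after waking, `wait_for_actor()` reads socket addresses back out of `self._waiters[name]`, which `register_actor()` has repopulated with uids, so the same dict holds events before registration and registered uids after.
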
@@ -621,20 +664,26 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
     log.info(f"Starting local {actor} @ {host}:{port}")
     async with trio.open_nursery() as nursery:
+        if main is not None:
+            main_coro = main()
+
         await nursery.start(
             partial(
                 actor._async_main,
                 accept_addr=(host, port),
                 parent_addr=None,
                 arbiter_addr=arbiter_addr,
+                _main_coro=main_coro
             )
         )
         if main is not None:
-            result = await main()
-            # XXX: If spawned with a dedicated "main function",
-            # the actor is cancelled when this context is complete
-            # given that there are no more active peer channels connected to it.
-            actor.cancel_server()
+            result = await main_coro
+
+            # XXX: If spawned with a dedicated "main function",
+            # the actor is cancelled when this context is complete
+            # given that there are no more active peer channels connected
+            actor.cancel_server()
 
         # block on actor to complete
@@ -675,17 +724,31 @@ async def find_actor(
     known to the arbiter.
     """
     actor = current_actor()
-    if not actor:
-        raise RuntimeError("No actor instance has been defined yet?")
     async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
-        sockaddrs = await arb_portal.run('self', 'find_actor', name=name)
+        sockaddr = await arb_portal.run('self', 'find_actor', name=name)
         # TODO: return portals to all available actors - for now just
         # the last one that registered
-        if sockaddrs:
-            sockaddr = sockaddrs[-1]
+        if sockaddr:
             async with _connect_chan(*sockaddr) as chan:
                 async with open_portal(chan) as portal:
                     yield portal
         else:
             yield None
+
+
+@asynccontextmanager
+async def wait_for_actor(
+    name,
+    arbiter_sockaddr=None,
+):
+    """Wait on an actor to register with the arbiter.
+    A portal to the first actor which registered is be returned.
+    """
+    actor = current_actor()
+    async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
+        sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name)
+        sockaddr = sockaddrs[-1]
+        async with _connect_chan(*sockaddr) as chan:
+            async with open_portal(chan) as portal:
+                yield portal
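
In practice the split between the two discovery helpers defined above is: `find_actor()` yields `None` when nothing by that name is currently registered, while `wait_for_actor()` blocks on the arbiter until something registers and always yields a portal. A rough, hedged sketch of the difference (the 'debugger' actor name and `idle()` task are made up, not from this changeset):

    import trio
    import tractor


    async def idle():
        # stand-in "service" task for the subactor
        await trio.sleep(0.5)


    async def main():
        async with tractor.open_nursery() as n:
            # non-blocking lookup: nothing named 'debugger' is registered yet
            async with tractor.find_actor('debugger') as portal:
                assert portal is None

            await n.run_in_actor('debugger', idle)

            # blocking lookup: parks on the arbiter until 'debugger' registers
            async with tractor.wait_for_actor('debugger') as portal:
                print(f"connected to {portal.channel.uid}")


    if __name__ == '__main__':
        tractor.run(main)

Both helpers resolve the arbiter from the current actor's `_arb_addr` unless an explicit `arbiter_sockaddr` is passed.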

View File

@@ -57,8 +57,8 @@ class Portal:
         # it is expected that ``result()`` will be awaited at some point
         # during the portal's lifetime
         self._result = None
+        self._exc = None
         self._expect_result = None
-        self._errored = False
 
     async def aclose(self):
         log.debug(f"Closing {self}")
@@ -139,8 +139,9 @@ class Portal:
             try:
                 return msg['return']
             except KeyError:
-                raise RemoteActorError(
+                self._exc = RemoteActorError(
                     f"{self.channel.uid}\n" + msg['error'])
+                raise self._exc
         else:
             raise ValueError(f"Unknown msg response type: {first_msg}")
@@ -148,7 +149,16 @@ class Portal:
         """Return the result(s) from the remote actor's "main" task.
         """
         if self._expect_result is None:
-            raise RuntimeError("This portal is not expecting a final result?")
+            # (remote) errors are slapped on the channel
+            # teardown can reraise them
+            exc = self.channel._exc
+            if exc:
+                raise RemoteActorError(f"{self.channel.uid}\n" + exc)
+            else:
+                raise RuntimeError(
+                    f"Portal for {self.channel.uid} is not expecting a final"
+                    "result?")
         elif self._result is None:
             self._result = await self._return_from_resptype(
                 *self._expect_result
@@ -181,6 +191,7 @@ class Portal:
             log.warn(f"May have failed to cancel {self.channel.uid}")
             return False
 
+
 class LocalPortal:
     """A 'portal' to a local ``Actor``.

View File

@@ -7,4 +7,6 @@ _current_actor = None
 def current_actor() -> 'Actor':
     """Get the process-local actor instance.
     """
+    if not _current_actor:
+        raise RuntimeError("No actor instance has been defined yet?")
     return _current_actor
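
Moving the guard into `current_actor()` itself means every call site, not just `find_actor()`, now fails loudly when no actor is running in the current process. For example (illustrative):

    import tractor

    try:
        tractor.current_actor()   # no actor has been started in this process
    except RuntimeError as err:
        print(err)                # -> "No actor instance has been defined yet?"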

View File

@@ -126,52 +126,72 @@ class ActorNursery:
             bind_addr=bind_addr,
             statespace=statespace,
         )
+        self._cancel_after_result_on_exit.add(portal)
         await portal._submit_for_result(
             mod_path,
             fn.__name__,
             **kwargs
         )
-        self._cancel_after_result_on_exit.add(portal)
         return portal
 
     async def wait(self):
         """Wait for all subactors to complete.
         """
-        async def wait_for_proc(proc, actor, portal):
+        async def maybe_consume_result(portal, actor):
+            if (
+                portal in self._cancel_after_result_on_exit and
+                (portal._result is None and portal._exc is None)
+            ):
+                log.debug(f"Waiting on final result from {subactor.uid}")
+                res = await portal.result()
+                # if it's an async-gen then we should alert the user
+                # that we're cancelling it
+                if inspect.isasyncgen(res):
+                    log.warn(
+                        f"Blindly consuming asyncgen for {actor.uid}")
+                    with trio.fail_after(1):
+                        async with aclosing(res) as agen:
+                            async for item in agen:
+                                log.debug(f"Consuming item {item}")
+
+        async def wait_for_proc(proc, actor, portal, cancel_scope):
             # TODO: timeout block here?
             if proc.is_alive():
                 await trio.hazmat.wait_readable(proc.sentinel)
             # please god don't hang
             proc.join()
             log.debug(f"Joined {proc}")
+            await maybe_consume_result(portal, actor)
             self._children.pop(actor.uid)
 
-        async def wait_for_result(portal, actor):
-            # cancel the actor gracefully
-            log.info(f"Cancelling {portal.channel.uid} gracefully")
-            await portal.cancel_actor()
-            log.debug(f"Waiting on final result from {subactor.uid}")
-            res = await portal.result()
-            # if it's an async-gen then we should alert the user
-            # that we're cancelling it
-            if inspect.isasyncgen(res):
-                log.warn(
-                    f"Blindly consuming asyncgen for {actor.uid}")
-                with trio.fail_after(1):
-                    async with aclosing(res) as agen:
-                        async for item in agen:
-                            log.debug(f"Consuming item {item}")
+            # proc terminated, cancel result waiter
+            if cancel_scope:
+                log.warn(
+                    f"Cancelling existing result waiter task for {actor.uid}")
+                cancel_scope.cancel()
+
+        async def wait_for_actor(
+            portal, actor,
+            task_status=trio.TASK_STATUS_IGNORED,
+        ):
+            # cancel the actor gracefully
+            with trio.open_cancel_scope() as cs:
+                task_status.started(cs)
+                await maybe_consume_result(portal, actor)
+                log.info(f"Cancelling {portal.channel.uid} gracefully")
+                await portal.cancel_actor()
+
+            if cs.cancelled_caught:
+                log.warn("Result waiter was cancelled")
 
         # unblocks when all waiter tasks have completed
         children = self._children.copy()
         async with trio.open_nursery() as nursery:
             for subactor, proc, portal in children.values():
-                nursery.start_soon(wait_for_proc, proc, subactor, portal)
-                if proc.is_alive() and (
-                    portal in self._cancel_after_result_on_exit
-                ):
-                    nursery.start_soon(wait_for_result, portal, subactor)
+                cs = None
+                if portal in self._cancel_after_result_on_exit:
+                    cs = await nursery.start(wait_for_actor, portal, subactor)
+                nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
 
     async def cancel(self, hard_kill=False):
         """Cancel this nursery by instructing each subactor to cancel