diff --git a/.travis.yml b/.travis.yml index 74f2939..ece0ba8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ python: install: - cd $TRAVIS_BUILD_DIR - - pip install . -r requirements-test.txt + - pip install -U . -r requirements-test.txt script: - pytest tests/ --no-print-logs diff --git a/README.rst b/README.rst index 6eead52..d15b20b 100644 --- a/README.rst +++ b/README.rst @@ -94,8 +94,7 @@ the hip new film we're shooting: async def say_hello(other_actor): - await trio.sleep(0.4) # wait for other actor to spawn - async with tractor.find_actor(other_actor) as portal: + async with tractor.wait_for_actor(other_actor) as portal: return await portal.run(_this_module, 'hi') @@ -118,7 +117,6 @@ the hip new film we're shooting: ) print(await gretchen.result()) print(await donny.result()) - await donny.cancel_actor() print("CUTTTT CUUTT CUT!!! Donny!! You're supposed to say...") diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 3e92df4..238633f 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -1,6 +1,7 @@ """ Actor "discovery" testing """ +import pytest import tractor import trio @@ -49,8 +50,15 @@ async def say_hello(other_actor): return await portal.run(__name__, 'hi') +async def say_hello_use_wait(other_actor): + async with tractor.wait_for_actor(other_actor) as portal: + result = await portal.run(__name__, 'hi') + return result + + @tractor_test -async def test_trynamic_trio(): +@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait]) +async def test_trynamic_trio(func): """Main tractor entry point, the "master" process (for now acts as the "director"). """ @@ -59,15 +67,14 @@ async def test_trynamic_trio(): donny = await n.run_in_actor( 'donny', - say_hello, + func, other_actor='gretchen', ) gretchen = await n.run_in_actor( 'gretchen', - say_hello, + func, other_actor='donny', ) print(await gretchen.result()) print(await donny.result()) - await donny.cancel_actor() print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 3683123..61b1ea8 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -387,7 +387,7 @@ def test_a_quadruple_example(arb_addr): """This also serves as a kind of "we'd like to eventually be this fast test". """ - results = tractor.run(cancel_after, 2.1, arbiter_addr=arb_addr) + results = tractor.run(cancel_after, 2.2, arbiter_addr=arb_addr) assert results diff --git a/tractor/__init__.py b/tractor/__init__.py index 515f8dc..f5146ad 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -9,7 +9,7 @@ import trio from .log import get_console_log, get_logger, get_loglevel from ._ipc import _connect_chan, Channel from ._actor import ( - Actor, _start_actor, Arbiter, get_arbiter, find_actor + Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor ) from ._trionics import open_nursery from ._state import current_actor @@ -17,8 +17,13 @@ from ._portal import RemoteActorError __all__ = [ - 'current_actor', 'find_actor', 'get_arbiter', 'open_nursery', - 'RemoteActorError', 'Channel', + 'current_actor', + 'find_actor', + 'get_arbiter', + 'wait_for_actor', + 'open_nursery', + 'RemoteActorError', + 'Channel', ] diff --git a/tractor/_actor.py b/tractor/_actor.py index 9dfe73d..a5e833f 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -14,8 +14,12 @@ from async_generator import asynccontextmanager, aclosing from ._ipc import Channel, _connect_chan from .log import get_console_log, get_logger -from ._portal import (Portal, open_portal, _do_handshake, LocalPortal, - maybe_open_nursery) +from ._portal import ( + Portal, + open_portal, + _do_handshake, + LocalPortal, +) from . import _state from ._state import current_actor @@ -134,7 +138,6 @@ class Actor: rpc_module_paths: [str] = [], statespace: dict = {}, uid: str = None, - allow_rpc: bool = True, loglevel: str = None, arbiter_addr: (str, int) = None, ): @@ -145,7 +148,6 @@ class Actor: # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 self.statespace = statespace - self._allow_rpc = allow_rpc self.loglevel = loglevel self._arb_addr = arbiter_addr @@ -185,6 +187,14 @@ class Actor: for path in self.rpc_module_paths: self._mods[path] = importlib.import_module(path) + # XXX: triggers an internal error which can cause a hanging + # problem (without the recently added .throw()) on teardown + # (root nursery tears down thus killing all channels before + # sending cancels to subactors during actor nursery teardown + # - has to do with await main() in MainProcess) + # if self.name == 'gretchen': + # self._mods.pop('test_discovery') + async def _stream_handler( self, stream: trio.SocketStream, @@ -292,11 +302,6 @@ class Actor: log.debug(f"Received msg {msg} from {chan.uid}") cid = msg.get('cid') if cid: - if cid == 'internal': # internal actor error - # import pdb; pdb.set_trace() - raise InternalActorError( - f"{chan.uid}\n" + msg['error']) - # deliver response to local caller/waiter await self._push_result(chan.uid, cid, msg) log.debug( @@ -304,7 +309,15 @@ class Actor: continue # process command request - ns, funcname, kwargs, actorid, cid = msg['cmd'] + try: + ns, funcname, kwargs, actorid, cid = msg['cmd'] + except KeyError: + # push any non-rpc-response error to all local consumers + # and mark the channel as errored + chan._exc = err = msg['error'] + for cid in self._actors2calls[chan.uid]: + await self._push_result(chan.uid, cid, msg) + raise InternalActorError(f"{chan.uid}\n" + err) log.debug( f"Processing request from {actorid}\n" @@ -345,19 +358,16 @@ class Actor: else: # channel disconnect log.debug(f"{chan} from {chan.uid} disconnected") - except InternalActorError: - # ship internal errors upwards - if self._parent_chan: - await self._parent_chan.send( - {'error': traceback.format_exc(), 'cid': 'internal'}) - raise except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") except Exception: - # ship exception (from above code) to peer as an internal error - await chan.send( - {'error': traceback.format_exc(), 'cid': 'internal'}) - raise + # ship exception (from above code) to parent + log.exception("Actor errored:") + if self._parent_chan: + await self._parent_chan.send({'error': traceback.format_exc()}) + raise + # if this is the `MainProcess` we expect the error broadcasting + # above to trigger an error at consuming portal "checkpoints" finally: log.debug(f"Exiting msg loop for {chan} from {chan.uid}") @@ -384,7 +394,7 @@ class Actor: accept_addr, arbiter_addr=None, parent_addr=None, - nursery=None, + _main_coro=None, task_status=trio.TASK_STATUS_IGNORED, ): """Start the channel server, maybe connect back to the parent, and @@ -393,12 +403,19 @@ class Actor: A "root-most" (or "top-level") nursery for this actor is opened here and when cancelled effectively cancels the actor. """ + # if this is the `MainProcess` then we get a ref to the main + # task's coroutine object for tossing errors into + self._main_coro = _main_coro + arbiter_addr = arbiter_addr or self._arb_addr registered_with_arbiter = False try: - async with maybe_open_nursery(nursery) as nursery: + async with trio.open_nursery() as nursery: self._root_nursery = nursery + # load allowed RPC module + self.load_namespaces() + # Startup up channel server host, port = accept_addr await nursery.start(partial( @@ -429,6 +446,10 @@ class Actor: await self.cancel() self._parent_chan = None + # handle new connection back to parent + nursery.start_soon( + self._process_messages, self._parent_chan) + # register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") async with get_arbiter(*arbiter_addr) as arb_portal: @@ -438,14 +459,6 @@ class Actor: registered_with_arbiter = True task_status.started() - # handle new connection back to parent optionally - # begin responding to RPC - if self._allow_rpc: - self.load_namespaces() - if self._parent_chan: - nursery.start_soon( - self._process_messages, self._parent_chan) - log.debug("Waiting on root nursery to complete") # blocks here as expected if no nursery was provided until # the channel server is killed (i.e. this actor is @@ -454,7 +467,8 @@ class Actor: if self._parent_chan: try: await self._parent_chan.send( - {'error': traceback.format_exc(), 'cid': 'internal'}) + # {'error': traceback.format_exc(), 'cid': 'internal'}) + {'error': traceback.format_exc()}) except trio.ClosedResourceError: log.error( f"Failed to ship error to parent " @@ -463,7 +477,8 @@ class Actor: if not registered_with_arbiter: log.exception( - f"Failed to register with arbiter @ {arbiter_addr}") + f"Actor errored and failed to register with arbiter " + f"@ {arbiter_addr}") else: raise finally: @@ -591,16 +606,44 @@ class Arbiter(Actor): def __init__(self, *args, **kwargs): self._registry = defaultdict(list) + self._waiters = {} super().__init__(*args, **kwargs) def find_actor(self, name): - for uid, actor in self._registry.items(): + for uid, sockaddr in self._registry.items(): if name in uid: - print('found it!') - return actor + return sockaddr + + async def wait_for_actor(self, name): + """Wait for a particular actor to register. + + This is a blocking call if no actor by the provided name is currently + registered. + """ + sockaddrs = [] + + for (aname, _), sockaddr in self._registry.items(): + if name == aname: + sockaddrs.append(sockaddr) + + if not sockaddrs: + waiter = trio.Event() + self._waiters.setdefault(name, []).append(waiter) + await waiter.wait() + for uid in self._waiters[name]: + sockaddrs.append(self._registry[uid]) + + return sockaddrs def register_actor(self, uid, sockaddr): - self._registry[uid].append(sockaddr) + name, uuid = uid + self._registry[uid] = sockaddr + + # pop and signal all waiter events + events = self._waiters.pop(name, ()) + self._waiters.setdefault(name, []).append(uid) + for event in events: + event.set() def unregister_actor(self, uid): self._registry.pop(uid, None) @@ -621,20 +664,26 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): log.info(f"Starting local {actor} @ {host}:{port}") async with trio.open_nursery() as nursery: + + if main is not None: + main_coro = main() + await nursery.start( partial( actor._async_main, accept_addr=(host, port), parent_addr=None, arbiter_addr=arbiter_addr, + _main_coro=main_coro ) ) if main is not None: - result = await main() - # XXX: If spawned with a dedicated "main function", - # the actor is cancelled when this context is complete - # given that there are no more active peer channels connected to it. - actor.cancel_server() + result = await main_coro + + # XXX: If spawned with a dedicated "main function", + # the actor is cancelled when this context is complete + # given that there are no more active peer channels connected + actor.cancel_server() # block on actor to complete @@ -675,17 +724,31 @@ async def find_actor( known to the arbiter. """ actor = current_actor() - if not actor: - raise RuntimeError("No actor instance has been defined yet?") - async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: - sockaddrs = await arb_portal.run('self', 'find_actor', name=name) + sockaddr = await arb_portal.run('self', 'find_actor', name=name) # TODO: return portals to all available actors - for now just # the last one that registered - if sockaddrs: - sockaddr = sockaddrs[-1] + if sockaddr: async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: yield portal else: yield None + + +@asynccontextmanager +async def wait_for_actor( + name, + arbiter_sockaddr=None, +): + """Wait on an actor to register with the arbiter. + + A portal to the first actor which registered is be returned. + """ + actor = current_actor() + async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name) + sockaddr = sockaddrs[-1] + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal diff --git a/tractor/_portal.py b/tractor/_portal.py index d9ee957..835f918 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -57,8 +57,8 @@ class Portal: # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime self._result = None + self._exc = None self._expect_result = None - self._errored = False async def aclose(self): log.debug(f"Closing {self}") @@ -139,8 +139,9 @@ class Portal: try: return msg['return'] except KeyError: - raise RemoteActorError( + self._exc = RemoteActorError( f"{self.channel.uid}\n" + msg['error']) + raise self._exc else: raise ValueError(f"Unknown msg response type: {first_msg}") @@ -148,7 +149,16 @@ class Portal: """Return the result(s) from the remote actor's "main" task. """ if self._expect_result is None: - raise RuntimeError("This portal is not expecting a final result?") + # (remote) errors are slapped on the channel + # teardown can reraise them + exc = self.channel._exc + if exc: + raise RemoteActorError(f"{self.channel.uid}\n" + exc) + else: + raise RuntimeError( + f"Portal for {self.channel.uid} is not expecting a final" + "result?") + elif self._result is None: self._result = await self._return_from_resptype( *self._expect_result @@ -181,6 +191,7 @@ class Portal: log.warn(f"May have failed to cancel {self.channel.uid}") return False + class LocalPortal: """A 'portal' to a local ``Actor``. diff --git a/tractor/_state.py b/tractor/_state.py index a31b1cc..767bb27 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -7,4 +7,6 @@ _current_actor = None def current_actor() -> 'Actor': """Get the process-local actor instance. """ + if not _current_actor: + raise RuntimeError("No actor instance has been defined yet?") return _current_actor diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 569f731..d78cbdd 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -126,52 +126,72 @@ class ActorNursery: bind_addr=bind_addr, statespace=statespace, ) + self._cancel_after_result_on_exit.add(portal) await portal._submit_for_result( mod_path, fn.__name__, **kwargs ) - self._cancel_after_result_on_exit.add(portal) return portal async def wait(self): """Wait for all subactors to complete. """ - async def wait_for_proc(proc, actor, portal): + async def maybe_consume_result(portal, actor): + if ( + portal in self._cancel_after_result_on_exit and + (portal._result is None and portal._exc is None) + ): + log.debug(f"Waiting on final result from {subactor.uid}") + res = await portal.result() + # if it's an async-gen then we should alert the user + # that we're cancelling it + if inspect.isasyncgen(res): + log.warn( + f"Blindly consuming asyncgen for {actor.uid}") + with trio.fail_after(1): + async with aclosing(res) as agen: + async for item in agen: + log.debug(f"Consuming item {item}") + + async def wait_for_proc(proc, actor, portal, cancel_scope): # TODO: timeout block here? if proc.is_alive(): await trio.hazmat.wait_readable(proc.sentinel) # please god don't hang proc.join() log.debug(f"Joined {proc}") + await maybe_consume_result(portal, actor) + self._children.pop(actor.uid) - - async def wait_for_result(portal, actor): - # cancel the actor gracefully - log.info(f"Cancelling {portal.channel.uid} gracefully") - await portal.cancel_actor() - - log.debug(f"Waiting on final result from {subactor.uid}") - res = await portal.result() - # if it's an async-gen then we should alert the user - # that we're cancelling it - if inspect.isasyncgen(res): + # proc terminated, cancel result waiter + if cancel_scope: log.warn( - f"Blindly consuming asyncgen for {actor.uid}") - with trio.fail_after(1): - async with aclosing(res) as agen: - async for item in agen: - log.debug(f"Consuming item {item}") + f"Cancelling existing result waiter task for {actor.uid}") + cancel_scope.cancel() + + async def wait_for_actor( + portal, actor, + task_status=trio.TASK_STATUS_IGNORED, + ): + # cancel the actor gracefully + with trio.open_cancel_scope() as cs: + task_status.started(cs) + await maybe_consume_result(portal, actor) + log.info(f"Cancelling {portal.channel.uid} gracefully") + await portal.cancel_actor() + + if cs.cancelled_caught: + log.warn("Result waiter was cancelled") # unblocks when all waiter tasks have completed children = self._children.copy() async with trio.open_nursery() as nursery: for subactor, proc, portal in children.values(): - nursery.start_soon(wait_for_proc, proc, subactor, portal) - if proc.is_alive() and ( - portal in self._cancel_after_result_on_exit - ): - nursery.start_soon(wait_for_result, portal, subactor) + cs = None + if portal in self._cancel_after_result_on_exit: + cs = await nursery.start(wait_for_actor, portal, subactor) + nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) async def cancel(self, hard_kill=False): """Cancel this nursery by instructing each subactor to cancel