From 3f0c644768d9cf76d7f115c332e93c39d7a39f30 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 12 Aug 2018 23:59:19 -0400
Subject: [PATCH 01/10] Add `tractor.wait_for_actor()` helper

Allows for waiting on another actor (by name) to register with the
arbiter. This makes synchronized actor spawning and consecutive task
coordination easier to accomplish from within sub-actors.

Resolves #31
---
 tractor/__init__.py | 11 +++++--
 tractor/_actor.py   | 75 ++++++++++++++++++++++++++++++++++++---------
 tractor/_state.py   |  2 ++
 3 files changed, 71 insertions(+), 17 deletions(-)

diff --git a/tractor/__init__.py b/tractor/__init__.py
index 515f8dc..f5146ad 100644
--- a/tractor/__init__.py
+++ b/tractor/__init__.py
@@ -9,7 +9,7 @@ import trio
 from .log import get_console_log, get_logger, get_loglevel
 from ._ipc import _connect_chan, Channel
 from ._actor import (
-    Actor, _start_actor, Arbiter, get_arbiter, find_actor
+    Actor, _start_actor, Arbiter, get_arbiter, find_actor, wait_for_actor
 )
 from ._trionics import open_nursery
 from ._state import current_actor
@@ -17,8 +17,13 @@ from ._portal import RemoteActorError


 __all__ = [
-    'current_actor', 'find_actor', 'get_arbiter', 'open_nursery',
-    'RemoteActorError', 'Channel',
+    'current_actor',
+    'find_actor',
+    'get_arbiter',
+    'wait_for_actor',
+    'open_nursery',
+    'RemoteActorError',
+    'Channel',
 ]

diff --git a/tractor/_actor.py b/tractor/_actor.py
index 9dfe73d..679a5a3 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -14,8 +14,13 @@ from async_generator import asynccontextmanager, aclosing

 from ._ipc import Channel, _connect_chan
 from .log import get_console_log, get_logger
-from ._portal import (Portal, open_portal, _do_handshake, LocalPortal,
-                      maybe_open_nursery)
+from ._portal import (
+    Portal,
+    open_portal,
+    _do_handshake,
+    LocalPortal,
+    maybe_open_nursery
+)
 from . import _state
 from ._state import current_actor
@@ -293,12 +298,12 @@ class Actor:
             cid = msg.get('cid')
             if cid:
                 if cid == 'internal':  # internal actor error
-                    # import pdb; pdb.set_trace()
                     raise InternalActorError(
                         f"{chan.uid}\n" + msg['error'])

                 # deliver response to local caller/waiter
                 await self._push_result(chan.uid, cid, msg)
+
                 log.debug(
                     f"Waiting on next msg for {chan} from {chan.uid}")
                 continue
@@ -591,16 +596,44 @@ class Arbiter(Actor):

     def __init__(self, *args, **kwargs):
         self._registry = defaultdict(list)
+        self._waiters = {}
         super().__init__(*args, **kwargs)

     def find_actor(self, name):
-        for uid, actor in self._registry.items():
+        for uid, sockaddr in self._registry.items():
             if name in uid:
-                print('found it!')
-                return actor
+                return sockaddr
+
+    async def wait_for_actor(self, name):
+        """Wait for a particular actor to register.
+
+        This is a blocking call if no actor by the provided name is currently
+        registered.
+        """
+        sockaddrs = []
+
+        for (aname, _), sockaddr in self._registry.items():
+            if name == aname:
+                sockaddrs.append(sockaddr)
+
+        if not sockaddrs:
+            waiter = trio.Event()
+            self._waiters.setdefault(name, []).append(waiter)
+            await waiter.wait()
+            for uid in self._waiters[name]:
+                sockaddrs.append(self._registry[uid])
+
+        return sockaddrs

     def register_actor(self, uid, sockaddr):
-        self._registry[uid].append(sockaddr)
+        name, uuid = uid
+        self._registry[uid] = sockaddr
+
+        # pop and signal all waiter events
+        events = self._waiters.pop(name, ())
+        self._waiters.setdefault(name, []).append(uid)
+        for event in events:
+            event.set()

     def unregister_actor(self, uid):
         self._registry.pop(uid, None)
@@ -633,7 +666,7 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
             result = await main()
             # XXX: If spawned with a dedicated "main function",
             # the actor is cancelled when this context is complete
-            # given that there are no more active peer channels connected to it.
+            # given that there are no more active peer channels connected
             actor.cancel_server()

     # block on actor to complete
@@ -675,17 +708,31 @@ async def find_actor(
     known to the arbiter.
     """
     actor = current_actor()
-    if not actor:
-        raise RuntimeError("No actor instance has been defined yet?")
-
     async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
-        sockaddrs = await arb_portal.run('self', 'find_actor', name=name)
+        sockaddr = await arb_portal.run('self', 'find_actor', name=name)
         # TODO: return portals to all available actors - for now just
         # the last one that registered
-        if sockaddrs:
-            sockaddr = sockaddrs[-1]
+        if sockaddr:
             async with _connect_chan(*sockaddr) as chan:
                 async with open_portal(chan) as portal:
                     yield portal
         else:
             yield None
+
+
+@asynccontextmanager
+async def wait_for_actor(
+    name,
+    arbiter_sockaddr=None,
+):
+    """Wait on an actor to register with the arbiter.
+
+    A portal to the first actor which registered is returned.
+    """
+    actor = current_actor()
+    async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
+        sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name)
+        sockaddr = sockaddrs[-1]
+        async with _connect_chan(*sockaddr) as chan:
+            async with open_portal(chan) as portal:
+                yield portal
diff --git a/tractor/_state.py b/tractor/_state.py
index a31b1cc..767bb27 100644
--- a/tractor/_state.py
+++ b/tractor/_state.py
@@ -7,4 +7,6 @@ _current_actor = None
 def current_actor() -> 'Actor':
     """Get the process-local actor instance.
     """
+    if not _current_actor:
+        raise RuntimeError("No actor instance has been defined yet?")
     return _current_actor
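The arbiter-side handshake added above is the core of the change: a late
`wait_for_actor()` caller parks on a `trio.Event` which `register_actor()`
sets once the named actor shows up. A minimal, self-contained sketch of
that same idea using plain trio primitives (the names below are
illustrative, not tractor's API, and it simply re-scans the registry after
wakeup rather than tracking uids in the waiters list the way the patch does):

    import trio

    class Registry:
        def __init__(self):
            self._registry = {}   # (name, uuid) -> sockaddr
            self._waiters = {}    # name -> [trio.Event, ...]

        def register(self, uid, sockaddr):
            name, _uuid = uid
            self._registry[uid] = sockaddr
            # wake every task parked in wait_for() on this name
            for event in self._waiters.pop(name, ()):
                event.set()

        async def wait_for(self, name):
            addrs = [a for (n, _), a in self._registry.items() if n == name]
            if not addrs:
                event = trio.Event()
                self._waiters.setdefault(name, []).append(event)
                await event.wait()  # blocks until register() fires the event
                addrs = [a for (n, _), a in self._registry.items() if n == name]
            return addrs

    async def main():
        reg = Registry()

        async def register_later():
            await trio.sleep(0.1)
            reg.register(('donny', 'some-uuid'), ('127.0.0.1', 1616))

        async with trio.open_nursery() as nursery:
            nursery.start_soon(register_later)
            print(await reg.wait_for('donny'))  # parks ~0.1s, then prints

    trio.run(main)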
From ea60a3dff979bacd1c49e2cfe2fa30c07bb3a54a Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 13 Aug 2018 00:06:22 -0400
Subject: [PATCH 02/10] Test the `wait_for_actor()` api

---
 tests/test_discovery.py | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/tests/test_discovery.py b/tests/test_discovery.py
index 3e92df4..238633f 100644
--- a/tests/test_discovery.py
+++ b/tests/test_discovery.py
@@ -1,6 +1,7 @@
 """
 Actor "discovery" testing
 """
+import pytest
 import tractor
 import trio

@@ -49,8 +50,15 @@ async def say_hello(other_actor):
         return await portal.run(__name__, 'hi')


+async def say_hello_use_wait(other_actor):
+    async with tractor.wait_for_actor(other_actor) as portal:
+        result = await portal.run(__name__, 'hi')
+        return result
+
+
 @tractor_test
-async def test_trynamic_trio():
+@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait])
+async def test_trynamic_trio(func):
     """Main tractor entry point, the "master" process (for now
     acts as the "director").
     """
@@ -59,15 +67,14 @@ async def test_trynamic_trio():

         donny = await n.run_in_actor(
             'donny',
-            say_hello,
+            func,
             other_actor='gretchen',
         )
         gretchen = await n.run_in_actor(
             'gretchen',
-            say_hello,
+            func,
             other_actor='donny',
         )
         print(await gretchen.result())
         print(await donny.result())
-        await donny.cancel_actor()
         print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...")
""" + if not _current_actor: + raise RuntimeError("No actor instance has been defined yet?") return _current_actor From ea60a3dff979bacd1c49e2cfe2fa30c07bb3a54a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 13 Aug 2018 00:06:22 -0400 Subject: [PATCH 02/10] Test the `wait_for_actor()` api --- tests/test_discovery.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 3e92df4..238633f 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -1,6 +1,7 @@ """ Actor "discovery" testing """ +import pytest import tractor import trio @@ -49,8 +50,15 @@ async def say_hello(other_actor): return await portal.run(__name__, 'hi') +async def say_hello_use_wait(other_actor): + async with tractor.wait_for_actor(other_actor) as portal: + result = await portal.run(__name__, 'hi') + return result + + @tractor_test -async def test_trynamic_trio(): +@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait]) +async def test_trynamic_trio(func): """Main tractor entry point, the "master" process (for now acts as the "director"). """ @@ -59,15 +67,14 @@ async def test_trynamic_trio(): donny = await n.run_in_actor( 'donny', - say_hello, + func, other_actor='gretchen', ) gretchen = await n.run_in_actor( 'gretchen', - say_hello, + func, other_actor='donny', ) print(await gretchen.result()) print(await donny.result()) - await donny.cancel_actor() print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") From b1f17dea1fa6f5521dfe60513aa3a3fcbf8f929d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 13 Aug 2018 11:37:09 -0400 Subject: [PATCH 03/10] Update readme and upgrade all packages on travis --- .travis.yml | 2 +- README.rst | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 74f2939..ece0ba8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ python: install: - cd $TRAVIS_BUILD_DIR - - pip install . -r requirements-test.txt + - pip install -U . -r requirements-test.txt script: - pytest tests/ --no-print-logs diff --git a/README.rst b/README.rst index 6eead52..d15b20b 100644 --- a/README.rst +++ b/README.rst @@ -94,8 +94,7 @@ the hip new film we're shooting: async def say_hello(other_actor): - await trio.sleep(0.4) # wait for other actor to spawn - async with tractor.find_actor(other_actor) as portal: + async with tractor.wait_for_actor(other_actor) as portal: return await portal.run(_this_module, 'hi') @@ -118,7 +117,6 @@ the hip new film we're shooting: ) print(await gretchen.result()) print(await donny.result()) - await donny.cancel_actor() print("CUTTTT CUUTT CUT!!! Donny!! You're supposed to say...") From 09e3a9406009d2ecbcff79331a9644b1154f3643 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 14 Aug 2018 22:51:37 -0400 Subject: [PATCH 04/10] Cancel result waiter once proc terminates --- tractor/_portal.py | 4 ++- tractor/_trionics.py | 58 ++++++++++++++++++++++++++------------------ 2 files changed, 38 insertions(+), 24 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index d9ee957..04fa0a0 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -148,7 +148,9 @@ class Portal: """Return the result(s) from the remote actor's "main" task. 
""" if self._expect_result is None: - raise RuntimeError("This portal is not expecting a final result?") + raise RuntimeError( + f"Portal for {self.channel.uid} is not expecting a final" + "result?") elif self._result is None: self._result = await self._return_from_resptype( *self._expect_result diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 569f731..2cca0cb 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -126,18 +126,18 @@ class ActorNursery: bind_addr=bind_addr, statespace=statespace, ) + self._cancel_after_result_on_exit.add(portal) await portal._submit_for_result( mod_path, fn.__name__, **kwargs ) - self._cancel_after_result_on_exit.add(portal) return portal async def wait(self): """Wait for all subactors to complete. """ - async def wait_for_proc(proc, actor, portal): + async def wait_for_proc(proc, actor, portal, cancel_scope): # TODO: timeout block here? if proc.is_alive(): await trio.hazmat.wait_readable(proc.sentinel) @@ -145,33 +145,45 @@ class ActorNursery: proc.join() log.debug(f"Joined {proc}") self._children.pop(actor.uid) - - async def wait_for_result(portal, actor): - # cancel the actor gracefully - log.info(f"Cancelling {portal.channel.uid} gracefully") - await portal.cancel_actor() - - log.debug(f"Waiting on final result from {subactor.uid}") - res = await portal.result() - # if it's an async-gen then we should alert the user - # that we're cancelling it - if inspect.isasyncgen(res): + # proc terminated, cancel result waiter + if cancel_scope: log.warn( - f"Blindly consuming asyncgen for {actor.uid}") - with trio.fail_after(1): - async with aclosing(res) as agen: - async for item in agen: - log.debug(f"Consuming item {item}") + f"Cancelling existing result waiter task for {actor.uid}") + cancel_scope.cancel() + + async def wait_for_result( + portal, actor, + task_status=trio.TASK_STATUS_IGNORED, + ): + # cancel the actor gracefully + with trio.open_cancel_scope() as cs: + task_status.started(cs) + log.info(f"Cancelling {portal.channel.uid} gracefully") + await portal.cancel_actor() + + log.debug(f"Waiting on final result from {subactor.uid}") + res = await portal.result() + # if it's an async-gen then we should alert the user + # that we're cancelling it + if inspect.isasyncgen(res): + log.warn( + f"Blindly consuming asyncgen for {actor.uid}") + with trio.fail_after(1): + async with aclosing(res) as agen: + async for item in agen: + log.debug(f"Consuming item {item}") + + if cs.cancelled_caught: + log.warn("Result waiter was cancelled") # unblocks when all waiter tasks have completed children = self._children.copy() async with trio.open_nursery() as nursery: for subactor, proc, portal in children.values(): - nursery.start_soon(wait_for_proc, proc, subactor, portal) - if proc.is_alive() and ( - portal in self._cancel_after_result_on_exit - ): - nursery.start_soon(wait_for_result, portal, subactor) + cs = None + if portal in self._cancel_after_result_on_exit: + cs = await nursery.start(wait_for_result, portal, subactor) + nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) async def cancel(self, hard_kill=False): """Cancel this nursery by instructing each subactor to cancel From 73e8aac36c85bd9e210e267628146a6f1a0b4dbd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Aug 2018 01:09:29 -0400 Subject: [PATCH 05/10] Always allow and enable rpc prior to task start --- tractor/_actor.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 
From 73e8aac36c85bd9e210e267628146a6f1a0b4dbd Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 15 Aug 2018 01:09:29 -0400
Subject: [PATCH 05/10] Always allow and enable rpc prior to task start

---
 tractor/_actor.py | 26 ++++++++++++++++----------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/tractor/_actor.py b/tractor/_actor.py
index 679a5a3..54d864a 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -139,7 +139,6 @@ class Actor:
         rpc_module_paths: [str] = [],
         statespace: dict = {},
         uid: str = None,
-        allow_rpc: bool = True,
         loglevel: str = None,
         arbiter_addr: (str, int) = None,
     ):
@@ -150,7 +149,6 @@ class Actor:
         # TODO: consider making this a dynamically defined
         # @dataclass once we get py3.7
         self.statespace = statespace
-        self._allow_rpc = allow_rpc
        self.loglevel = loglevel
         self._arb_addr = arbiter_addr

@@ -190,6 +188,13 @@ class Actor:
         for path in self.rpc_module_paths:
             self._mods[path] = importlib.import_module(path)

+        # XXX: triggers an internal error which causes a hanging
+        # problem on teardown (root nursery tears down thus killing all
+        # channels before sending cancels to subactors during actor
+        # nursery teardown - has to do with await main() in MainProcess)
+        # if self.name == 'gretchen':
+        #     self._mods.pop('test_discovery')
+
     async def _stream_handler(
         self,
         stream: trio.SocketStream,
@@ -352,6 +357,7 @@ class Actor:

         except InternalActorError:
             # ship internal errors upwards
+            log.exception("Received internal error:")
             if self._parent_chan:
                 await self._parent_chan.send(
                     {'error': traceback.format_exc(), 'cid': 'internal'})
@@ -404,6 +410,9 @@ class Actor:
         async with maybe_open_nursery(nursery) as nursery:
             self._root_nursery = nursery

+            # load allowed RPC module
+            self.load_namespaces()
+
             # Startup up channel server
             host, port = accept_addr
             await nursery.start(partial(
@@ -434,6 +443,10 @@ class Actor:
                     await self.cancel()
                 self._parent_chan = None

+                # handle new connection back to parent
+                nursery.start_soon(
+                    self._process_messages, self._parent_chan)
+
             # register with the arbiter if we're told its addr
             log.debug(f"Registering {self} for role `{self.name}`")
             async with get_arbiter(*arbiter_addr) as arb_portal:
@@ -443,14 +456,6 @@ class Actor:
                     registered_with_arbiter = True
                     task_status.started()

-            # handle new connection back to parent optionally
-            # begin responding to RPC
-            if self._allow_rpc:
-                self.load_namespaces()
-                if self._parent_chan:
-                    nursery.start_soon(
-                        self._process_messages, self._parent_chan)
-
             log.debug("Waiting on root nursery to complete")
             # blocks here as expected if no nursery was provided until
             # the channel server is killed (i.e. this actor is
@@ -660,6 +665,7 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
                 accept_addr=(host, port),
                 parent_addr=None,
                 arbiter_addr=arbiter_addr,
+                # nursery=nursery
             )
         )
         if main is not None:
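The `load_namespaces()` call patch 05 moves into `_async_main()` boils down
to eagerly importing the allowed RPC modules so an incoming `(ns, funcname)`
pair can later be resolved to a callable. The lookup in isolation, with
stdlib modules standing in for real RPC module paths:

    import importlib

    rpc_module_paths = ['math', 'json']  # stand-ins for user module paths
    mods = {path: importlib.import_module(path) for path in rpc_module_paths}

    # an rpc request then resolves a namespace + function name to a callable
    func = getattr(mods['math'], 'sqrt')
    assert func(4) == 2.0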
From d4da80c55854cfa94f5c6e6e0ff1639f43ac3cea Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 16 Aug 2018 00:21:00 -0400
Subject: [PATCH 06/10] Store remote errors on each portal

---
 tractor/_portal.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/tractor/_portal.py b/tractor/_portal.py
index 04fa0a0..169249a 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -57,8 +57,8 @@ class Portal:
         # it is expected that ``result()`` will be awaited at some point
         # during the portal's lifetime
         self._result = None
+        self._exc = None
         self._expect_result = None
-        self._errored = False

     async def aclose(self):
         log.debug(f"Closing {self}")
@@ -139,8 +139,9 @@ class Portal:
             try:
                 return msg['return']
             except KeyError:
-                raise RemoteActorError(
+                self._exc = RemoteActorError(
                     f"{self.channel.uid}\n" + msg['error'])
+                raise self._exc
         else:
             raise ValueError(f"Unknown msg response type: {first_msg}")
From f8111e51cd9c6914725aa91015ceeb98cf796568 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 16 Aug 2018 00:21:49 -0400
Subject: [PATCH 07/10] Maybe wait for actor result(s) after proc join

---
 tractor/_trionics.py | 58 +++++++++++++++++++++++++-------------------
 1 file changed, 33 insertions(+), 25 deletions(-)

diff --git a/tractor/_trionics.py b/tractor/_trionics.py
index 2cca0cb..d78cbdd 100644
--- a/tractor/_trionics.py
+++ b/tractor/_trionics.py
@@ -137,30 +137,11 @@ class ActorNursery:
     async def wait(self):
         """Wait for all subactors to complete.
         """
-        async def wait_for_proc(proc, actor, portal, cancel_scope):
-            # TODO: timeout block here?
-            if proc.is_alive():
-                await trio.hazmat.wait_readable(proc.sentinel)
-            # please god don't hang
-            proc.join()
-            log.debug(f"Joined {proc}")
-            self._children.pop(actor.uid)
-            # proc terminated, cancel result waiter
-            if cancel_scope:
-                log.warn(
-                    f"Cancelling existing result waiter task for {actor.uid}")
-                cancel_scope.cancel()
-
-        async def wait_for_result(
-            portal, actor,
-            task_status=trio.TASK_STATUS_IGNORED,
-        ):
-            # cancel the actor gracefully
-            with trio.open_cancel_scope() as cs:
-                task_status.started(cs)
-                log.info(f"Cancelling {portal.channel.uid} gracefully")
-                await portal.cancel_actor()
-
+        async def maybe_consume_result(portal, actor):
+            if (
+                portal in self._cancel_after_result_on_exit and
+                (portal._result is None and portal._exc is None)
+            ):
                 log.debug(f"Waiting on final result from {subactor.uid}")
                 res = await portal.result()
                 # if it's an async-gen then we should alert the user
@@ -173,6 +154,33 @@ class ActorNursery:
                         async with aclosing(res) as agen:
                             async for item in agen:
                                 log.debug(f"Consuming item {item}")

+        async def wait_for_proc(proc, actor, portal, cancel_scope):
+            # TODO: timeout block here?
+            if proc.is_alive():
+                await trio.hazmat.wait_readable(proc.sentinel)
+            # please god don't hang
+            proc.join()
+            log.debug(f"Joined {proc}")
+            await maybe_consume_result(portal, actor)
+
+            self._children.pop(actor.uid)
+            # proc terminated, cancel result waiter
+            if cancel_scope:
+                log.warn(
+                    f"Cancelling existing result waiter task for {actor.uid}")
+                cancel_scope.cancel()
+
+        async def wait_for_actor(
+            portal, actor,
+            task_status=trio.TASK_STATUS_IGNORED,
+        ):
+            # cancel the actor gracefully
+            with trio.open_cancel_scope() as cs:
+                task_status.started(cs)
+                await maybe_consume_result(portal, actor)
+                log.info(f"Cancelling {portal.channel.uid} gracefully")
+                await portal.cancel_actor()
+
             if cs.cancelled_caught:
                 log.warn("Result waiter was cancelled")
@@ -182,7 +190,7 @@ class ActorNursery:
         children = self._children.copy()
         async with trio.open_nursery() as nursery:
             for subactor, proc, portal in children.values():
                 cs = None
                 if portal in self._cancel_after_result_on_exit:
-                    cs = await nursery.start(wait_for_result, portal, subactor)
+                    cs = await nursery.start(wait_for_actor, portal, subactor)
                 nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
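`maybe_consume_result()` above special-cases a remote "main" that returned
an async generator: instead of handing it back, it drains it under a
timeout. That check in isolation as a runnable sketch (`aclosing` comes
from the `async_generator` package tractor already depends on; on modern
Python it also lives in `contextlib`):

    import inspect
    import trio
    from async_generator import aclosing

    async def remote_main():
        yield 1
        yield 2

    async def main():
        res = remote_main()  # as if returned from a remote "main" task
        if inspect.isasyncgen(res):
            with trio.fail_after(1):  # don't hang on a misbehaving stream
                async with aclosing(res) as agen:
                    async for item in agen:
                        print(f"consuming {item}")

    trio.run(main)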
From 901f99bbec50adbf21ce736693a2f45a31ab5428 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 16 Aug 2018 00:22:16 -0400
Subject: [PATCH 08/10] Throw internal errors into the main coroutine

If an internal error is bubbled up from some sub-actor throw that error
into the `MainProcess` "main" async function / coro in order to trigger
nursery teardowns (i.e. cancellations) that need to be done.

I'll likely change this shortly back to where we run a "main task"
inside `actor._async_main()`...
---
 tractor/_actor.py | 40 ++++++++++++++++++++++++++--------------
 1 file changed, 26 insertions(+), 14 deletions(-)

diff --git a/tractor/_actor.py b/tractor/_actor.py
index 54d864a..1780824 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -19,7 +19,6 @@ from ._portal import (
     open_portal,
     _do_handshake,
     LocalPortal,
-    maybe_open_nursery
 )
 from . import _state
 from ._state import current_actor
@@ -189,9 +188,10 @@ class Actor:
         for path in self.rpc_module_paths:
             self._mods[path] = importlib.import_module(path)

         # XXX: triggers an internal error which causes a hanging
-        # problem on teardown (root nursery tears down thus killing all
-        # channels before sending cancels to subactors during actor
-        # nursery teardown - has to do with await main() in MainProcess)
+        # problem (without the recently added .throw()) on teardown
+        # (root nursery tears down thus killing all channels before
+        # sending cancels to subactors during actor nursery teardown
+        # - has to do with await main() in MainProcess)
         # if self.name == 'gretchen':
         #     self._mods.pop('test_discovery')
@@ -352,13 +355,16 @@ class Actor:
             else:
                 # channel disconnect
                 log.debug(f"{chan} from {chan.uid} disconnected")

-        except InternalActorError:
+        except InternalActorError as err:
             # ship internal errors upwards
             log.exception("Received internal error:")
             if self._parent_chan:
                 await self._parent_chan.send(
                     {'error': traceback.format_exc(), 'cid': 'internal'})
-            raise
+                raise
+            else:
+                assert self._main_coro
+                self._main_coro.throw(err)
         except trio.ClosedResourceError:
             log.error(f"{chan} form {chan.uid} broke")
         except Exception:
@@ -395,7 +398,7 @@ class Actor:
         accept_addr,
         arbiter_addr=None,
         parent_addr=None,
-        nursery=None,
+        _main_coro=None,
         task_status=trio.TASK_STATUS_IGNORED,
     ):
         """Start the channel server, maybe connect back to the parent, and
@@ -404,10 +407,14 @@ class Actor:
         A "root-most" (or "top-level") nursery for this actor is opened here
         and when cancelled effectively cancels the actor.
         """
+        # if this is the `MainProcess` then we get a ref to the main
+        # task's coroutine object for tossing errors into
+        self._main_coro = _main_coro
+
         arbiter_addr = arbiter_addr or self._arb_addr
         registered_with_arbiter = False
         try:
-            async with maybe_open_nursery(nursery) as nursery:
+            async with trio.open_nursery() as nursery:
                 self._root_nursery = nursery

                 # load allowed RPC module
@@ -659,21 +666,26 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
     log.info(f"Starting local {actor} @ {host}:{port}")

     async with trio.open_nursery() as nursery:
+
+        if main is not None:
+            main_coro = main()
+
         await nursery.start(
             partial(
                 actor._async_main,
                 accept_addr=(host, port),
                 parent_addr=None,
                 arbiter_addr=arbiter_addr,
-                # nursery=nursery
+                _main_coro=main_coro
             )
         )
         if main is not None:
-            result = await main()
-            # XXX: If spawned with a dedicated "main function",
-            # the actor is cancelled when this context is complete
-            # given that there are no more active peer channels connected
-            actor.cancel_server()
+            result = await main_coro
+
+            # XXX: If spawned with a dedicated "main function",
+            # the actor is cancelled when this context is complete
+            # given that there are no more active peer channels connected
+            actor.cancel_server()

     # block on actor to complete
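The `.throw()` call patch 08 adds relies on a plain coroutine-object
method: the exception is raised *at the await point* inside a suspended
coroutine. A tiny framework-free sketch of just that mechanism:

    class Suspend:
        def __await__(self):
            yield  # park the coroutine until it's driven again

    async def main():
        try:
            await Suspend()
        except RuntimeError as err:
            print(f"caught at the await point: {err}")

    coro = main()
    coro.send(None)  # run the coroutine up to its first suspension
    try:
        coro.throw(RuntimeError("internal actor error"))
    except StopIteration:
        pass  # the coroutine handled the error and finished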
From 3202462cd5396e4767cc27a1ef805885672f86fe Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 17 Aug 2018 14:49:17 -0400
Subject: [PATCH 09/10] Attach remote internal errors to channels

This ensures that internal errors received from a remote actor are
indeed raised even in the `MainProcess` **before** comms tasks are
cancelled. Internal error in this case means any error packet received
on a channel that doesn't have a `cid` header. RPC errors (which **do**
have a `cid` header) are still forwarded to the consuming caller as
usual.
---
 tractor/_actor.py  | 44 +++++++++++++++++++++-----------------------
 tractor/_portal.py | 14 +++++++++++---
 2 files changed, 32 insertions(+), 26 deletions(-)

diff --git a/tractor/_actor.py b/tractor/_actor.py
index 1780824..a5e833f 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -187,7 +187,7 @@ class Actor:
         for path in self.rpc_module_paths:
             self._mods[path] = importlib.import_module(path)

-        # XXX: triggers an internal error which causes a hanging
+        # XXX: triggers an internal error which can cause a hanging
         # problem (without the recently added .throw()) on teardown
         # (root nursery tears down thus killing all channels before
         # sending cancels to subactors during actor nursery teardown
@@ -302,19 +302,22 @@ class Actor:
             log.debug(f"Received msg {msg} from {chan.uid}")
             cid = msg.get('cid')
             if cid:
-                if cid == 'internal':  # internal actor error
-                    raise InternalActorError(
-                        f"{chan.uid}\n" + msg['error'])
-
                 # deliver response to local caller/waiter
                 await self._push_result(chan.uid, cid, msg)
-
                 log.debug(
                     f"Waiting on next msg for {chan} from {chan.uid}")
                 continue

             # process command request
-            ns, funcname, kwargs, actorid, cid = msg['cmd']
+            try:
+                ns, funcname, kwargs, actorid, cid = msg['cmd']
+            except KeyError:
+                # push any non-rpc-response error to all local consumers
+                # and mark the channel as errored
+                chan._exc = err = msg['error']
+                for cid in self._actors2calls[chan.uid]:
+                    await self._push_result(chan.uid, cid, msg)
+                raise InternalActorError(f"{chan.uid}\n" + err)

             log.debug(
                 f"Processing request from {actorid}\n"
@@ -355,23 +358,16 @@ class Actor:
             else:
                 # channel disconnect
                 log.debug(f"{chan} from {chan.uid} disconnected")

-        except InternalActorError as err:
-            # ship internal errors upwards
-            log.exception("Received internal error:")
-            if self._parent_chan:
-                await self._parent_chan.send(
-                    {'error': traceback.format_exc(), 'cid': 'internal'})
-                raise
-            else:
-                assert self._main_coro
-                self._main_coro.throw(err)
         except trio.ClosedResourceError:
             log.error(f"{chan} form {chan.uid} broke")
         except Exception:
-            # ship exception (from above code) to peer as an internal error
-            await chan.send(
-                {'error': traceback.format_exc(), 'cid': 'internal'})
-            raise
+            # ship exception (from above code) to parent
+            log.exception("Actor errored:")
+            if self._parent_chan:
+                await self._parent_chan.send({'error': traceback.format_exc()})
+            raise
+            # if this is the `MainProcess` we expect the error broadcasting
+            # above to trigger an error at consuming portal "checkpoints"
         finally:
             log.debug(f"Exiting msg loop for {chan} from {chan.uid}")
@@ -467,8 +463,9 @@ class Actor:
             if self._parent_chan:
                 try:
                     await self._parent_chan.send(
-                        {'error': traceback.format_exc(), 'cid': 'internal'})
+                        # {'error': traceback.format_exc(), 'cid': 'internal'})
+                        {'error': traceback.format_exc()})
                 except trio.ClosedResourceError:
                     log.error(
                         f"Failed to ship error to parent "
@@ -477,7 +474,8 @@ class Actor:

             if not registered_with_arbiter:
                 log.exception(
-                    f"Failed to register with arbiter @ {arbiter_addr}")
+                    f"Actor errored and failed to register with arbiter "
+                    f"@ {arbiter_addr}")
             else:
                 raise
         finally:
diff --git a/tractor/_portal.py b/tractor/_portal.py
index 169249a..835f918 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -149,9 +149,16 @@ class Portal:
         """Return the result(s) from the remote actor's "main" task.
         """
         if self._expect_result is None:
-            raise RuntimeError(
-                f"Portal for {self.channel.uid} is not expecting a final "
-                "result?")
+            # (remote) errors are slapped on the channel
+            # teardown can reraise them
+            exc = self.channel._exc
+            if exc:
+                raise RemoteActorError(f"{self.channel.uid}\n" + exc)
+            else:
+                raise RuntimeError(
+                    f"Portal for {self.channel.uid} is not expecting a final "
+                    "result?")
+
         elif self._result is None:
             self._result = await self._return_from_resptype(
                 *self._expect_result
@@ -184,6 +191,7 @@ class Portal:
             log.warn(f"May have failed to cancel {self.channel.uid}")
             return False

+
 class LocalPortal:
     """A 'portal' to a local ``Actor``.
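With remote errors now parked on the channel, a caller that hits
`portal.result()` during teardown gets the remote traceback instead of an
opaque `RuntimeError`. A hypothetical consumer-side sketch (not part of
the patches):

    import tractor

    async def get_final_result(portal):
        # fetch a portal's final result, surfacing remote failures
        try:
            return await portal.result()
        except tractor.RemoteActorError as err:
            # ``err`` carries the remote actor's formatted traceback
            print(f"remote actor failed:\n{err}")
            raise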
""" if self._expect_result is None: - raise RuntimeError( - f"Portal for {self.channel.uid} is not expecting a final" - "result?") + # (remote) errors are slapped on the channel + # teardown can reraise them + exc = self.channel._exc + if exc: + raise RemoteActorError(f"{self.channel.uid}\n" + exc) + else: + raise RuntimeError( + f"Portal for {self.channel.uid} is not expecting a final" + "result?") + elif self._result is None: self._result = await self._return_from_resptype( *self._expect_result @@ -184,6 +191,7 @@ class Portal: log.warn(f"May have failed to cancel {self.channel.uid}") return False + class LocalPortal: """A 'portal' to a local ``Actor``. From 8c110c79fbff26af8d70c65ec84e6b531df81a7b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 17 Aug 2018 15:40:59 -0400 Subject: [PATCH 10/10] A teensy more lax on the speed test --- tests/test_tractor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 3683123..61b1ea8 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -387,7 +387,7 @@ def test_a_quadruple_example(arb_addr): """This also serves as a kind of "we'd like to eventually be this fast test". """ - results = tractor.run(cancel_after, 2.1, arbiter_addr=arb_addr) + results = tractor.run(cancel_after, 2.2, arbiter_addr=arb_addr) assert results