From 901f99bbec50adbf21ce736693a2f45a31ab5428 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 16 Aug 2018 00:22:16 -0400 Subject: [PATCH] Throw internal errors into the main coroutine If an internal error is bubbled up from some sub-actor throw that error into the `MainProcess` "main" async function / coro in order to trigger nursery teardowns (i.e. cancellations) that need to be done. I'll likely change this shortly back to where we run a "main task" inside `actor._async_main()`... --- tractor/_actor.py | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 54d864a..1780824 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -19,7 +19,6 @@ from ._portal import ( open_portal, _do_handshake, LocalPortal, - maybe_open_nursery ) from . import _state from ._state import current_actor @@ -189,9 +188,10 @@ class Actor: self._mods[path] = importlib.import_module(path) # XXX: triggers an internal error which causes a hanging - # problem on teardown (root nursery tears down thus killing all - # channels before sending cancels to subactors during actor - # nursery teardown - has to do with await main() in MainProcess) + # problem (without the recently added .throw()) on teardown + # (root nursery tears down thus killing all channels before + # sending cancels to subactors during actor nursery teardown + # - has to do with await main() in MainProcess) # if self.name == 'gretchen': # self._mods.pop('test_discovery') @@ -355,13 +355,16 @@ class Actor: else: # channel disconnect log.debug(f"{chan} from {chan.uid} disconnected") - except InternalActorError: + except InternalActorError as err: # ship internal errors upwards log.exception("Received internal error:") if self._parent_chan: await self._parent_chan.send( {'error': traceback.format_exc(), 'cid': 'internal'}) - raise + raise + else: + assert self._main_coro + self._main_coro.throw(err) except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") except Exception: @@ -395,7 +398,7 @@ class Actor: accept_addr, arbiter_addr=None, parent_addr=None, - nursery=None, + _main_coro=None, task_status=trio.TASK_STATUS_IGNORED, ): """Start the channel server, maybe connect back to the parent, and @@ -404,10 +407,14 @@ class Actor: A "root-most" (or "top-level") nursery for this actor is opened here and when cancelled effectively cancels the actor. """ + # if this is the `MainProcess` then we get a ref to the main + # task's coroutine object for tossing errors into + self._main_coro = _main_coro + arbiter_addr = arbiter_addr or self._arb_addr registered_with_arbiter = False try: - async with maybe_open_nursery(nursery) as nursery: + async with trio.open_nursery() as nursery: self._root_nursery = nursery # load allowed RPC module @@ -659,21 +666,26 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): log.info(f"Starting local {actor} @ {host}:{port}") async with trio.open_nursery() as nursery: + + if main is not None: + main_coro = main() + await nursery.start( partial( actor._async_main, accept_addr=(host, port), parent_addr=None, arbiter_addr=arbiter_addr, - # nursery=nursery + _main_coro=main_coro ) ) if main is not None: - result = await main() - # XXX: If spawned with a dedicated "main function", - # the actor is cancelled when this context is complete - # given that there are no more active peer channels connected - actor.cancel_server() + result = await main_coro + + # XXX: If spawned with a dedicated "main function", + # the actor is cancelled when this context is complete + # given that there are no more active peer channels connected + actor.cancel_server() # block on actor to complete