Throw internal errors into the main coroutine

If an internal error is bubbled up from some sub-actor throw that error
into the `MainProcess` "main" async function / coro in order to trigger
nursery teardowns (i.e. cancellations) that need to be done.

I'll likely change this shortly back to where we run a "main task"
inside `actor._async_main()`...
wait_for_actor
Tyler Goodlet 2018-08-16 00:22:16 -04:00
parent f8111e51cd
commit 901f99bbec
1 changed files with 26 additions and 14 deletions

View File

@ -19,7 +19,6 @@ from ._portal import (
open_portal, open_portal,
_do_handshake, _do_handshake,
LocalPortal, LocalPortal,
maybe_open_nursery
) )
from . import _state from . import _state
from ._state import current_actor from ._state import current_actor
@ -189,9 +188,10 @@ class Actor:
self._mods[path] = importlib.import_module(path) self._mods[path] = importlib.import_module(path)
# XXX: triggers an internal error which causes a hanging # XXX: triggers an internal error which causes a hanging
# problem on teardown (root nursery tears down thus killing all # problem (without the recently added .throw()) on teardown
# channels before sending cancels to subactors during actor # (root nursery tears down thus killing all channels before
# nursery teardown - has to do with await main() in MainProcess) # sending cancels to subactors during actor nursery teardown
# - has to do with await main() in MainProcess)
# if self.name == 'gretchen': # if self.name == 'gretchen':
# self._mods.pop('test_discovery') # self._mods.pop('test_discovery')
@ -355,13 +355,16 @@ class Actor:
else: # channel disconnect else: # channel disconnect
log.debug(f"{chan} from {chan.uid} disconnected") log.debug(f"{chan} from {chan.uid} disconnected")
except InternalActorError: except InternalActorError as err:
# ship internal errors upwards # ship internal errors upwards
log.exception("Received internal error:") log.exception("Received internal error:")
if self._parent_chan: if self._parent_chan:
await self._parent_chan.send( await self._parent_chan.send(
{'error': traceback.format_exc(), 'cid': 'internal'}) {'error': traceback.format_exc(), 'cid': 'internal'})
raise raise
else:
assert self._main_coro
self._main_coro.throw(err)
except trio.ClosedResourceError: except trio.ClosedResourceError:
log.error(f"{chan} form {chan.uid} broke") log.error(f"{chan} form {chan.uid} broke")
except Exception: except Exception:
@ -395,7 +398,7 @@ class Actor:
accept_addr, accept_addr,
arbiter_addr=None, arbiter_addr=None,
parent_addr=None, parent_addr=None,
nursery=None, _main_coro=None,
task_status=trio.TASK_STATUS_IGNORED, task_status=trio.TASK_STATUS_IGNORED,
): ):
"""Start the channel server, maybe connect back to the parent, and """Start the channel server, maybe connect back to the parent, and
@ -404,10 +407,14 @@ class Actor:
A "root-most" (or "top-level") nursery for this actor is opened here A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor. and when cancelled effectively cancels the actor.
""" """
# if this is the `MainProcess` then we get a ref to the main
# task's coroutine object for tossing errors into
self._main_coro = _main_coro
arbiter_addr = arbiter_addr or self._arb_addr arbiter_addr = arbiter_addr or self._arb_addr
registered_with_arbiter = False registered_with_arbiter = False
try: try:
async with maybe_open_nursery(nursery) as nursery: async with trio.open_nursery() as nursery:
self._root_nursery = nursery self._root_nursery = nursery
# load allowed RPC module # load allowed RPC module
@ -659,21 +666,26 @@ async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
log.info(f"Starting local {actor} @ {host}:{port}") log.info(f"Starting local {actor} @ {host}:{port}")
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
if main is not None:
main_coro = main()
await nursery.start( await nursery.start(
partial( partial(
actor._async_main, actor._async_main,
accept_addr=(host, port), accept_addr=(host, port),
parent_addr=None, parent_addr=None,
arbiter_addr=arbiter_addr, arbiter_addr=arbiter_addr,
# nursery=nursery _main_coro=main_coro
) )
) )
if main is not None: if main is not None:
result = await main() result = await main_coro
# XXX: If spawned with a dedicated "main function",
# the actor is cancelled when this context is complete # XXX: If spawned with a dedicated "main function",
# given that there are no more active peer channels connected # the actor is cancelled when this context is complete
actor.cancel_server() # given that there are no more active peer channels connected
actor.cancel_server()
# block on actor to complete # block on actor to complete