diff --git a/tractor/_actor.py b/tractor/_actor.py index 3e244fa..5d2a35d 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -7,7 +7,6 @@ from itertools import chain import importlib import importlib.util import inspect -import bdb import uuid import typing from typing import Dict, List, Tuple, Any, Optional @@ -27,6 +26,7 @@ from ._exceptions import ( unpack_error, ModuleNotExposed ) +from ._debug import _maybe_enter_pm from ._discovery import get_arbiter from ._portal import Portal from . import _state @@ -127,13 +127,8 @@ async def _invoke( await chan.send({'return': await coro, 'cid': cid}) except (Exception, trio.MultiError) as err: # NOTE: don't enter debug mode recursively after quitting pdb - if _state.debug_mode() and not isinstance(err, bdb.BdbQuit): - # Allow for pdb control in parent - from ._debug import post_mortem - log.exception("Actor crashed, entering debug mode:") - await post_mortem() - else: - log.exception("Actor crashed:") + log.exception("Actor crashed:") + await _maybe_enter_pm(err) # always ship errors back to caller err_msg = pack_error(err) @@ -201,6 +196,7 @@ class Actor: """ self.name = name self.uid = (name, uid or str(uuid.uuid4())) + self._is_cancelled: bool = False # retreive and store parent `__main__` data which # will be passed to children @@ -251,7 +247,7 @@ class Actor: self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[ Tuple[Any, Any, Any, Any, Any]] = None - self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} + self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # noqa async def wait_for_peer( self, uid: Tuple[str, str] @@ -564,7 +560,7 @@ class Actor: f"Exiting msg loop for {chan} from {chan.uid} " f"with last msg:\n{msg}") - async def _chan_to_parent( + async def _from_parent( self, parent_addr: Optional[Tuple[str, int]], ) -> Tuple[Channel, Optional[Tuple[str, int]]]: @@ -593,6 +589,10 @@ class Actor: parent_data.pop('bind_host'), parent_data.pop('bind_port'), ) + rvs = parent_data.pop('_runtime_vars') + rvs['_is_root'] = False + _state._runtime_vars.update(rvs) + for attr, value in parent_data.items(): setattr(self, attr, value) @@ -630,13 +630,13 @@ class Actor: # establish primary connection with immediate parent self._parent_chan = None if parent_addr is not None: - self._parent_chan, accept_addr_from_rent = await self._chan_to_parent( + self._parent_chan, accept_addr_rent = await self._from_parent( parent_addr) # either it's passed in because we're not a child # or because we're running in mp mode - if accept_addr_from_rent is not None: - accept_addr = accept_addr_from_rent + if accept_addr_rent is not None: + accept_addr = accept_addr_rent # load exposed/allowed RPC modules # XXX: do this **after** establishing a channel to the parent @@ -711,7 +711,7 @@ class Actor: # Blocks here as expected until the root nursery is # killed (i.e. this actor is cancelled or signalled by the parent) - except (trio.MultiError, Exception) as err: + except Exception as err: if not registered_with_arbiter: # TODO: I guess we could try to connect back # to the parent through a channel and engage a debugger @@ -822,6 +822,8 @@ class Actor: spawning new rpc tasks - return control the parent channel message loop """ + self._is_cancelled = True + # cancel all ongoing rpc tasks with trio.CancelScope(shield=True): # kill all ongoing tasks @@ -1039,7 +1041,12 @@ async def _start_actor( parent_addr=None ) ) - result = await main() + try: + result = await main() + except (Exception, trio.MultiError) as err: + log.exception("Actor crashed:") + await _maybe_enter_pm(err) + raise # XXX: the actor is cancelled when this context is complete # given that there are no more active peer channels connected