forked from goodboy/tractor
1
0
Fork 0

Port to new debug api, set `_is_root` state flag on startup

debug_tests
Tyler Goodlet 2020-09-12 11:48:20 -04:00
parent 150179bfe4
commit 8b6e9f5530
1 changed files with 22 additions and 15 deletions

View File

@ -7,7 +7,6 @@ from itertools import chain
import importlib
import importlib.util
import inspect
import bdb
import uuid
import typing
from typing import Dict, List, Tuple, Any, Optional
@ -27,6 +26,7 @@ from ._exceptions import (
unpack_error,
ModuleNotExposed
)
from ._debug import _maybe_enter_pm
from ._discovery import get_arbiter
from ._portal import Portal
from . import _state
@ -127,13 +127,8 @@ async def _invoke(
await chan.send({'return': await coro, 'cid': cid})
except (Exception, trio.MultiError) as err:
# NOTE: don't enter debug mode recursively after quitting pdb
if _state.debug_mode() and not isinstance(err, bdb.BdbQuit):
# Allow for pdb control in parent
from ._debug import post_mortem
log.exception("Actor crashed, entering debug mode:")
await post_mortem()
else:
log.exception("Actor crashed:")
await _maybe_enter_pm(err)
# always ship errors back to caller
err_msg = pack_error(err)
@ -201,6 +196,7 @@ class Actor:
"""
self.name = name
self.uid = (name, uid or str(uuid.uuid4()))
self._is_cancelled: bool = False
# retreive and store parent `__main__` data which
# will be passed to children
@ -251,7 +247,7 @@ class Actor:
self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[
Tuple[Any, Any, Any, Any, Any]] = None
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {}
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # noqa
async def wait_for_peer(
self, uid: Tuple[str, str]
@ -564,7 +560,7 @@ class Actor:
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}")
async def _chan_to_parent(
async def _from_parent(
self,
parent_addr: Optional[Tuple[str, int]],
) -> Tuple[Channel, Optional[Tuple[str, int]]]:
@ -593,6 +589,10 @@ class Actor:
parent_data.pop('bind_host'),
parent_data.pop('bind_port'),
)
rvs = parent_data.pop('_runtime_vars')
rvs['_is_root'] = False
_state._runtime_vars.update(rvs)
for attr, value in parent_data.items():
setattr(self, attr, value)
@ -630,13 +630,13 @@ class Actor:
# establish primary connection with immediate parent
self._parent_chan = None
if parent_addr is not None:
self._parent_chan, accept_addr_from_rent = await self._chan_to_parent(
self._parent_chan, accept_addr_rent = await self._from_parent(
parent_addr)
# either it's passed in because we're not a child
# or because we're running in mp mode
if accept_addr_from_rent is not None:
accept_addr = accept_addr_from_rent
if accept_addr_rent is not None:
accept_addr = accept_addr_rent
# load exposed/allowed RPC modules
# XXX: do this **after** establishing a channel to the parent
@ -711,7 +711,7 @@ class Actor:
# Blocks here as expected until the root nursery is
# killed (i.e. this actor is cancelled or signalled by the parent)
except (trio.MultiError, Exception) as err:
except Exception as err:
if not registered_with_arbiter:
# TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger
@ -822,6 +822,8 @@ class Actor:
spawning new rpc tasks
- return control the parent channel message loop
"""
self._is_cancelled = True
# cancel all ongoing rpc tasks
with trio.CancelScope(shield=True):
# kill all ongoing tasks
@ -1039,7 +1041,12 @@ async def _start_actor(
parent_addr=None
)
)
try:
result = await main()
except (Exception, trio.MultiError) as err:
log.exception("Actor crashed:")
await _maybe_enter_pm(err)
raise
# XXX: the actor is cancelled when this context is complete
# given that there are no more active peer channels connected