Add "root mailbox" contact info passing

Every subactor in the tree now receives the root actor's socket address (or
whatever the mailbox type ends up being) during startup and can call the new
`tractor._discovery.get_root()` function to get a portal to the current
root actor in its tree. The main reason for adding this at the moment is to
support nested child actors gaining access to the root's tty lock for
debugging.
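
As a rough usage sketch (not part of this changeset; it assumes the code runs
inside a tractor runtime where `_root_mailbox` has already been passed down,
and the function name is illustrative only), any subactor can now do:

    from tractor._discovery import get_root

    async def from_any_subactor() -> None:
        # get_root() dials the (host, port) stored in
        # _runtime_vars['_root_mailbox'] and yields a portal to the root.
        async with get_root() as portal:
            # the portal can then invoke functions in the root actor,
            # e.g. the debugger's tty-lock acquisition (target omitted here).
            ...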

Also, when a channel disconnects from a message loop, might as well kill
all its rpc tasks.
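
For illustration only (the helper below is hypothetical; the real call site is
in the message-loop diff further down), the new `only_chan` parameter enables a
per-channel cleanup like:

    async def on_channel_disconnect(actor, chan) -> None:
        # cancel only the RPC responder tasks registered for this channel;
        # tasks spawned for other peers keep running
        await actor.cancel_rpc_tasks(only_chan=chan)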
branch: debug_tests
Tyler Goodlet 2020-10-04 17:58:41 -04:00
parent e387e8b322
commit 83a45119e9
4 changed files with 47 additions and 15 deletions

tractor/_actor.py

@@ -539,7 +539,9 @@ class Actor:
                         f"Waiting on next msg for {chan} from {chan.uid}")
             else:
                 # channel disconnect
-                log.debug(f"{chan} from {chan.uid} disconnected")
+                log.debug(
+                    f"{chan} from {chan.uid} disconnected, cancelling all rpc tasks")
+                self.cancel_rpc_tasks(chan)
 
         except trio.ClosedResourceError:
             log.error(f"{chan} form {chan.uid} broke")
@@ -676,6 +678,9 @@ class Actor:
                         accept_port=port
                     )
                 )
+            accept_addr = self.accept_addr
+            if _state._runtime_vars['_is_root']:
+                _state._runtime_vars['_root_mailbox'] = accept_addr
 
             # Register with the arbiter if we're told its addr
             log.debug(f"Registering {self} for role `{self.name}`")
@@ -686,7 +691,7 @@ class Actor:
                     'self',
                     'register_actor',
                     uid=self.uid,
-                    sockaddr=self.accept_addr,
+                    sockaddr=accept_addr,
                 )
                 registered_with_arbiter = True
@@ -895,15 +900,23 @@ class Actor:
             f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"
             f"peer: {chan.uid}\n")
 
-    async def cancel_rpc_tasks(self) -> None:
+    async def cancel_rpc_tasks(
+        self,
+        only_chan: Optional[Channel] = None,
+    ) -> None:
         """Cancel all existing RPC responder tasks using the cancel scope
         registered for each.
         """
         tasks = self._rpc_tasks
         log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
         for (chan, cid) in tasks.copy():
+            if only_chan is not None:
+                if only_chan != chan:
+                    continue
+
             # TODO: this should really done in a nursery batch
             await self._cancel_task(cid, chan)
 
         log.info(
             f"Waiting for remaining rpc tasks to complete {tasks}")
         await self._ongoing_rpc_tasks.wait()
@@ -1058,9 +1071,15 @@ async def _start_actor(
         try:
             result = await main()
         except (Exception, trio.MultiError) as err:
-            log.exception("Actor crashed:")
-            await _debug._maybe_enter_pm(err)
-            raise
+            try:
+                log.exception("Actor crashed:")
+                await _debug._maybe_enter_pm(err)
+                raise
+            finally:
+                actor._service_n.cancel_scope.cancel()
+
+        await actor.cancel()
 
     # XXX: the actor is cancelled when this context is complete
     # given that there are no more active peer channels connected

tractor/_debug.py

@@ -15,6 +15,7 @@ import trio
 
 from .log import get_logger
 from . import _state
+from ._discovery import get_root
 
 try:
     # wtf: only exported when installed in dev mode?
@@ -190,11 +191,7 @@ def _breakpoint(debug_func) -> Awaitable[None]:
         with trio.CancelScope() as cs:
             _debugger_request_cs = cs
             try:
-                async with tractor._portal.open_portal(
-                    actor._parent_chan,
-                    start_msg_loop=False,
-                    # shield=True,
-                ) as portal:
+                async with get_root() as portal:
                     with trio.fail_after(.5):
                         agen = await portal.run(
                             'tractor._debug',
@@ -262,9 +259,14 @@ def _breakpoint(debug_func) -> Awaitable[None]:
         async def _lock(
             task_status=trio.TASK_STATUS_IGNORED
         ):
-            async with _acquire_debug_lock():
-                task_status.started()
-                await do_unlock.wait()
+            try:
+                async with _acquire_debug_lock():
+                    task_status.started()
+                    await do_unlock.wait()
+            finally:
+                global _in_debug
+                _in_debug = False
+                log.debug(f"{actor} released tty lock")
 
         await actor._service_n.start(_lock)

tractor/_discovery.py

@@ -11,7 +11,7 @@ from ._portal import (
     open_portal,
     LocalPortal,
 )
-from ._state import current_actor
+from ._state import current_actor, _runtime_vars
 
 
 @asynccontextmanager
@@ -37,6 +37,16 @@ async def get_arbiter(
                 yield arb_portal
 
 
+@asynccontextmanager
+async def get_root(
+    **kwargs,
+) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
+    host, port = _runtime_vars['_root_mailbox']
+    async with _connect_chan(host, port) as chan:
+        async with open_portal(chan, **kwargs) as portal:
+            yield portal
+
+
 @asynccontextmanager
 async def find_actor(
     name: str,

tractor/_state.py

@@ -11,6 +11,7 @@ _current_actor: Optional['Actor'] = None  # type: ignore
 _runtime_vars = {
     '_debug_mode': False,
     '_is_root': False,
+    '_root_mailbox': (None, None)
 }