From 83a45119e9e6d103c57dedc6af225b9a9968dbca Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 4 Oct 2020 17:58:41 -0400 Subject: [PATCH] Add "root mailbox" contact info passing Every subactor in the tree now receives the socket (or whatever the mailbox type ends up being) during startup and can call the new `tractor._discovery.get_root()` function to get a portal to the current root actor in their tree. The main reason for adding this atm is to support nested child actors gaining access to the root's tty lock for debugging. Also, when a channel disconnects from a message loop, might as well kill all its rpc tasks. --- tractor/_actor.py | 31 +++++++++++++++++++++++++------ tractor/_debug.py | 18 ++++++++++-------- tractor/_discovery.py | 12 +++++++++++- tractor/_state.py | 1 + 4 files changed, 47 insertions(+), 15 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 6517ed3..b79106f 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -539,7 +539,9 @@ class Actor: f"Waiting on next msg for {chan} from {chan.uid}") else: # channel disconnect - log.debug(f"{chan} from {chan.uid} disconnected") + log.debug( + f"{chan} from {chan.uid} disconnected, cancelling all rpc tasks") + self.cancel_rpc_tasks(chan) except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") @@ -676,6 +678,9 @@ class Actor: accept_port=port ) ) + accept_addr = self.accept_addr + if _state._runtime_vars['_is_root']: + _state._runtime_vars['_root_mailbox'] = accept_addr # Register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") @@ -686,7 +691,7 @@ class Actor: 'self', 'register_actor', uid=self.uid, - sockaddr=self.accept_addr, + sockaddr=accept_addr, ) registered_with_arbiter = True @@ -895,15 +900,23 @@ class Actor: f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") - async def cancel_rpc_tasks(self) -> None: + async def cancel_rpc_tasks( + self, + only_chan: Optional[Channel] = None, + ) -> None: """Cancel all existing RPC responder tasks using the cancel scope registered for each. """ tasks = self._rpc_tasks log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") for (chan, cid) in tasks.copy(): + if only_chan is not None: + if only_chan != chan: + continue + # TODO: this should really done in a nursery batch await self._cancel_task(cid, chan) + log.info( f"Waiting for remaining rpc tasks to complete {tasks}") await self._ongoing_rpc_tasks.wait() @@ -1058,9 +1071,15 @@ async def _start_actor( try: result = await main() except (Exception, trio.MultiError) as err: - log.exception("Actor crashed:") - await _debug._maybe_enter_pm(err) - raise + try: + log.exception("Actor crashed:") + await _debug._maybe_enter_pm(err) + + raise + + finally: + actor._service_n.cancel_scope.cancel() + await actor.cancel() # XXX: the actor is cancelled when this context is complete # given that there are no more active peer channels connected diff --git a/tractor/_debug.py b/tractor/_debug.py index a0a5fc6..2e1c107 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -15,6 +15,7 @@ import trio from .log import get_logger from . import _state +from ._discovery import get_root try: # wtf: only exported when installed in dev mode? @@ -190,11 +191,7 @@ def _breakpoint(debug_func) -> Awaitable[None]: with trio.CancelScope() as cs: _debugger_request_cs = cs try: - async with tractor._portal.open_portal( - actor._parent_chan, - start_msg_loop=False, - # shield=True, - ) as portal: + async with get_root() as portal: with trio.fail_after(.5): agen = await portal.run( 'tractor._debug', @@ -262,9 +259,14 @@ def _breakpoint(debug_func) -> Awaitable[None]: async def _lock( task_status=trio.TASK_STATUS_IGNORED ): - async with _acquire_debug_lock(): - task_status.started() - await do_unlock.wait() + try: + async with _acquire_debug_lock(): + task_status.started() + await do_unlock.wait() + finally: + global _in_debug + _in_debug = False + log.debug(f"{actor} released tty lock") await actor._service_n.start(_lock) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 0407896..13da85d 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -11,7 +11,7 @@ from ._portal import ( open_portal, LocalPortal, ) -from ._state import current_actor +from ._state import current_actor, _runtime_vars @asynccontextmanager @@ -37,6 +37,16 @@ async def get_arbiter( yield arb_portal +@asynccontextmanager +async def get_root( +**kwargs, +) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: + host, port = _runtime_vars['_root_mailbox'] + async with _connect_chan(host, port) as chan: + async with open_portal(chan, **kwargs) as portal: + yield portal + + @asynccontextmanager async def find_actor( name: str, diff --git a/tractor/_state.py b/tractor/_state.py index 2a18a4c..2728b78 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -11,6 +11,7 @@ _current_actor: Optional['Actor'] = None # type: ignore _runtime_vars = { '_debug_mode': False, '_is_root': False, + '_root_mailbox': (None, None) }