diff --git a/tractor/_actor.py b/tractor/_actor.py index 6517ed3..b79106f 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -539,7 +539,9 @@ class Actor: f"Waiting on next msg for {chan} from {chan.uid}") else: # channel disconnect - log.debug(f"{chan} from {chan.uid} disconnected") + log.debug( + f"{chan} from {chan.uid} disconnected, cancelling all rpc tasks") + self.cancel_rpc_tasks(chan) except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") @@ -676,6 +678,9 @@ class Actor: accept_port=port ) ) + accept_addr = self.accept_addr + if _state._runtime_vars['_is_root']: + _state._runtime_vars['_root_mailbox'] = accept_addr # Register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") @@ -686,7 +691,7 @@ class Actor: 'self', 'register_actor', uid=self.uid, - sockaddr=self.accept_addr, + sockaddr=accept_addr, ) registered_with_arbiter = True @@ -895,15 +900,23 @@ class Actor: f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") - async def cancel_rpc_tasks(self) -> None: + async def cancel_rpc_tasks( + self, + only_chan: Optional[Channel] = None, + ) -> None: """Cancel all existing RPC responder tasks using the cancel scope registered for each. """ tasks = self._rpc_tasks log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") for (chan, cid) in tasks.copy(): + if only_chan is not None: + if only_chan != chan: + continue + # TODO: this should really done in a nursery batch await self._cancel_task(cid, chan) + log.info( f"Waiting for remaining rpc tasks to complete {tasks}") await self._ongoing_rpc_tasks.wait() @@ -1058,9 +1071,15 @@ async def _start_actor( try: result = await main() except (Exception, trio.MultiError) as err: - log.exception("Actor crashed:") - await _debug._maybe_enter_pm(err) - raise + try: + log.exception("Actor crashed:") + await _debug._maybe_enter_pm(err) + + raise + + finally: + actor._service_n.cancel_scope.cancel() + await actor.cancel() # XXX: the actor is cancelled when this context is complete # given that there are no more active peer channels connected diff --git a/tractor/_debug.py b/tractor/_debug.py index a0a5fc6..2e1c107 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -15,6 +15,7 @@ import trio from .log import get_logger from . import _state +from ._discovery import get_root try: # wtf: only exported when installed in dev mode? @@ -190,11 +191,7 @@ def _breakpoint(debug_func) -> Awaitable[None]: with trio.CancelScope() as cs: _debugger_request_cs = cs try: - async with tractor._portal.open_portal( - actor._parent_chan, - start_msg_loop=False, - # shield=True, - ) as portal: + async with get_root() as portal: with trio.fail_after(.5): agen = await portal.run( 'tractor._debug', @@ -262,9 +259,14 @@ def _breakpoint(debug_func) -> Awaitable[None]: async def _lock( task_status=trio.TASK_STATUS_IGNORED ): - async with _acquire_debug_lock(): - task_status.started() - await do_unlock.wait() + try: + async with _acquire_debug_lock(): + task_status.started() + await do_unlock.wait() + finally: + global _in_debug + _in_debug = False + log.debug(f"{actor} released tty lock") await actor._service_n.start(_lock) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 0407896..13da85d 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -11,7 +11,7 @@ from ._portal import ( open_portal, LocalPortal, ) -from ._state import current_actor +from ._state import current_actor, _runtime_vars @asynccontextmanager @@ -37,6 +37,16 @@ async def get_arbiter( yield arb_portal +@asynccontextmanager +async def get_root( +**kwargs, +) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: + host, port = _runtime_vars['_root_mailbox'] + async with _connect_chan(host, port) as chan: + async with open_portal(chan, **kwargs) as portal: + yield portal + + @asynccontextmanager async def find_actor( name: str, diff --git a/tractor/_state.py b/tractor/_state.py index 2a18a4c..2728b78 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -11,6 +11,7 @@ _current_actor: Optional['Actor'] = None # type: ignore _runtime_vars = { '_debug_mode': False, '_is_root': False, + '_root_mailbox': (None, None) }