diff --git a/tractor/_actor.py b/tractor/_actor.py index c5f896b..f2e1ffb 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -12,7 +12,6 @@ import uuid import typing from typing import Dict, List, Tuple, Any, Optional, Union from types import ModuleType -import signal import sys import os from contextlib import ExitStack @@ -60,7 +59,11 @@ async def _invoke( """Invoke local func and deliver result(s) over provided channel. """ treat_as_gen = False - cs = None + + # possibly a traceback object + # (not sure what typing is for this..) + tb = None + cancel_scope = trio.CancelScope() ctx = Context(chan, cid, _cancel_scope=cancel_scope) context = False @@ -153,12 +156,30 @@ async def _invoke( with cancel_scope as cs: task_status.started(cs) - await chan.send({'return': await coro, 'cid': cid}) + try: + await chan.send({'return': await coro, 'cid': cid}) + except trio.Cancelled as err: + tb = err.__traceback__ if cs.cancelled_caught: + + # TODO: pack in ``trio.Cancelled.__traceback__`` here + # so they can be unwrapped and displayed on the caller + # side! + + fname = func.__name__ + if ctx._cancel_called: + msg = f'{fname} cancelled itself' + + elif cs.cancel_called: + msg = ( + f'{fname} was remotely cancelled by its caller ' + f'{ctx.chan.uid}' + ) + # task-contex was cancelled so relay to the cancel to caller raise ContextCancelled( - f'{func.__name__} cancelled itself', + msg, suberror_type=trio.Cancelled, ) @@ -187,7 +208,7 @@ async def _invoke( log.exception("Actor crashed:") # always ship errors back to caller - err_msg = pack_error(err) + err_msg = pack_error(err, tb=tb) err_msg['cid'] = cid try: await chan.send(err_msg) @@ -212,7 +233,7 @@ async def _invoke( f"Task {func} likely errored or cancelled before it started") finally: if not actor._rpc_tasks: - log.info("All RPC tasks have completed") + log.runtime("All RPC tasks have completed") actor._ongoing_rpc_tasks.set() @@ -227,10 +248,10 @@ _lifetime_stack: ExitStack = ExitStack() class Actor: """The fundamental concurrency primitive. - An *actor* is the combination of a regular Python or - ``multiprocessing.Process`` executing a ``trio`` task tree, communicating + An *actor* is the combination of a regular Python process + executing a ``trio`` task tree, communicating with other actors through "portals" which provide a native async API - around "channels". + around various IPC transport "channels". """ is_arbiter: bool = False @@ -382,7 +403,7 @@ class Actor: """ self._no_more_peers = trio.Event() # unset chan = Channel(stream=stream) - log.info(f"New connection to us {chan}") + log.runtime(f"New connection to us {chan}") # send/receive initial handshake response try: @@ -413,10 +434,14 @@ class Actor: event.set() chans = self._peers[uid] + + # TODO: re-use channels for new connections instead of always + # new ones; will require changing all the discovery funcs if chans: - log.warning( + log.runtime( f"already have channel(s) for {uid}:{chans}?" ) + log.trace(f"Registered {chan} for {uid}") # type: ignore # append new channel self._peers[uid].append(chan) @@ -625,7 +650,7 @@ class Actor: else: # mark that we have ongoing rpc tasks self._ongoing_rpc_tasks = trio.Event() - log.info(f"RPC func is {func}") + log.runtime(f"RPC func is {func}") # store cancel scope such that the rpc task can be # cancelled gracefully if requested self._rpc_tasks[(chan, cid)] = ( @@ -634,7 +659,7 @@ class Actor: # self.cancel() was called so kill this msg loop # and break out into ``_async_main()`` log.warning( - f"{self.uid} was remotely cancelled; " + f"Actor {self.uid} was remotely cancelled; " "waiting on cancellation completion..") await self._cancel_complete.wait() loop_cs.cancel() @@ -652,14 +677,13 @@ class Actor: except ( TransportClosed, trio.BrokenResourceError, - trio.ClosedResourceError + # trio.ClosedResourceError ): # channels "breaking" is ok since we don't have a teardown # handshake for them (yet) and instead we simply bail out - # of the message loop and expect the teardown sequence - # to clean up. - log.error(f"{chan} form {chan.uid} closed abruptly") - # raise + # of the message loop and expect the surrounding + # caller's teardown sequence to clean up. + log.warning(f"Channel from {chan.uid} closed abruptly") except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") @@ -727,7 +751,7 @@ class Actor: # so just cash manually here since it's what our # internals expect. address: Tuple[str, int] = value - self._arb_addr = value + self._arb_addr = address else: setattr(self, attr, value) @@ -735,8 +759,8 @@ class Actor: # Disable sigint handling in children if NOT running in # debug mode; we shouldn't need it thanks to our # cancellation machinery. - if 'debug_mode' not in rvs: - signal.signal(signal.SIGINT, signal.SIG_IGN) + # if '_debug_mode' not in rvs: + # signal.signal(signal.SIGINT, signal.SIG_IGN) return chan, accept_addr @@ -1122,7 +1146,7 @@ class Actor: raise ValueError(f"{uid} is not a valid uid?!") chan.uid = uid - log.info(f"Handshake with actor {uid}@{chan.raddr} complete") + log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") return uid diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 3751dad..8c603f0 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -53,13 +53,22 @@ class NoRuntime(RuntimeError): "The root actor has not been initialized yet" -def pack_error(exc: BaseException) -> Dict[str, Any]: +def pack_error( + exc: BaseException, + tb = None, + +) -> Dict[str, Any]: """Create an "error message" for tranmission over a channel (aka the wire). """ + if tb: + tb_str = ''.join(traceback.format_tb(tb)) + else: + tb_str = traceback.format_exc() + return { 'error': { - 'tb_str': traceback.format_exc(), + 'tb_str': tb_str, 'type_str': type(exc).__name__, } }