diff --git a/tractor/_actor.py b/tractor/_actor.py index b6aaf85..638cbb2 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -60,6 +60,9 @@ async def _invoke( """ treat_as_gen = False + # possible a traceback (not sure what typing is for this..) + tb = None + cancel_scope = trio.CancelScope() cs: trio.CancelScope = None @@ -156,14 +159,26 @@ async def _invoke( ctx._scope_nursery = scope_nursery cs = scope_nursery.cancel_scope task_status.started(cs) - await chan.send({'return': await coro, 'cid': cid}) + try: + await chan.send({'return': await coro, 'cid': cid}) + except trio.Cancelled as err: + tb = err.__traceback__ if cs.cancelled_caught: - if ctx._cancel_called: - msg = f'{func.__name__} cancelled itself', - else: - msg = f'{func.__name__} was remotely cancelled', + # TODO: pack in ``trio.Cancelled.__traceback__`` here + # so they can be unwrapped and displayed on the caller + # side! + + fname = func.__name__ + if ctx._cancel_called: + msg = f'{fname} cancelled itself' + + elif cs.cancel_called: + msg = ( + f'{fname} was remotely cancelled by its caller ' + f'{ctx.chan.uid}' + ) # task-contex was cancelled so relay to the cancel to caller raise ContextCancelled( @@ -196,7 +211,7 @@ async def _invoke( log.exception("Actor crashed:") # always ship errors back to caller - err_msg = pack_error(err) + err_msg = pack_error(err, tb=tb) err_msg['cid'] = cid try: await chan.send(err_msg) @@ -223,7 +238,7 @@ async def _invoke( f"Task {func} likely errored or cancelled before it started") finally: if not actor._rpc_tasks: - log.info("All RPC tasks have completed") + log.runtime("All RPC tasks have completed") actor._ongoing_rpc_tasks.set() @@ -238,10 +253,10 @@ _lifetime_stack: ExitStack = ExitStack() class Actor: """The fundamental concurrency primitive. - An *actor* is the combination of a regular Python or - ``multiprocessing.Process`` executing a ``trio`` task tree, communicating + An *actor* is the combination of a regular Python process + executing a ``trio`` task tree, communicating with other actors through "portals" which provide a native async API - around "channels". + around various IPC transport "channels". """ is_arbiter: bool = False @@ -396,7 +411,7 @@ class Actor: self._no_more_peers = trio.Event() # unset chan = Channel(stream=stream) - log.info(f"New connection to us {chan}") + log.runtime(f"New connection to us {chan}") # send/receive initial handshake response try: @@ -427,8 +442,12 @@ class Actor: event.set() chans = self._peers[uid] + + # TODO: re-use channels for new connections instead + # of always new ones; will require changing all the + # discovery funcs if chans: - log.warning( + log.runtime( f"already have channel(s) for {uid}:{chans}?" ) log.trace(f"Registered {chan} for {uid}") # type: ignore @@ -672,7 +691,7 @@ class Actor: else: # mark that we have ongoing rpc tasks self._ongoing_rpc_tasks = trio.Event() - log.info(f"RPC func is {func}") + log.runtime(f"RPC func is {func}") # store cancel scope such that the rpc task can be # cancelled gracefully if requested self._rpc_tasks[(chan, cid)] = ( @@ -681,7 +700,7 @@ class Actor: # self.cancel() was called so kill this msg loop # and break out into ``_async_main()`` log.warning( - f"{self.uid} was remotely cancelled; " + f"Actor {self.uid} was remotely cancelled; " "waiting on cancellation completion..") await self._cancel_complete.wait() loop_cs.cancel() @@ -699,14 +718,13 @@ class Actor: except ( TransportClosed, trio.BrokenResourceError, - trio.ClosedResourceError + # trio.ClosedResourceError ): # channels "breaking" is ok since we don't have a teardown # handshake for them (yet) and instead we simply bail out - # of the message loop and expect the teardown sequence - # to clean up. - log.error(f"{chan} form {chan.uid} closed abruptly") - # raise + # of the message loop and expect the surrounding + # caller's teardown sequence to clean up. + log.warning(f"Channel from {chan.uid} closed abruptly") except (Exception, trio.MultiError) as err: # ship any "internal" exception (i.e. one from internal machinery @@ -1150,7 +1168,7 @@ class Actor: raise ValueError(f"{uid} is not a valid uid?!") chan.uid = uid - log.info(f"Handshake with actor {uid}@{chan.raddr} complete") + log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") return uid diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index d213a20..30c872b 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -56,13 +56,22 @@ class NoRuntime(RuntimeError): "The root actor has not been initialized yet" -def pack_error(exc: BaseException) -> Dict[str, Any]: +def pack_error( + exc: BaseException, + tb = None, + +) -> Dict[str, Any]: """Create an "error message" for tranmission over a channel (aka the wire). """ + if tb: + tb_str = ''.join(traceback.format_tb(tb)) + else: + tb_str = traceback.format_exc() + return { 'error': { - 'tb_str': traceback.format_exc(), + 'tb_str': tb_str, 'type_str': type(exc).__name__, } }