From a602de02a98fd4df57365a4c5cfb2953550d28f8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 27 Jun 2021 11:37:35 -0400 Subject: [PATCH] First try: pack cancelled tracebacks and ship to caller --- tractor/_actor.py | 58 +++++++++++++++++++++++++++--------------- tractor/_exceptions.py | 13 ++++++++-- 2 files changed, 49 insertions(+), 22 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index d80d57b..8589358 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -60,6 +60,9 @@ async def _invoke( """ treat_as_gen = False + # possible a traceback (not sure what typing is for this..) + tb = None + cancel_scope = trio.CancelScope() cs: trio.CancelScope = None @@ -156,14 +159,26 @@ async def _invoke( ctx._scope_nursery = scope_nursery cs = scope_nursery.cancel_scope task_status.started(cs) - await chan.send({'return': await coro, 'cid': cid}) + try: + await chan.send({'return': await coro, 'cid': cid}) + except trio.Cancelled as err: + tb = err.__traceback__ if cs.cancelled_caught: - if ctx._cancel_called: - msg = f'{func.__name__} cancelled itself', - else: - msg = f'{func.__name__} was remotely cancelled', + # TODO: pack in ``trio.Cancelled.__traceback__`` here + # so they can be unwrapped and displayed on the caller + # side! + + fname = func.__name__ + if ctx._cancel_called: + msg = f'{fname} cancelled itself' + + elif cs.cancel_called: + msg = ( + f'{fname} was remotely cancelled by its caller ' + f'{ctx.chan.uid}' + ) # task-contex was cancelled so relay to the cancel to caller raise ContextCancelled( @@ -209,7 +224,7 @@ async def _invoke( log.exception("Actor crashed:") # always ship errors back to caller - err_msg = pack_error(err) + err_msg = pack_error(err, tb=tb) err_msg['cid'] = cid try: await chan.send(err_msg) @@ -236,7 +251,7 @@ async def _invoke( f"Task {func} likely errored or cancelled before it started") finally: if not actor._rpc_tasks: - log.info("All RPC tasks have completed") + log.runtime("All RPC tasks have completed") actor._ongoing_rpc_tasks.set() @@ -251,10 +266,10 @@ _lifetime_stack: ExitStack = ExitStack() class Actor: """The fundamental concurrency primitive. - An *actor* is the combination of a regular Python or - ``multiprocessing.Process`` executing a ``trio`` task tree, communicating + An *actor* is the combination of a regular Python process + executing a ``trio`` task tree, communicating with other actors through "portals" which provide a native async API - around "channels". + around various IPC transport "channels". """ is_arbiter: bool = False @@ -409,7 +424,7 @@ class Actor: self._no_more_peers = trio.Event() # unset chan = Channel(stream=stream) - log.info(f"New connection to us {chan}") + log.runtime(f"New connection to us {chan}") # send/receive initial handshake response try: @@ -440,8 +455,12 @@ class Actor: event.set() chans = self._peers[uid] + + # TODO: re-use channels for new connections instead + # of always new ones; will require changing all the + # discovery funcs if chans: - log.warning( + log.runtime( f"already have channel(s) for {uid}:{chans}?" ) @@ -686,7 +705,7 @@ class Actor: else: # mark that we have ongoing rpc tasks self._ongoing_rpc_tasks = trio.Event() - log.info(f"RPC func is {func}") + log.runtime(f"RPC func is {func}") # store cancel scope such that the rpc task can be # cancelled gracefully if requested self._rpc_tasks[(chan, cid)] = ( @@ -695,7 +714,7 @@ class Actor: # self.cancel() was called so kill this msg loop # and break out into ``_async_main()`` log.warning( - f"{self.uid} was remotely cancelled; " + f"Actor {self.uid} was remotely cancelled; " "waiting on cancellation completion..") await self._cancel_complete.wait() loop_cs.cancel() @@ -713,14 +732,13 @@ class Actor: except ( TransportClosed, trio.BrokenResourceError, - trio.ClosedResourceError + # trio.ClosedResourceError ): # channels "breaking" is ok since we don't have a teardown # handshake for them (yet) and instead we simply bail out - # of the message loop and expect the teardown sequence - # to clean up. - log.error(f"{chan} form {chan.uid} closed abruptly") - # raise + # of the message loop and expect the surrounding + # caller's teardown sequence to clean up. + log.warning(f"Channel from {chan.uid} closed abruptly") except (Exception, trio.MultiError) as err: # ship any "internal" exception (i.e. one from internal machinery @@ -1164,7 +1182,7 @@ class Actor: raise ValueError(f"{uid} is not a valid uid?!") chan.uid = uid - log.info(f"Handshake with actor {uid}@{chan.raddr} complete") + log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") return uid diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index d213a20..30c872b 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -56,13 +56,22 @@ class NoRuntime(RuntimeError): "The root actor has not been initialized yet" -def pack_error(exc: BaseException) -> Dict[str, Any]: +def pack_error( + exc: BaseException, + tb = None, + +) -> Dict[str, Any]: """Create an "error message" for tranmission over a channel (aka the wire). """ + if tb: + tb_str = ''.join(traceback.format_tb(tb)) + else: + tb_str = traceback.format_exc() + return { 'error': { - 'tb_str': traceback.format_exc(), + 'tb_str': tb_str, 'type_str': type(exc).__name__, } }