First try: pack cancelled tracebacks and ship to caller

transport_hardening
Tyler Goodlet 2021-06-27 11:37:35 -04:00
parent 210dfdf70f
commit 9ddb636452
2 changed files with 57 additions and 24 deletions

View File

@ -12,7 +12,6 @@ import uuid
import typing import typing
from typing import Dict, List, Tuple, Any, Optional, Union from typing import Dict, List, Tuple, Any, Optional, Union
from types import ModuleType from types import ModuleType
import signal
import sys import sys
import os import os
from contextlib import ExitStack from contextlib import ExitStack
@ -60,7 +59,11 @@ async def _invoke(
"""Invoke local func and deliver result(s) over provided channel. """Invoke local func and deliver result(s) over provided channel.
""" """
treat_as_gen = False treat_as_gen = False
cs = None
# possibly a traceback object
# (not sure what typing is for this..)
tb = None
cancel_scope = trio.CancelScope() cancel_scope = trio.CancelScope()
ctx = Context(chan, cid, _cancel_scope=cancel_scope) ctx = Context(chan, cid, _cancel_scope=cancel_scope)
context = False context = False
@ -153,12 +156,30 @@ async def _invoke(
with cancel_scope as cs: with cancel_scope as cs:
task_status.started(cs) task_status.started(cs)
try:
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
except trio.Cancelled as err:
tb = err.__traceback__
if cs.cancelled_caught: if cs.cancelled_caught:
# TODO: pack in ``trio.Cancelled.__traceback__`` here
# so they can be unwrapped and displayed on the caller
# side!
fname = func.__name__
if ctx._cancel_called:
msg = f'{fname} cancelled itself'
elif cs.cancel_called:
msg = (
f'{fname} was remotely cancelled by its caller '
f'{ctx.chan.uid}'
)
# task-contex was cancelled so relay to the cancel to caller # task-contex was cancelled so relay to the cancel to caller
raise ContextCancelled( raise ContextCancelled(
f'{func.__name__} cancelled itself', msg,
suberror_type=trio.Cancelled, suberror_type=trio.Cancelled,
) )
@ -187,7 +208,7 @@ async def _invoke(
log.exception("Actor crashed:") log.exception("Actor crashed:")
# always ship errors back to caller # always ship errors back to caller
err_msg = pack_error(err) err_msg = pack_error(err, tb=tb)
err_msg['cid'] = cid err_msg['cid'] = cid
try: try:
await chan.send(err_msg) await chan.send(err_msg)
@ -212,7 +233,7 @@ async def _invoke(
f"Task {func} likely errored or cancelled before it started") f"Task {func} likely errored or cancelled before it started")
finally: finally:
if not actor._rpc_tasks: if not actor._rpc_tasks:
log.info("All RPC tasks have completed") log.runtime("All RPC tasks have completed")
actor._ongoing_rpc_tasks.set() actor._ongoing_rpc_tasks.set()
@ -227,10 +248,10 @@ _lifetime_stack: ExitStack = ExitStack()
class Actor: class Actor:
"""The fundamental concurrency primitive. """The fundamental concurrency primitive.
An *actor* is the combination of a regular Python or An *actor* is the combination of a regular Python process
``multiprocessing.Process`` executing a ``trio`` task tree, communicating executing a ``trio`` task tree, communicating
with other actors through "portals" which provide a native async API with other actors through "portals" which provide a native async API
around "channels". around various IPC transport "channels".
""" """
is_arbiter: bool = False is_arbiter: bool = False
@ -382,7 +403,7 @@ class Actor:
""" """
self._no_more_peers = trio.Event() # unset self._no_more_peers = trio.Event() # unset
chan = Channel(stream=stream) chan = Channel(stream=stream)
log.info(f"New connection to us {chan}") log.runtime(f"New connection to us {chan}")
# send/receive initial handshake response # send/receive initial handshake response
try: try:
@ -413,10 +434,14 @@ class Actor:
event.set() event.set()
chans = self._peers[uid] chans = self._peers[uid]
# TODO: re-use channels for new connections instead of always
# new ones; will require changing all the discovery funcs
if chans: if chans:
log.warning( log.runtime(
f"already have channel(s) for {uid}:{chans}?" f"already have channel(s) for {uid}:{chans}?"
) )
log.trace(f"Registered {chan} for {uid}") # type: ignore log.trace(f"Registered {chan} for {uid}") # type: ignore
# append new channel # append new channel
self._peers[uid].append(chan) self._peers[uid].append(chan)
@ -625,7 +650,7 @@ class Actor:
else: else:
# mark that we have ongoing rpc tasks # mark that we have ongoing rpc tasks
self._ongoing_rpc_tasks = trio.Event() self._ongoing_rpc_tasks = trio.Event()
log.info(f"RPC func is {func}") log.runtime(f"RPC func is {func}")
# store cancel scope such that the rpc task can be # store cancel scope such that the rpc task can be
# cancelled gracefully if requested # cancelled gracefully if requested
self._rpc_tasks[(chan, cid)] = ( self._rpc_tasks[(chan, cid)] = (
@ -634,7 +659,7 @@ class Actor:
# self.cancel() was called so kill this msg loop # self.cancel() was called so kill this msg loop
# and break out into ``_async_main()`` # and break out into ``_async_main()``
log.warning( log.warning(
f"{self.uid} was remotely cancelled; " f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..") "waiting on cancellation completion..")
await self._cancel_complete.wait() await self._cancel_complete.wait()
loop_cs.cancel() loop_cs.cancel()
@ -652,14 +677,13 @@ class Actor:
except ( except (
TransportClosed, TransportClosed,
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError # trio.ClosedResourceError
): ):
# channels "breaking" is ok since we don't have a teardown # channels "breaking" is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out # handshake for them (yet) and instead we simply bail out
# of the message loop and expect the teardown sequence # of the message loop and expect the surrounding
# to clean up. # caller's teardown sequence to clean up.
log.error(f"{chan} form {chan.uid} closed abruptly") log.warning(f"Channel from {chan.uid} closed abruptly")
# raise
except trio.ClosedResourceError: except trio.ClosedResourceError:
log.error(f"{chan} form {chan.uid} broke") log.error(f"{chan} form {chan.uid} broke")
@ -727,7 +751,7 @@ class Actor:
# so just cash manually here since it's what our # so just cash manually here since it's what our
# internals expect. # internals expect.
address: Tuple[str, int] = value address: Tuple[str, int] = value
self._arb_addr = value self._arb_addr = address
else: else:
setattr(self, attr, value) setattr(self, attr, value)
@ -735,8 +759,8 @@ class Actor:
# Disable sigint handling in children if NOT running in # Disable sigint handling in children if NOT running in
# debug mode; we shouldn't need it thanks to our # debug mode; we shouldn't need it thanks to our
# cancellation machinery. # cancellation machinery.
if 'debug_mode' not in rvs: # if '_debug_mode' not in rvs:
signal.signal(signal.SIGINT, signal.SIG_IGN) # signal.signal(signal.SIGINT, signal.SIG_IGN)
return chan, accept_addr return chan, accept_addr
@ -1122,7 +1146,7 @@ class Actor:
raise ValueError(f"{uid} is not a valid uid?!") raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid chan.uid = uid
log.info(f"Handshake with actor {uid}@{chan.raddr} complete") log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
return uid return uid

View File

@ -53,13 +53,22 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet" "The root actor has not been initialized yet"
def pack_error(exc: BaseException) -> Dict[str, Any]: def pack_error(
exc: BaseException,
tb = None,
) -> Dict[str, Any]:
"""Create an "error message" for tranmission over """Create an "error message" for tranmission over
a channel (aka the wire). a channel (aka the wire).
""" """
if tb:
tb_str = ''.join(traceback.format_tb(tb))
else:
tb_str = traceback.format_exc()
return { return {
'error': { 'error': {
'tb_str': traceback.format_exc(), 'tb_str': tb_str,
'type_str': type(exc).__name__, 'type_str': type(exc).__name__,
} }
} }