First try: pack cancelled tracebacks and ship to caller

bi_streaming
Tyler Goodlet 2021-06-27 11:37:35 -04:00
parent 627f1076d6
commit 17fca76865
2 changed files with 45 additions and 17 deletions

View File

@ -60,6 +60,9 @@ async def _invoke(
""" """
treat_as_gen = False treat_as_gen = False
# possible a traceback (not sure what typing is for this..)
tb = None
cancel_scope = trio.CancelScope() cancel_scope = trio.CancelScope()
cs: trio.CancelScope = None cs: trio.CancelScope = None
@ -156,14 +159,26 @@ async def _invoke(
ctx._scope_nursery = scope_nursery ctx._scope_nursery = scope_nursery
cs = scope_nursery.cancel_scope cs = scope_nursery.cancel_scope
task_status.started(cs) task_status.started(cs)
try:
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
except trio.Cancelled as err:
tb = err.__traceback__
if cs.cancelled_caught: if cs.cancelled_caught:
if ctx._cancel_called:
msg = f'{func.__name__} cancelled itself',
else: # TODO: pack in ``trio.Cancelled.__traceback__`` here
msg = f'{func.__name__} was remotely cancelled', # so they can be unwrapped and displayed on the caller
# side!
fname = func.__name__
if ctx._cancel_called:
msg = f'{fname} cancelled itself'
elif cs.cancel_called:
msg = (
f'{fname} was remotely cancelled by its caller '
f'{ctx.chan.uid}'
)
# task-contex was cancelled so relay to the cancel to caller # task-contex was cancelled so relay to the cancel to caller
raise ContextCancelled( raise ContextCancelled(
@ -196,7 +211,7 @@ async def _invoke(
log.exception("Actor crashed:") log.exception("Actor crashed:")
# always ship errors back to caller # always ship errors back to caller
err_msg = pack_error(err) err_msg = pack_error(err, tb=tb)
err_msg['cid'] = cid err_msg['cid'] = cid
try: try:
await chan.send(err_msg) await chan.send(err_msg)
@ -223,7 +238,7 @@ async def _invoke(
f"Task {func} likely errored or cancelled before it started") f"Task {func} likely errored or cancelled before it started")
finally: finally:
if not actor._rpc_tasks: if not actor._rpc_tasks:
log.info("All RPC tasks have completed") log.runtime("All RPC tasks have completed")
actor._ongoing_rpc_tasks.set() actor._ongoing_rpc_tasks.set()
@ -238,10 +253,10 @@ _lifetime_stack: ExitStack = ExitStack()
class Actor: class Actor:
"""The fundamental concurrency primitive. """The fundamental concurrency primitive.
An *actor* is the combination of a regular Python or An *actor* is the combination of a regular Python process
``multiprocessing.Process`` executing a ``trio`` task tree, communicating executing a ``trio`` task tree, communicating
with other actors through "portals" which provide a native async API with other actors through "portals" which provide a native async API
around "channels". around various IPC transport "channels".
""" """
is_arbiter: bool = False is_arbiter: bool = False
@ -396,7 +411,7 @@ class Actor:
self._no_more_peers = trio.Event() # unset self._no_more_peers = trio.Event() # unset
chan = Channel(stream=stream) chan = Channel(stream=stream)
log.info(f"New connection to us {chan}") log.runtime(f"New connection to us {chan}")
# send/receive initial handshake response # send/receive initial handshake response
try: try:
@ -427,8 +442,12 @@ class Actor:
event.set() event.set()
chans = self._peers[uid] chans = self._peers[uid]
# TODO: re-use channels for new connections instead
# of always new ones; will require changing all the
# discovery funcs
if chans: if chans:
log.warning( log.runtime(
f"already have channel(s) for {uid}:{chans}?" f"already have channel(s) for {uid}:{chans}?"
) )
log.trace(f"Registered {chan} for {uid}") # type: ignore log.trace(f"Registered {chan} for {uid}") # type: ignore
@ -672,7 +691,7 @@ class Actor:
else: else:
# mark that we have ongoing rpc tasks # mark that we have ongoing rpc tasks
self._ongoing_rpc_tasks = trio.Event() self._ongoing_rpc_tasks = trio.Event()
log.info(f"RPC func is {func}") log.runtime(f"RPC func is {func}")
# store cancel scope such that the rpc task can be # store cancel scope such that the rpc task can be
# cancelled gracefully if requested # cancelled gracefully if requested
self._rpc_tasks[(chan, cid)] = ( self._rpc_tasks[(chan, cid)] = (
@ -681,7 +700,7 @@ class Actor:
# self.cancel() was called so kill this msg loop # self.cancel() was called so kill this msg loop
# and break out into ``_async_main()`` # and break out into ``_async_main()``
log.warning( log.warning(
f"{self.uid} was remotely cancelled; " f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..") "waiting on cancellation completion..")
await self._cancel_complete.wait() await self._cancel_complete.wait()
loop_cs.cancel() loop_cs.cancel()
@ -1149,7 +1168,7 @@ class Actor:
raise ValueError(f"{uid} is not a valid uid?!") raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid chan.uid = uid
log.info(f"Handshake with actor {uid}@{chan.raddr} complete") log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
return uid return uid

View File

@ -56,13 +56,22 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet" "The root actor has not been initialized yet"
def pack_error(exc: BaseException) -> Dict[str, Any]: def pack_error(
exc: BaseException,
tb = None,
) -> Dict[str, Any]:
"""Create an "error message" for tranmission over """Create an "error message" for tranmission over
a channel (aka the wire). a channel (aka the wire).
""" """
if tb:
tb_str = ''.join(traceback.format_tb(tb))
else:
tb_str = traceback.format_exc()
return { return {
'error': { 'error': {
'tb_str': traceback.format_exc(), 'tb_str': tb_str,
'type_str': type(exc).__name__, 'type_str': type(exc).__name__,
} }
} }