forked from goodboy/tractor
1
0
Fork 0

Change a bunch of log levels to cancel, including any `ContextCancelled` handling

ctx_cancel_semantics_and_overruns
Tyler Goodlet 2023-04-07 16:07:26 -04:00
parent b3f9251eda
commit e80e0a551f
2 changed files with 41 additions and 12 deletions

View File

@ -132,7 +132,7 @@ def _trio_main(
else:
trio.run(trio_main)
except KeyboardInterrupt:
log.warning(f"Actor {actor.uid} received KBI")
log.cancel(f"Actor {actor.uid} received KBI")
finally:
log.info(f"Actor {actor.uid} terminated")

View File

@ -221,6 +221,8 @@ async def _invoke(
assert cs
if cs.cancelled_caught:
# if 'brokerd.kraken' in actor.uid:
# await _debug.breakpoint()
# TODO: pack in ``trio.Cancelled.__traceback__`` here
# so they can be unwrapped and displayed on the caller
@ -228,11 +230,11 @@ async def _invoke(
fname = func.__name__
if ctx._cancel_called:
msg = f'`{fname}()` cancelled itself'
msg = f'`{fname}()`@{actor.uid} cancelled itself'
elif cs.cancel_called:
msg = (
f'`{fname}()` was remotely cancelled by its caller '
f'`{fname}()`@{actor.uid} was remotely cancelled by its caller '
f'{ctx.chan.uid}'
)
@ -910,9 +912,15 @@ class Actor:
'''
cid = str(uuid.uuid4())
assert chan.uid
ctx = self.get_context(chan, cid, msg_buffer_size=msg_buffer_size)
ctx = self.get_context(
chan,
cid,
msg_buffer_size=msg_buffer_size,
)
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
await chan.send(
{'cmd': (ns, func, kwargs, self.uid, cid)}
)
# Wait on first response msg and validate; this should be
# immediate.
@ -922,7 +930,11 @@ class Actor:
if 'error' in first_msg:
raise unpack_error(first_msg, chan)
elif functype not in ('asyncfunc', 'asyncgen', 'context'):
elif functype not in (
'asyncfunc',
'asyncgen',
'context',
):
raise ValueError(f"{first_msg} is an invalid response packet?")
ctx._remote_func_type = functype
@ -1130,7 +1142,7 @@ class Actor:
async def cancel_rpc_tasks(
self,
only_chan: Optional[Channel] = None,
only_chan: Channel | None = None,
) -> None:
'''
Cancel all existing RPC responder tasks using the cancel scope
@ -1331,13 +1343,15 @@ async def async_main(
)
)
log.runtime("Waiting on service nursery to complete")
log.runtime("Service nursery complete")
log.runtime("Waiting on root nursery to complete")
log.runtime(
"Service nursery complete\n"
"Waiting on root nursery to complete"
)
# Blocks here as expected until the root nursery is
# killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as err:
log.info("Closing all actor lifetime contexts")
log.runtime("Closing all actor lifetime contexts")
actor.lifetime_stack.close()
if not registered_with_arbiter:
@ -1358,6 +1372,13 @@ async def async_main(
await try_ship_error_to_parent(actor._parent_chan, err)
# always!
match err:
case ContextCancelled():
log.cancel(
f'Actor: {actor.uid} was task-context-cancelled with,\n'
f'str(err)'
)
case _:
log.exception("Actor errored:")
raise
@ -1631,7 +1652,15 @@ async def process_messages(
else:
# ship any "internal" exception (i.e. one from internal
# machinery not from an rpc task) to parent
match err:
case ContextCancelled():
log.cancel(
f'Actor: {actor.uid} was task-context-cancelled with,\n'
f'str(err)'
)
case _:
log.exception("Actor errored:")
if actor._parent_chan:
await try_ship_error_to_parent(actor._parent_chan, err)