Set `._cancel_msg` to RPC `{cmd: 'self._cancel_task', ..}` msg

Like how we set `Context._cancel_msg` in `._deliver_msg()` (in
which case normally it's an `{'error': ..}` msg), do the same when any
RPC task is remotely cancelled via `Actor._cancel_task` where that task
doesn't yet have a cancel msg set yet.

This makes is much easier to distinguish between ctx cancellations due
to some remote error vs. Explicit remote requests via any of
`Actor.cancel()`, `Portal.cancel_actor()` or `Context.cancel()`.
modden_spawn_from_client_req
Tyler Goodlet 2024-03-07 18:24:00 -05:00
parent 7ae9b5319b
commit 364ea91983
1 changed files with 45 additions and 29 deletions

View File

@ -302,7 +302,7 @@ async def _errors_relayed_via_ipc(
) )
) )
): ):
# await pause() # await _debug.pause()
# XXX QUESTION XXX: is there any case where we'll # XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default? # want to debug IPC disconnects as a default?
# => I can't think of a reason that inspecting this # => I can't think of a reason that inspecting this
@ -322,6 +322,12 @@ async def _errors_relayed_via_ipc(
cid=ctx.cid, cid=ctx.cid,
) )
# NOTE: the src actor should always be packed into the
# error.. but how should we verify this?
# assert err_msg['src_actor_uid']
# if not err_msg['error'].get('src_actor_uid'):
# import pdbp; pdbp.set_trace()
if is_rpc: if is_rpc:
try: try:
await chan.send(err_msg) await chan.send(err_msg)
@ -566,6 +572,7 @@ async def _invoke(
# inside ._context._drain_to_final_msg()`.. # inside ._context._drain_to_final_msg()`..
# # TODO: remove this ^ right? # # TODO: remove this ^ right?
if ctx._scope.cancelled_caught: if ctx._scope.cancelled_caught:
our_uid: tuple = actor.uid
# first check for and raise any remote error # first check for and raise any remote error
# before raising any context cancelled case # before raising any context cancelled case
@ -575,8 +582,9 @@ async def _invoke(
ctx._maybe_raise_remote_err(re) ctx._maybe_raise_remote_err(re)
cs: CancelScope = ctx._scope cs: CancelScope = ctx._scope
if cs.cancel_called: if cs.cancel_called:
our_uid: tuple = actor.uid
canceller: tuple = ctx.canceller canceller: tuple = ctx.canceller
msg: str = ( msg: str = (
'actor was cancelled by ' 'actor was cancelled by '
@ -632,15 +640,6 @@ async def _invoke(
# f' |_{ctx}' # f' |_{ctx}'
) )
# TODO: does this ever get set any more or can
# we remove it?
if ctx._cancel_msg:
msg += (
# '------ - ------\n'
# 'IPC msg:\n'
f'\n\n{ctx._cancel_msg}'
)
# task-contex was either cancelled by request using # task-contex was either cancelled by request using
# ``Portal.cancel_actor()`` or ``Context.cancel()`` # ``Portal.cancel_actor()`` or ``Context.cancel()``
# on the far end, or it was cancelled by the local # on the far end, or it was cancelled by the local
@ -1753,7 +1752,9 @@ class Actor:
self, self,
cid: str, cid: str,
parent_chan: Channel, parent_chan: Channel,
requesting_uid: tuple[str, str] | None = None,
requesting_uid: tuple[str, str]|None = None,
ipc_msg: dict|None|bool = False,
) -> bool: ) -> bool:
''' '''
@ -1764,16 +1765,13 @@ class Actor:
in the signature (for now). in the signature (for now).
''' '''
# this ctx based lookup ensures the requested task to
# be cancelled was indeed spawned by a request from # this ctx based lookup ensures the requested task to be
# this channel # cancelled was indeed spawned by a request from its
# parent (or some grandparent's) channel
ctx: Context ctx: Context
func: Callable func: Callable
is_complete: trio.Event is_complete: trio.Event
# NOTE: right now this is only implicitly called by
# streaming IPC but it should be called
# to cancel any remotely spawned task
try: try:
( (
ctx, ctx,
@ -1801,20 +1799,23 @@ class Actor:
log.cancel( log.cancel(
'Cancel request for RPC task\n\n' 'Cancel request for RPC task\n\n'
f'<= ._cancel_task(): {requesting_uid}\n' f'<= Actor.cancel_task(): {requesting_uid}\n\n'
f' |_ @{ctx.dmaddr}\n\n' f'=> {ctx._task}\n'
f' |_ >> {ctx.repr_rpc}\n'
# f' >> Actor._cancel_task() => {ctx._task}\n'
# f' |_ {ctx._task}\n\n'
# TODO: better ascii repr for "supervisor" like # TODO: better ascii repr for "supervisor" like
# a nursery or context scope? # a nursery or context scope?
# f'=> {parent_chan}\n' # f'=> {parent_chan}\n'
f'=> {ctx._task}\n' # f' |_{ctx._task}\n'
# TODO: simplified `Context.__repr__()` fields output # TODO: simplified `Context.__repr__()` fields output
# shows only application state-related stuff like, # shows only application state-related stuff like,
# - ._stream # - ._stream
# - .closed # - .closed
# - .started_called # - .started_called
# - .. etc. # - .. etc.
f' >> {ctx.repr_rpc}\n' # f' >> {ctx.repr_rpc}\n'
# f' |_ctx: {cid}\n' # f' |_ctx: {cid}\n'
# f' >> {ctx._nsf}()\n' # f' >> {ctx._nsf}()\n'
) )
@ -1824,6 +1825,16 @@ class Actor:
): ):
ctx._canceller: tuple = requesting_uid ctx._canceller: tuple = requesting_uid
# TODO: pack the RPC `{'cmd': <blah>}` msg into a ctxc and
# then raise and pack it here?
if (
ipc_msg
and ctx._cancel_msg is None
):
# assign RPC msg directly from the loop which usually
# the case with `ctx.cancel()` on the other side.
ctx._cancel_msg = ipc_msg
# don't allow cancelling this function mid-execution # don't allow cancelling this function mid-execution
# (is this necessary?) # (is this necessary?)
if func is self._cancel_task: if func is self._cancel_task:
@ -1904,10 +1915,15 @@ class Actor:
else else
"IPC channel's " "IPC channel's "
) )
rent_chan_repr: str = (
f'|_{parent_chan}'
if parent_chan
else ''
)
log.cancel( log.cancel(
f'Cancelling {descr} {len(tasks)} rpc tasks\n\n' f'Cancelling {descr} {len(tasks)} rpc tasks\n\n'
f'<= .cancel_rpc_tasks(): {req_uid}\n' f'<= `Actor.cancel_rpc_tasks()`: {req_uid}\n'
f' {rent_chan_repr}\n'
# f'{self}\n' # f'{self}\n'
# f'{tasks_str}' # f'{tasks_str}'
) )
@ -1927,9 +1943,6 @@ class Actor:
): ):
continue continue
# if func == self._cancel_task:
# continue
# TODO: this maybe block on the task cancellation # TODO: this maybe block on the task cancellation
# and so should really done in a nursery batch? # and so should really done in a nursery batch?
await self._cancel_task( await self._cancel_task(
@ -2339,6 +2352,8 @@ async def process_messages(
await actor._cancel_task( await actor._cancel_task(
cid, cid,
channel, channel,
ipc_msg=msg,
) )
break break
@ -2449,6 +2464,7 @@ async def process_messages(
# cancel it! # cancel it!
'parent_chan': chan, 'parent_chan': chan,
'requesting_uid': chan.uid, 'requesting_uid': chan.uid,
'ipc_msg': msg,
} }
# TODO: remove? already have emit in meth. # TODO: remove? already have emit in meth.
# log.runtime( # log.runtime(
@ -2737,7 +2753,7 @@ class Arbiter(Actor):
sockaddr: tuple[str, int] sockaddr: tuple[str, int]
for (aname, _), sockaddr in self._registry.items(): for (aname, _), sockaddr in self._registry.items():
log.info( log.runtime(
f'Actor mailbox info:\n' f'Actor mailbox info:\n'
f'aname: {aname}\n' f'aname: {aname}\n'
f'sockaddr: {sockaddr}\n' f'sockaddr: {sockaddr}\n'