(Event) more pedantic `.cancel_acked: bool` def

Changes the condition logic to be more strict and moves it to a private
`._is_self_cancelled() -> bool` predicate which can be used elsewhere
(instead of having almost similar duplicate checks all over the
place..) and allows taking in a specific `remote_error` just for
verification purposes (like for tests).

Main strictness distinctions are now:
- obvi that `.cancel_called` is set (this filters any
  `Portal.cancel_actor()` or other out-of-band RPC),
- the received `ContextCancelled` **must** have its `.canceller` set to
  this side's `Actor.uid` (indicating we are the requester).
- `.src_actor_uid` **must** be the same as the `.chan.uid` (so the error
  must have originated from the opposite side's task.
- `ContextCancelled.canceller` should be already set to the `.chan.uid`
  indicating we received the msg via the runtime calling
  `._deliver_msg()` -> `_maybe_cancel_and_set_remote_error()` which
  ensures the error is specifically destined for this ctx-task exactly
  the same as how `Actor._cancel_task()` sets it from an input
  `requesting_uid` arg.

In support of the above adjust some impl deats:
- add `Context._actor: Actor` which is set once in `mk_context()` to
  avoid issues (particularly in testing) where `current_actor()` raises
  after the root actor / runtime is already exited. Use `._actor.uid` in
  both `.cancel_acked` (obvi) and '_maybe_cancel_and_set_remote_error()`
  when deciding whether to call `._scope.cancel()`.
- always cast `.canceller` to `tuple` if not null.
- delegate `.cancel_acked` directly to new private predicate (obvi).
- always set `._canceller` from any `RemoteActorError.src_actor_uid` or
  failing over to the `.chan.uid` when a non-remote error (tho that
  shouldn't ever happen right?).
- more extensive doc-string for `.cancel()` detailing the new strictness
  rules about whether an eventual `.cancel_acked` might be set.

Also tossed in even more logging format tweaks by adding a
`type_only: bool` to `.repr_outcome()` as desired for simpler output in
the `state: <outcome-repr-here>` and `.repr_rpc()` sections of the
`.__str__()`.
modden_spawn_from_client_req
Tyler Goodlet 2024-03-07 20:35:43 -05:00
parent 364ea91983
commit fa7e37d6ed
1 changed files with 184 additions and 101 deletions

View File

@ -364,6 +364,9 @@ class Context:
''' '''
chan: Channel chan: Channel
cid: str # "context id", more or less a unique linked-task-pair id cid: str # "context id", more or less a unique linked-task-pair id
_actor: Actor
# the "feeder" channels for delivering message values to the # the "feeder" channels for delivering message values to the
# local task from the runtime's msg processing loop. # local task from the runtime's msg processing loop.
_recv_chan: trio.MemoryReceiveChannel _recv_chan: trio.MemoryReceiveChannel
@ -429,7 +432,7 @@ class Context:
# there's always going to be an "underlying reason" that any # there's always going to be an "underlying reason" that any
# context was closed due to either a remote side error or # context was closed due to either a remote side error or
# a call to `.cancel()` which triggers `ContextCancelled`. # a call to `.cancel()` which triggers `ContextCancelled`.
_cancel_msg: str | dict | None = None _cancel_msg: str|dict|None = None
# NOTE: this state var used by the runtime to determine if the # NOTE: this state var used by the runtime to determine if the
# `pdbp` REPL is allowed to engage on contexts terminated via # `pdbp` REPL is allowed to engage on contexts terminated via
@ -486,6 +489,13 @@ class Context:
f' {stream}\n' f' {stream}\n'
) )
outcome_str: str = self.repr_outcome(
show_error_fields=True
)
outcome_typ_str: str = self.repr_outcome(
type_only=True
)
return ( return (
f'<Context(\n' f'<Context(\n'
# f'\n' # f'\n'
@ -505,8 +515,16 @@ class Context:
# f' ---\n' # f' ---\n'
f'\n' f'\n'
# f' -----\n' # f' -----\n'
f' |_state: {self.repr_outcome()}\n' #
f' outcome{ds}{self.repr_outcome(show_error_fields=True)}\n' # TODO: better state `str`ids?
# -[ ] maybe map err-types to strs like 'cancelled',
# 'errored', 'streaming', 'started', .. etc.
# -[ ] as well as a final result wrapper like
# `outcome.Value`?
#
f' |_state: {outcome_typ_str}\n'
f' outcome{ds}{outcome_str}\n'
f' result{ds}{self._result}\n' f' result{ds}{self._result}\n'
f' cancel_called{ds}{self.cancel_called}\n' f' cancel_called{ds}{self.cancel_called}\n'
f' cancel_acked{ds}{self.cancel_acked}\n' f' cancel_acked{ds}{self.cancel_acked}\n'
@ -545,14 +563,46 @@ class Context:
return self._cancel_called return self._cancel_called
@property @property
def canceller(self) -> tuple[str, str] | None: def canceller(self) -> tuple[str, str]|None:
''' '''
``Actor.uid: tuple[str, str]`` of the (remote) ``Actor.uid: tuple[str, str]`` of the (remote)
actor-process who's task was cancelled thus causing this actor-process who's task was cancelled thus causing this
(side of the) context to also be cancelled. (side of the) context to also be cancelled.
''' '''
return self._canceller if canc := self._canceller:
return tuple(canc)
return None
def _is_self_cancelled(
self,
remote_error: Exception|None = None,
) -> bool:
if not self._cancel_called:
return False
re: BaseException|None = (
remote_error
or self._remote_error
)
if not re:
return False
if from_uid := re.src_actor_uid:
from_uid: tuple = tuple(from_uid)
our_uid: tuple = self._actor.uid
our_canceller = self.canceller
return bool(
isinstance(re, ContextCancelled)
and from_uid == self.chan.uid
and re.canceller == our_uid
and our_canceller == from_uid
)
@property @property
def cancel_acked(self) -> bool: def cancel_acked(self) -> bool:
@ -568,22 +618,7 @@ class Context:
equal to the uid of the calling task's actor. equal to the uid of the calling task's actor.
''' '''
portal: Portal|None = self._portal return self._is_self_cancelled()
if portal:
our_uid: tuple = portal.actor.uid
return bool(
self._cancel_called
and (re := self._remote_error)
and isinstance(re, ContextCancelled)
and (
re.canceller
==
self.canceller
==
our_uid
)
)
@property @property
def cancelled_caught(self) -> bool: def cancelled_caught(self) -> bool:
@ -762,30 +797,15 @@ class Context:
# self-cancel (ack) or, # self-cancel (ack) or,
# peer propagated remote cancellation. # peer propagated remote cancellation.
if isinstance(error, ContextCancelled): if isinstance(error, ContextCancelled):
ctxc_src: tuple = error.canceller
whom: str = ( whom: str = (
'us' if ctxc_src == current_actor().uid 'us' if error.canceller == self._actor.uid
else 'peer' else 'peer'
) )
log.cancel( log.cancel(
f'IPC context cancelled by {whom}!\n\n' f'IPC context cancelled by {whom}!\n\n'
f'{error}' f'{error}'
) )
# always record the cancelling actor's uid since its
# cancellation state is linked and we want to know
# which process was the cause / requester of the
# cancellation.
self._canceller = ctxc_src
if self._cancel_called:
# this is an expected cancel request response
# message and we **don't need to raise it** in the
# local cancel `._scope` since it will potentially
# override a real error. After this returns
# `.cancel_acked == True`.
return
else: else:
log.error( log.error(
@ -794,7 +814,23 @@ class Context:
f'{error}\n' f'{error}\n'
f'{pformat(self)}\n' f'{pformat(self)}\n'
) )
self._canceller = self.chan.uid
# always record the cancelling actor's uid since its
# cancellation state is linked and we want to know
# which process was the cause / requester of the
# cancellation.
maybe_error_src: tuple = getattr(
error,
'src_actor_uid',
None,
)
self._canceller = (
maybe_error_src
or
# XXX: in the case we get a non-boxed error?
# -> wait but this should never happen right?
self.chan.uid
)
# Cancel the local `._scope`, catch that # Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error # `._scope.cancelled_caught` and re-raise any remote error
@ -803,6 +839,15 @@ class Context:
cs: trio.CancelScope = self._scope cs: trio.CancelScope = self._scope
if ( if (
cs cs
# XXX this is an expected cancel request response
# message and we **don't need to raise it** in the
# local cancel `._scope` since it will potentially
# override a real error. After this method returns
# if `._cancel_called` then `.cancel_acked and .cancel_called`
# always should be set.
and not self._is_self_cancelled()
and not cs.cancel_called and not cs.cancel_called
and not cs.cancelled_caught and not cs.cancelled_caught
): ):
@ -840,9 +885,13 @@ class Context:
) -> str: ) -> str:
# TODO: how to show the transport interchange fmt? # TODO: how to show the transport interchange fmt?
# codec: str = self.chan.transport.codec_key # codec: str = self.chan.transport.codec_key
outcome_str: str = self.repr_outcome(
show_error_fields=True,
type_only=True,
)
return ( return (
# f'{self._nsf}() -{{{codec}}}-> {repr(self.outcome)}:' # f'{self._nsf}() -{{{codec}}}-> {repr(self.outcome)}:'
f'{self._nsf}() -> {self.repr_outcome()}:' f'{self._nsf}() -> {outcome_str}:'
) )
async def cancel( async def cancel(
@ -851,10 +900,32 @@ class Context:
) -> None: ) -> None:
''' '''
Cancel this inter-actor-task context. Cancel this inter-actor IPC context by requestng the
remote side's cancel-scope-linked `trio.Task` by calling
`._scope.cancel()` and delivering an `ContextCancelled`
ack msg in reponse.
Request that the far side cancel it's current linked context, Behaviour:
Timeout quickly in an attempt to sidestep 2-generals... ---------
- after the far end cancels, the `.cancel()` calling side
should receive a `ContextCancelled` with the
`.canceller: tuple` uid set to the current `Actor.uid`.
- timeout (quickly) on failure to rx this ACK error-msg in
an attempt to sidestep 2-generals when the transport
layer fails.
Note, that calling this method DOES NOT also necessarily
result in `Context._scope.cancel()` being called
**locally**!
=> That is, an IPC `Context` (this) **does not**
have the same semantics as a `trio.CancelScope`.
If the caller (who entered the `Portal.open_context()`)
desires that the internal block's cancel-scope be
cancelled it should open its own `trio.CancelScope` and
manage it as needed.
''' '''
side: str = self.side side: str = self.side
@ -976,7 +1047,7 @@ class Context:
``trio``'s cancellation system. ``trio``'s cancellation system.
''' '''
actor: Actor = current_actor() actor: Actor = self._actor
# If the surrounding context has been cancelled by some # If the surrounding context has been cancelled by some
# task with a handle to THIS, we error here immediately # task with a handle to THIS, we error here immediately
@ -1149,32 +1220,30 @@ class Context:
a cancellation (if any). a cancellation (if any).
''' '''
if (( our_uid: tuple = self.chan.uid
# NOTE: whenever the context's "opener" side (task) **is**
# the side which requested the cancellation (likekly via # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
# ``Context.cancel()``), we don't want to re-raise that # for "graceful cancellation" case:
# cancellation signal locally (would be akin to #
# a ``trio.Nursery`` nursery raising ``trio.Cancelled`` # Whenever a "side" of a context (a `trio.Task` running in
# whenever ``CancelScope.cancel()`` was called) and # an actor) **is** the side which requested ctx
# instead silently reap the expected cancellation # cancellation (likekly via ``Context.cancel()``), we
# "error"-msg-as-ack. In this case the `err: # **don't** want to re-raise any eventually received
# ContextCancelled` must have a `.canceller` set to the # `ContextCancelled` response locally (would be akin to
# uid of the requesting task's actor and we only do NOT # a `trio.Nursery` nursery raising `trio.Cancelled`
# raise that error locally if WE ARE THAT ACTOR which # whenever `CancelScope.cancel()` was called).
# requested the cancellation. #
# Instead, silently reap the remote delivered ctxc
# (`ContextCancelled`) as an expected
# error-msg-is-cancellation-ack IFF said
# `remote_error: ContextCancelled` has `.canceller`
# set to the `Actor.uid` of THIS task (i.e. the
# cancellation requesting task's actor is the actor
# checking whether it should absorb the ctxc).
if (
not raise_ctxc_from_self_call not raise_ctxc_from_self_call
and isinstance(remote_error, ContextCancelled) and self._is_self_cancelled(remote_error)
and (
self._cancel_called
# or self.chan._cancel_called
# TODO: ^ should we have a special separate case
# for this ^ ?
)
and ( # one of,
(portal := self._portal)
and (our_uid := portal.actor.uid)
# TODO: ?potentially it is useful to emit certain # TODO: ?potentially it is useful to emit certain
# warning/cancel logs for the cases where the # warning/cancel logs for the cases where the
# cancellation is due to a lower level cancel # cancellation is due to a lower level cancel
@ -1182,12 +1251,11 @@ class Context:
# that case it's not actually this specific ctx that # that case it's not actually this specific ctx that
# made a `.cancel()` call, but it is the same # made a `.cancel()` call, but it is the same
# actor-process? # actor-process?
and tuple(remote_error.canceller) == our_uid # or self.chan._cancel_called
or self.chan._cancel_called # XXX: ^ should we have a special separate case
or self.canceller == our_uid # for this ^, NO right?
)
) or (
) or (
# NOTE: whenever this context is the cause of an # NOTE: whenever this context is the cause of an
# overrun on the remote side (aka we sent msgs too # overrun on the remote side (aka we sent msgs too
# fast that the remote task was overrun according # fast that the remote task was overrun according
@ -1204,7 +1272,6 @@ class Context:
and isinstance(remote_error, RemoteActorError) and isinstance(remote_error, RemoteActorError)
and remote_error.msgdata['type_str'] == 'StreamOverrun' and remote_error.msgdata['type_str'] == 'StreamOverrun'
and tuple(remote_error.msgdata['sender']) == our_uid and tuple(remote_error.msgdata['sender']) == our_uid
)
): ):
# NOTE: we set the local scope error to any "self # NOTE: we set the local scope error to any "self
# cancellation" error-response thus "absorbing" # cancellation" error-response thus "absorbing"
@ -1236,7 +1303,7 @@ class Context:
# TODO: change to `.wait_for_result()`? # TODO: change to `.wait_for_result()`?
async def result( async def result(
self, self,
hide_tb: bool = True, hide_tb: bool = False,
) -> Any|Exception: ) -> Any|Exception:
''' '''
@ -1378,7 +1445,20 @@ class Context:
if error: if error:
return error return error
assert not self._cancel_msg if cancmsg := self._cancel_msg:
# NOTE: means we're prolly in the process of
# processing the cancellation caused by
# this msg (eg. logging from `Actor._cancel_task()`
# method after receiving a `Context.cancel()` RPC)
# though there shouldn't ever be a `._cancel_msg`
# without it eventually resulting in this property
# delivering a value!
log.debug(
'`Context._cancel_msg` is set but has not yet resolved to `.maybe_error`?\n\n'
f'{cancmsg}\n'
)
# assert not self._cancel_msg
return None return None
def _final_result_is_set(self) -> bool: def _final_result_is_set(self) -> bool:
@ -1411,6 +1491,7 @@ class Context:
def repr_outcome( def repr_outcome(
self, self,
show_error_fields: bool = False, show_error_fields: bool = False,
type_only: bool = False,
) -> str: ) -> str:
''' '''
@ -1420,6 +1501,9 @@ class Context:
''' '''
merr: Exception|None = self.maybe_error merr: Exception|None = self.maybe_error
if merr: if merr:
if type_only:
return type(merr).__name__
# if the error-type is one of ours and has the custom # if the error-type is one of ours and has the custom
# defined "repr-(in)-one-line" method call it, ow # defined "repr-(in)-one-line" method call it, ow
# just deliver the type name. # just deliver the type name.
@ -1616,8 +1700,6 @@ class Context:
f'{pformat(msg)}\n' f'{pformat(msg)}\n'
) )
# from .devx._debug import pause
# await pause()
# NOTE: if an error is deteced we should always still # NOTE: if an error is deteced we should always still
# send it through the feeder-mem-chan and expect # send it through the feeder-mem-chan and expect
@ -1666,7 +1748,7 @@ class Context:
# overrun state and that msg isn't stuck in an # overrun state and that msg isn't stuck in an
# overflow queue what happens?!? # overflow queue what happens?!?
local_uid = current_actor().uid local_uid = self._actor.uid
txt: str = ( txt: str = (
'on IPC context:\n' 'on IPC context:\n'
@ -1765,6 +1847,7 @@ def mk_context(
ctx = Context( ctx = Context(
chan=chan, chan=chan,
cid=cid, cid=cid,
_actor=current_actor(),
_send_chan=send_chan, _send_chan=send_chan,
_recv_chan=recv_chan, _recv_chan=recv_chan,
_nsf=nsf, _nsf=nsf,