From fa7e37d6edaec9afe3bd49cc2014479149a37ebb Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 7 Mar 2024 20:35:43 -0500
Subject: [PATCH] (Event) more pedantic `.cancel_acked: bool` def

Changes the condition logic to be more strict and moves it to a private
`._is_self_cancelled() -> bool` predicate which can be used elsewhere
(instead of having almost similar duplicate checks all over the
place..) and allows taking in a specific `remote_error` just for
verification purposes (like for tests).

Main strictness distinctions are now:
- obvi that `.cancel_called` is set (this filters any
  `Portal.cancel_actor()` or other out-of-band RPC),
- the received `ContextCancelled` **must** have its `.canceller` set to
  this side's `Actor.uid` (indicating we are the requester).
- `.src_actor_uid` **must** be the same as the `.chan.uid` (so the
  error must have originated from the opposite side's task).
- `ContextCancelled.canceller` should already be set to the `.chan.uid`
  indicating we received the msg via the runtime calling
  `._deliver_msg()` -> `_maybe_cancel_and_set_remote_error()` which
  ensures the error is specifically destined for this ctx-task exactly
  the same as how `Actor._cancel_task()` sets it from an input
  `requesting_uid` arg.

In support of the above adjust some impl deats:
- add `Context._actor: Actor` which is set once in `mk_context()` to
  avoid issues (particularly in testing) where `current_actor()` raises
  after the root actor / runtime is already exited. Use `._actor.uid` in
  both `.cancel_acked` (obvi) and `._maybe_cancel_and_set_remote_error()`
  when deciding whether to call `._scope.cancel()`.
- always cast `.canceller` to `tuple` if not null.
- delegate `.cancel_acked` directly to new private predicate (obvi).
- always set `._canceller` from any `RemoteActorError.src_actor_uid` or
  failing over to the `.chan.uid` when handling a non-remote error (tho
  that shouldn't ever happen right?).
- more extensive doc-string for `.cancel()` detailing the new strictness
  rules about whether an eventual `.cancel_acked` might be set.

Also tossed in even more logging format tweaks by adding a
`type_only: bool` to `.repr_outcome()` as desired for simpler output in
the `state: ` and `.repr_rpc()` sections of the `.__str__()`.
---
 tractor/_context.py | 285 ++++++++++++++++++++++++++++----------------
 1 file changed, 184 insertions(+), 101 deletions(-)

diff --git a/tractor/_context.py b/tractor/_context.py
index f8aaf1c..9179456 100644
--- a/tractor/_context.py
+++ b/tractor/_context.py
@@ -364,6 +364,9 @@ class Context:
     '''
     chan: Channel
     cid: str  # "context id", more or less a unique linked-task-pair id
+
+    _actor: Actor
+
     # the "feeder" channels for delivering message values to the
     # local task from the runtime's msg processing loop.
     _recv_chan: trio.MemoryReceiveChannel
@@ -429,7 +432,7 @@ class Context:
     # there's always going to be an "underlying reason" that any
     # context was closed due to either a remote side error or
     # a call to `.cancel()` which triggers `ContextCancelled`.
-    _cancel_msg: str | dict | None = None
+    _cancel_msg: str|dict|None = None
 
     # NOTE: this state var used by the runtime to determine if the
     # `pdbp` REPL is allowed to engage on contexts terminated via
@@ -486,6 +489,13 @@ class Context:
             f'  {stream}\n'
         )
 
+        outcome_str: str = self.repr_outcome(
+            show_error_fields=True
+        )
+        outcome_typ_str: str = self.repr_outcome(
+            type_only=True
+        )
+
         return (
             f'
 
-    def canceller(self) -> tuple[str, str] | None:
+    def canceller(self) -> tuple[str, str]|None:
         '''
         ``Actor.uid: tuple[str, str]`` of the (remote) actor-process
         who's task was cancelled thus causing this (side of the)
         context to also be cancelled.
 
         '''
-        return self._canceller
+        if canc := self._canceller:
+            return tuple(canc)
+
+        return None
+
+    def _is_self_cancelled(
+        self,
+        remote_error: Exception|None = None,
+
+    ) -> bool:
+
+        if not self._cancel_called:
+            return False
+
+        re: BaseException|None = (
+            remote_error
+            or self._remote_error
+        )
+        if not re:
+            return False
+
+        if from_uid := re.src_actor_uid:
+            from_uid: tuple = tuple(from_uid)
+
+        our_uid: tuple = self._actor.uid
+        our_canceller = self.canceller
+
+        return bool(
+            isinstance(re, ContextCancelled)
+            and from_uid == self.chan.uid
+            and re.canceller == our_uid
+            and our_canceller == from_uid
+        )
 
     @property
     def cancel_acked(self) -> bool:
@@ -568,22 +618,7 @@ class Context:
         equal to the uid of the calling task's actor.
 
         '''
-        portal: Portal|None = self._portal
-        if portal:
-            our_uid: tuple = portal.actor.uid
-
-        return bool(
-            self._cancel_called
-            and (re := self._remote_error)
-            and isinstance(re, ContextCancelled)
-            and (
-                re.canceller
-                ==
-                self.canceller
-                ==
-                our_uid
-            )
-        )
+        return self._is_self_cancelled()
 
     @property
     def cancelled_caught(self) -> bool:
@@ -762,30 +797,15 @@ class Context:
        # self-cancel (ack) or,
        # peer propagated remote cancellation.
        if isinstance(error, ContextCancelled):
-            ctxc_src: tuple = error.canceller
            whom: str = (
-                'us' if ctxc_src == current_actor().uid
+                'us' if error.canceller == self._actor.uid
                else 'peer'
            )
            log.cancel(
                f'IPC context cancelled by {whom}!\n\n'
                f'{error}'
            )
 
-            # always record the cancelling actor's uid since its
-            # cancellation state is linked and we want to know
-            # which process was the cause / requester of the
-            # cancellation.
-            self._canceller = ctxc_src
-
-
-            if self._cancel_called:
-                # this is an expected cancel request response
-                # message and we **don't need to raise it** in the
-                # local cancel `._scope` since it will potentially
-                # override a real error. After this returns
-                # `.cancel_acked == True`.
-                return
 
        else:
            log.error(
@@ -794,7 +814,23 @@ class Context:
                f'{error}\n'
                f'{pformat(self)}\n'
            )
-            self._canceller = self.chan.uid
+
+        # always record the cancelling actor's uid since its
+        # cancellation state is linked and we want to know
+        # which process was the cause / requester of the
+        # cancellation.
+        maybe_error_src: tuple = getattr(
+            error,
+            'src_actor_uid',
+            None,
+        )
+        self._canceller = (
+            maybe_error_src
+            or
+            # XXX: in the case we get a non-boxed error?
+            # -> wait but this should never happen right?
+            self.chan.uid
+        )
 
        # Cancel the local `._scope`, catch that
        # `._scope.cancelled_caught` and re-raise any remote error
@@ -803,6 +839,15 @@ class Context:
        cs: trio.CancelScope = self._scope
        if (
            cs
+
+            # XXX this is an expected cancel request response
+            # message and we **don't need to raise it** in the
+            # local cancel `._scope` since it will potentially
+            # override a real error. After this method returns,
+            # if `._cancel_called` then `.cancel_acked` and `.cancel_called`
+            # always should be set.
+            and not self._is_self_cancelled()
+
            and not cs.cancel_called
            and not cs.cancelled_caught
        ):
@@ -840,9 +885,13 @@ class Context:
    ) -> str:
        # TODO: how to show the transport interchange fmt?
        # codec: str = self.chan.transport.codec_key
+        outcome_str: str = self.repr_outcome(
+            show_error_fields=True,
+            type_only=True,
+        )
        return (
            # f'{self._nsf}() -{{{codec}}}-> {repr(self.outcome)}:'
-            f'{self._nsf}() -> {self.repr_outcome()}:'
+            f'{self._nsf}() -> {outcome_str}:'
        )
 
    async def cancel(
@@ -851,10 +900,32 @@ class Context:
 
    ) -> None:
        '''
-        Cancel this inter-actor-task context.
+        Cancel this inter-actor IPC context by requesting the
+        remote side's cancel-scope-linked `trio.Task` by calling
+        `._scope.cancel()` and delivering a `ContextCancelled`
+        ack msg in response.
 
-        Request that the far side cancel it's current linked context,
-        Timeout quickly in an attempt to sidestep 2-generals...
+        Behaviour:
+        ----------
+        - after the far end cancels, the `.cancel()` calling side
+          should receive a `ContextCancelled` with the
+          `.canceller: tuple` uid set to the current `Actor.uid`.
+
+        - timeout (quickly) on failure to rx this ACK error-msg in
+          an attempt to sidestep 2-generals when the transport
+          layer fails.
+
+        Note that calling this method DOES NOT also necessarily
+        result in `Context._scope.cancel()` being called
+        **locally**!
+
+        => That is, an IPC `Context` (this) **does not**
+           have the same semantics as a `trio.CancelScope`.
+
+        If the caller (who entered the `Portal.open_context()`)
+        desires that the internal block's cancel-scope be
+        cancelled, it should open its own `trio.CancelScope` and
+        manage it as needed.
 
        '''
        side: str = self.side
@@ -976,7 +1047,7 @@ class Context:
        ``trio``'s cancellation system.
 
        '''
-        actor: Actor = current_actor()
+        actor: Actor = self._actor
 
        # If the surrounding context has been cancelled by some
        # task with a handle to THIS, we error here immediately
@@ -1149,62 +1220,58 @@ class Context:
        a cancellation (if any).
 
        '''
-        if ((
-            # NOTE: whenever the context's "opener" side (task) **is**
-            # the side which requested the cancellation (likekly via
-            # ``Context.cancel()``), we don't want to re-raise that
-            # cancellation signal locally (would be akin to
-            # a ``trio.Nursery`` nursery raising ``trio.Cancelled``
-            # whenever ``CancelScope.cancel()`` was called) and
-            # instead silently reap the expected cancellation
-            # "error"-msg-as-ack. In this case the `err:
-            # ContextCancelled` must have a `.canceller` set to the
-            # uid of the requesting task's actor and we only do NOT
-            # raise that error locally if WE ARE THAT ACTOR which
-            # requested the cancellation.
-            not raise_ctxc_from_self_call
-            and isinstance(remote_error, ContextCancelled)
-            and (
-                self._cancel_called
+        our_uid: tuple = self.chan.uid
 
-                # or self.chan._cancel_called
-                # TODO: ^ should we have a special separate case
-                # for this ^ ?
-            )
-            and (  # one of,
+        # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
+        # for "graceful cancellation" case:
+        #
+        # Whenever a "side" of a context (a `trio.Task` running in
+        # an actor) **is** the side which requested ctx
+        # cancellation (likely via ``Context.cancel()``), we
+        # **don't** want to re-raise any eventually received
+        # `ContextCancelled` response locally (would be akin to
+        # a `trio.Nursery` nursery raising `trio.Cancelled`
+        # whenever `CancelScope.cancel()` was called).
+        #
+        # Instead, silently reap the remote delivered ctxc
+        # (`ContextCancelled`) as an expected
+        # error-msg-is-cancellation-ack IFF said
+        # `remote_error: ContextCancelled` has `.canceller`
+        # set to the `Actor.uid` of THIS task (i.e. the
+        # cancellation requesting task's actor is the actor
+        # checking whether it should absorb the ctxc).
+        if (
+            not raise_ctxc_from_self_call
+            and self._is_self_cancelled(remote_error)
 
-                (portal := self._portal)
-                and (our_uid := portal.actor.uid)
 
            # TODO: ?potentially it is useful to emit certain
            # warning/cancel logs for the cases where the
            # cancellation is due to a lower level cancel
            # request, such as `Portal.cancel_actor()`, since in
            # that case it's not actually this specific ctx that
            # made a `.cancel()` call, but it is the same
            # actor-process?
-                and tuple(remote_error.canceller) == our_uid
-                or self.chan._cancel_called
-                or self.canceller == our_uid
-            )
-        ) or (
+            # or self.chan._cancel_called
+            # XXX: ^ should we have a special separate case
+            # for this ^, NO right?
 
-            # NOTE: whenever this context is the cause of an
-            # overrun on the remote side (aka we sent msgs too
-            # fast that the remote task was overrun according
-            # to `MsgStream` buffer settings) AND the caller
-            # has requested to not raise overruns this side
-            # caused, we also silently absorb any remotely
-            # boxed `StreamOverrun`. This is mostly useful for
-            # supressing such faults during
-            # cancellation/error/final-result handling inside
-            # `_drain_to_final_msg()` such that we do not
-            # raise such errors particularly in the case where
-            # `._cancel_called == True`.
-            not raise_overrun_from_self
-            and isinstance(remote_error, RemoteActorError)
-            and remote_error.msgdata['type_str'] == 'StreamOverrun'
-            and tuple(remote_error.msgdata['sender']) == our_uid
-        )
+        ) or (
+            # NOTE: whenever this context is the cause of an
+            # overrun on the remote side (aka we sent msgs too
+            # fast that the remote task was overrun according
+            # to `MsgStream` buffer settings) AND the caller
+            # has requested to not raise overruns this side
+            # caused, we also silently absorb any remotely
+            # boxed `StreamOverrun`. This is mostly useful for
+            # suppressing such faults during
+            # cancellation/error/final-result handling inside
+            # `_drain_to_final_msg()` such that we do not
+            # raise such errors particularly in the case where
+            # `._cancel_called == True`.
+            not raise_overrun_from_self
+            and isinstance(remote_error, RemoteActorError)
+            and remote_error.msgdata['type_str'] == 'StreamOverrun'
+            and tuple(remote_error.msgdata['sender']) == our_uid
        ):
            # NOTE: we set the local scope error to any "self
            # cancellation" error-response thus "absorbing"
@@ -1236,7 +1303,7 @@ class Context:
    # TODO: change to `.wait_for_result()`?
    async def result(
        self,
-        hide_tb: bool = True,
+        hide_tb: bool = False,
 
    ) -> Any|Exception:
        '''
@@ -1378,7 +1445,20 @@ class Context:
        if error:
            return error
 
-        assert not self._cancel_msg
+        if cancmsg := self._cancel_msg:
+            # NOTE: means we're prolly in the process of
+            # processing the cancellation caused by
+            # this msg (eg. logging from `Actor._cancel_task()`
+            # method after receiving a `Context.cancel()` RPC)
+            # though there shouldn't ever be a `._cancel_msg`
+            # without it eventually resulting in this property
+            # delivering a value!
+            log.debug(
+                '`Context._cancel_msg` is set but has not yet resolved to `.maybe_error`?\n\n'
+                f'{cancmsg}\n'
+            )
+
+        # assert not self._cancel_msg
        return None
 
    def _final_result_is_set(self) -> bool:
@@ -1411,6 +1491,7 @@ class Context:
    def repr_outcome(
        self,
        show_error_fields: bool = False,
+        type_only: bool = False,
 
    ) -> str:
        '''
@@ -1420,6 +1501,9 @@ class Context:
 
        '''
        merr: Exception|None = self.maybe_error
        if merr:
+            if type_only:
+                return type(merr).__name__
+
            # if the error-type is one of ours and has the custom
            # defined "repr-(in)-one-line" method call it, ow
            # just deliver the type name.
@@ -1616,8 +1700,6 @@ class Context:
                f'{pformat(msg)}\n'
            )
 
-            # from .devx._debug import pause
-            # await pause()
 
            # NOTE: if an error is deteced we should always still
            # send it through the feeder-mem-chan and expect
@@ -1666,7 +1748,7 @@ class Context:
            # overrun state and that msg isn't stuck in an
            # overflow queue what happens?!?
-            local_uid = current_actor().uid
+            local_uid = self._actor.uid
 
            txt: str = (
                'on IPC context:\n'
@@ -1765,6 +1847,7 @@ def mk_context(
    ctx = Context(
        chan=chan,
        cid=cid,
+        _actor=current_actor(),
        _send_chan=send_chan,
        _recv_chan=recv_chan,
        _nsf=nsf,
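
P.S. a rough usage sketch of the stricter self-cancel (ack) semantics described
above; this snippet is NOT part of the diff, it assumes the public
`@tractor.context` / `Portal.open_context()` API, and the `sleeper()` fn plus
the actor name are made up purely for illustration:

    import trio
    import tractor


    @tractor.context
    async def sleeper(
        ctx: tractor.Context,
    ) -> None:
        # remote task: signal readiness then just sleep until cancelled.
        await ctx.started()
        await trio.sleep_forever()


    async def main() -> None:
        async with tractor.open_nursery() as an:
            portal = await an.start_actor(
                'sleeper_actor',
                enable_modules=[__name__],
            )
            async with portal.open_context(sleeper) as (ctx, _first):
                # request the REMOTE task be cancelled; per the new
                # `.cancel()` doc-string this does NOT also cancel our
                # local `._scope`, so open a separate `trio.CancelScope`
                # if local cancellation is desired as well.
                await ctx.cancel()

            # since this side requested the cancel, the delivered
            # `ContextCancelled` ack should be absorbed (not re-raised)
            # and the new strict predicate should report it:
            # `.canceller` == our uid and `.src_actor_uid` == `.chan.uid`.
            assert ctx.cancel_called
            assert ctx.cancel_acked

            await portal.cancel_actor()


    if __name__ == '__main__':
        trio.run(main)

(The same `._is_self_cancelled()` predicate drives the ctxc absorption in
`_maybe_raise_remote_err()`, which is why the ctx block above should exit
cleanly instead of re-raising the remote `ContextCancelled`.)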