forked from goodboy/tractor
				
			(Event) more pedantic `.cancel_acked: bool` def
Changes the condition logic to be more strict and moves it to a private `._is_self_cancelled() -> bool` predicate which can be used elsewhere (instead of having almost similar duplicate checks all over the place..) and allows taking in a specific `remote_error` just for verification purposes (like for tests). Main strictness distinctions are now: - obvi that `.cancel_called` is set (this filters any `Portal.cancel_actor()` or other out-of-band RPC), - the received `ContextCancelled` **must** have its `.canceller` set to this side's `Actor.uid` (indicating we are the requester). - `.src_actor_uid` **must** be the same as the `.chan.uid` (so the error must have originated from the opposite side's task. - `ContextCancelled.canceller` should be already set to the `.chan.uid` indicating we received the msg via the runtime calling `._deliver_msg()` -> `_maybe_cancel_and_set_remote_error()` which ensures the error is specifically destined for this ctx-task exactly the same as how `Actor._cancel_task()` sets it from an input `requesting_uid` arg. In support of the above adjust some impl deats: - add `Context._actor: Actor` which is set once in `mk_context()` to avoid issues (particularly in testing) where `current_actor()` raises after the root actor / runtime is already exited. Use `._actor.uid` in both `.cancel_acked` (obvi) and '_maybe_cancel_and_set_remote_error()` when deciding whether to call `._scope.cancel()`. - always cast `.canceller` to `tuple` if not null. - delegate `.cancel_acked` directly to new private predicate (obvi). - always set `._canceller` from any `RemoteActorError.src_actor_uid` or failing over to the `.chan.uid` when a non-remote error (tho that shouldn't ever happen right?). - more extensive doc-string for `.cancel()` detailing the new strictness rules about whether an eventual `.cancel_acked` might be set. Also tossed in even more logging format tweaks by adding a `type_only: bool` to `.repr_outcome()` as desired for simpler output in the `state: <outcome-repr-here>` and `.repr_rpc()` sections of the `.__str__()`.remotes/1757153874605917753/main
							parent
							
								
									c5228e7be5
								
							
						
					
					
						commit
						7b1528abed
					
				|  | @ -364,6 +364,9 @@ class Context: | |||
|     ''' | ||||
|     chan: Channel | ||||
|     cid: str  # "context id", more or less a unique linked-task-pair id | ||||
| 
 | ||||
|     _actor: Actor | ||||
| 
 | ||||
|     # the "feeder" channels for delivering message values to the | ||||
|     # local task from the runtime's msg processing loop. | ||||
|     _recv_chan: trio.MemoryReceiveChannel | ||||
|  | @ -429,7 +432,7 @@ class Context: | |||
|     # there's always going to be an "underlying reason" that any | ||||
|     # context was closed due to either a remote side error or | ||||
|     # a call to `.cancel()` which triggers `ContextCancelled`. | ||||
|     _cancel_msg: str | dict | None = None | ||||
|     _cancel_msg: str|dict|None = None | ||||
| 
 | ||||
|     # NOTE: this state var used by the runtime to determine if the | ||||
|     # `pdbp` REPL is allowed to engage on contexts terminated via | ||||
|  | @ -486,6 +489,13 @@ class Context: | |||
|                 f'   {stream}\n' | ||||
|             ) | ||||
| 
 | ||||
|         outcome_str: str = self.repr_outcome( | ||||
|             show_error_fields=True | ||||
|         ) | ||||
|         outcome_typ_str: str = self.repr_outcome( | ||||
|             type_only=True | ||||
|         ) | ||||
| 
 | ||||
|         return ( | ||||
|             f'<Context(\n' | ||||
|             # f'\n' | ||||
|  | @ -505,8 +515,16 @@ class Context: | |||
|             # f'   ---\n' | ||||
|             f'\n' | ||||
|             # f'   -----\n' | ||||
|             f' |_state: {self.repr_outcome()}\n' | ||||
|             f'   outcome{ds}{self.repr_outcome(show_error_fields=True)}\n' | ||||
|             # | ||||
|             # TODO: better state `str`ids? | ||||
|             # -[ ] maybe map err-types to strs like 'cancelled', | ||||
|             #     'errored', 'streaming', 'started', .. etc. | ||||
|             # -[ ] as well as a final result wrapper like | ||||
|             #     `outcome.Value`? | ||||
|             # | ||||
|             f' |_state: {outcome_typ_str}\n' | ||||
| 
 | ||||
|             f'   outcome{ds}{outcome_str}\n' | ||||
|             f'   result{ds}{self._result}\n' | ||||
|             f'   cancel_called{ds}{self.cancel_called}\n' | ||||
|             f'   cancel_acked{ds}{self.cancel_acked}\n' | ||||
|  | @ -545,14 +563,46 @@ class Context: | |||
|         return self._cancel_called | ||||
| 
 | ||||
|     @property | ||||
|     def canceller(self) -> tuple[str, str] | None: | ||||
|     def canceller(self) -> tuple[str, str]|None: | ||||
|         ''' | ||||
|         ``Actor.uid: tuple[str, str]`` of the (remote) | ||||
|         actor-process who's task was cancelled thus causing this | ||||
|         (side of the) context to also be cancelled. | ||||
| 
 | ||||
|         ''' | ||||
|         return self._canceller | ||||
|         if canc := self._canceller: | ||||
|             return tuple(canc) | ||||
| 
 | ||||
|         return None | ||||
| 
 | ||||
|     def _is_self_cancelled( | ||||
|         self, | ||||
|         remote_error: Exception|None = None, | ||||
| 
 | ||||
|     ) -> bool: | ||||
| 
 | ||||
|         if not self._cancel_called: | ||||
|             return False | ||||
| 
 | ||||
|         re: BaseException|None = ( | ||||
|             remote_error | ||||
|             or self._remote_error | ||||
|         ) | ||||
|         if not re: | ||||
|             return False | ||||
| 
 | ||||
|         if from_uid := re.src_actor_uid: | ||||
|             from_uid: tuple = tuple(from_uid) | ||||
| 
 | ||||
|         our_uid: tuple = self._actor.uid | ||||
|         our_canceller = self.canceller | ||||
| 
 | ||||
|         return bool( | ||||
|             isinstance(re, ContextCancelled) | ||||
|             and from_uid == self.chan.uid | ||||
|             and re.canceller == our_uid | ||||
|             and our_canceller == from_uid | ||||
|         ) | ||||
| 
 | ||||
|     @property | ||||
|     def cancel_acked(self) -> bool: | ||||
|  | @ -568,22 +618,7 @@ class Context: | |||
|         equal to the uid of the calling task's actor. | ||||
| 
 | ||||
|         ''' | ||||
|         portal: Portal|None = self._portal | ||||
|         if portal: | ||||
|             our_uid: tuple = portal.actor.uid | ||||
| 
 | ||||
|         return bool( | ||||
|             self._cancel_called | ||||
|             and (re := self._remote_error) | ||||
|             and isinstance(re, ContextCancelled) | ||||
|             and ( | ||||
|                 re.canceller | ||||
|                 == | ||||
|                 self.canceller | ||||
|                 == | ||||
|                 our_uid | ||||
|            ) | ||||
|         ) | ||||
|         return self._is_self_cancelled() | ||||
| 
 | ||||
|     @property | ||||
|     def cancelled_caught(self) -> bool: | ||||
|  | @ -762,30 +797,15 @@ class Context: | |||
|         # self-cancel (ack) or, | ||||
|         # peer propagated remote cancellation. | ||||
|         if isinstance(error, ContextCancelled): | ||||
|             ctxc_src: tuple = error.canceller | ||||
| 
 | ||||
|             whom: str = ( | ||||
|                 'us' if ctxc_src == current_actor().uid | ||||
|                 'us' if error.canceller == self._actor.uid | ||||
|                 else 'peer' | ||||
|             ) | ||||
|             log.cancel( | ||||
|                 f'IPC context cancelled by {whom}!\n\n' | ||||
|                 f'{error}' | ||||
|             ) | ||||
|             # always record the cancelling actor's uid since its | ||||
|             # cancellation state is linked and we want to know | ||||
|             # which process was the cause / requester of the | ||||
|             # cancellation. | ||||
|             self._canceller = ctxc_src | ||||
| 
 | ||||
| 
 | ||||
|             if self._cancel_called: | ||||
|                 # this is an expected cancel request response | ||||
|                 # message and we **don't need to raise it** in the | ||||
|                 # local cancel `._scope` since it will potentially | ||||
|                 # override a real error. After this returns | ||||
|                 # `.cancel_acked == True`. | ||||
|                 return | ||||
| 
 | ||||
|         else: | ||||
|             log.error( | ||||
|  | @ -794,7 +814,23 @@ class Context: | |||
|                 f'{error}\n' | ||||
|                 f'{pformat(self)}\n' | ||||
|             ) | ||||
|             self._canceller = self.chan.uid | ||||
| 
 | ||||
|         # always record the cancelling actor's uid since its | ||||
|         # cancellation state is linked and we want to know | ||||
|         # which process was the cause / requester of the | ||||
|         # cancellation. | ||||
|         maybe_error_src: tuple = getattr( | ||||
|             error, | ||||
|             'src_actor_uid', | ||||
|             None, | ||||
|         ) | ||||
|         self._canceller = ( | ||||
|             maybe_error_src | ||||
|             or | ||||
|             # XXX: in the case we get a non-boxed error? | ||||
|             # -> wait but this should never happen right? | ||||
|             self.chan.uid | ||||
|         ) | ||||
| 
 | ||||
|         # Cancel the local `._scope`, catch that | ||||
|         # `._scope.cancelled_caught` and re-raise any remote error | ||||
|  | @ -803,6 +839,15 @@ class Context: | |||
|         cs: trio.CancelScope = self._scope | ||||
|         if ( | ||||
|             cs | ||||
| 
 | ||||
|             # XXX this is an expected cancel request response | ||||
|             # message and we **don't need to raise it** in the | ||||
|             # local cancel `._scope` since it will potentially | ||||
|             # override a real error. After this method returns | ||||
|             # if `._cancel_called` then `.cancel_acked and .cancel_called` | ||||
|             # always should be set. | ||||
|             and not self._is_self_cancelled() | ||||
| 
 | ||||
|             and not cs.cancel_called | ||||
|             and not cs.cancelled_caught | ||||
|         ): | ||||
|  | @ -840,9 +885,13 @@ class Context: | |||
|     ) -> str: | ||||
|         # TODO: how to show the transport interchange fmt? | ||||
|         # codec: str = self.chan.transport.codec_key | ||||
|         outcome_str: str = self.repr_outcome( | ||||
|             show_error_fields=True, | ||||
|             type_only=True, | ||||
|         ) | ||||
|         return ( | ||||
|             # f'{self._nsf}() -{{{codec}}}-> {repr(self.outcome)}:' | ||||
|             f'{self._nsf}() -> {self.repr_outcome()}:' | ||||
|             f'{self._nsf}() -> {outcome_str}:' | ||||
|         ) | ||||
| 
 | ||||
|     async def cancel( | ||||
|  | @ -851,10 +900,32 @@ class Context: | |||
| 
 | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Cancel this inter-actor-task context. | ||||
|         Cancel this inter-actor IPC context by requestng the | ||||
|         remote side's cancel-scope-linked `trio.Task` by calling | ||||
|         `._scope.cancel()` and delivering an `ContextCancelled` | ||||
|         ack msg in reponse. | ||||
| 
 | ||||
|         Request that the far side cancel it's current linked context, | ||||
|         Timeout quickly in an attempt to sidestep 2-generals... | ||||
|         Behaviour: | ||||
|         --------- | ||||
|         - after the far end cancels, the `.cancel()` calling side | ||||
|           should receive a `ContextCancelled` with the | ||||
|           `.canceller: tuple` uid set to the current `Actor.uid`. | ||||
| 
 | ||||
|         - timeout (quickly) on failure to rx this ACK error-msg in | ||||
|           an attempt to sidestep 2-generals when the transport | ||||
|           layer fails. | ||||
| 
 | ||||
|         Note, that calling this method DOES NOT also necessarily | ||||
|         result in `Context._scope.cancel()` being called | ||||
|         **locally**! | ||||
| 
 | ||||
|         => That is, an IPC `Context` (this) **does not** | ||||
|            have the same semantics as a `trio.CancelScope`. | ||||
| 
 | ||||
|         If the caller (who entered the `Portal.open_context()`) | ||||
|         desires that the internal block's cancel-scope  be | ||||
|         cancelled it should open its own `trio.CancelScope` and | ||||
|         manage it as needed. | ||||
| 
 | ||||
|         ''' | ||||
|         side: str = self.side | ||||
|  | @ -976,7 +1047,7 @@ class Context: | |||
|             ``trio``'s cancellation system. | ||||
| 
 | ||||
|         ''' | ||||
|         actor: Actor = current_actor() | ||||
|         actor: Actor = self._actor | ||||
| 
 | ||||
|         # If the surrounding context has been cancelled by some | ||||
|         # task with a handle to THIS, we error here immediately | ||||
|  | @ -1149,62 +1220,58 @@ class Context: | |||
|         a  cancellation (if any). | ||||
| 
 | ||||
|         ''' | ||||
|         if (( | ||||
|             # NOTE: whenever the context's "opener" side (task) **is** | ||||
|             # the side which requested the cancellation (likekly via | ||||
|             # ``Context.cancel()``), we don't want to re-raise that | ||||
|             # cancellation signal locally (would be akin to | ||||
|             # a ``trio.Nursery`` nursery raising ``trio.Cancelled`` | ||||
|             # whenever  ``CancelScope.cancel()`` was called) and | ||||
|             # instead silently reap the expected cancellation | ||||
|             # "error"-msg-as-ack. In this case the `err: | ||||
|             # ContextCancelled` must have a `.canceller` set to the  | ||||
|             # uid of the requesting task's actor and we only do NOT | ||||
|             # raise that error locally if WE ARE THAT ACTOR which | ||||
|             # requested the cancellation. | ||||
|                 not raise_ctxc_from_self_call | ||||
|                 and isinstance(remote_error, ContextCancelled) | ||||
|                 and ( | ||||
|                     self._cancel_called | ||||
|         our_uid: tuple = self.chan.uid | ||||
| 
 | ||||
|                     # or self.chan._cancel_called | ||||
|                     # TODO: ^ should we have a special separate case | ||||
|                     # for this ^ ? | ||||
|                 ) | ||||
|                 and ( # one of, | ||||
|         # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption | ||||
|         # for "graceful cancellation" case: | ||||
|         # | ||||
|         # Whenever a "side" of a context (a `trio.Task` running in | ||||
|         # an actor) **is** the side which requested ctx | ||||
|         # cancellation (likekly via ``Context.cancel()``), we | ||||
|         # **don't** want to re-raise any eventually received | ||||
|         # `ContextCancelled` response locally (would be akin to | ||||
|         # a `trio.Nursery` nursery raising `trio.Cancelled` | ||||
|         # whenever `CancelScope.cancel()` was called). | ||||
|         # | ||||
|         # Instead, silently reap the remote delivered ctxc | ||||
|         # (`ContextCancelled`) as an expected | ||||
|         # error-msg-is-cancellation-ack IFF said | ||||
|         # `remote_error: ContextCancelled` has `.canceller` | ||||
|         # set to the `Actor.uid` of THIS task (i.e. the | ||||
|         # cancellation requesting task's actor is the actor | ||||
|         # checking whether it should absorb the ctxc). | ||||
|         if ( | ||||
|             not raise_ctxc_from_self_call | ||||
|             and self._is_self_cancelled(remote_error) | ||||
| 
 | ||||
|                     (portal := self._portal) | ||||
|                     and (our_uid := portal.actor.uid) | ||||
|                     # TODO: ?potentially it is useful to emit certain | ||||
|                     # warning/cancel logs for the cases where the | ||||
|                     # cancellation is due to a lower level cancel | ||||
|                     # request, such as `Portal.cancel_actor()`, since in | ||||
|                     # that case it's not actually this specific ctx that | ||||
|                     # made a `.cancel()` call, but it is the same | ||||
|                     # actor-process? | ||||
|                     and tuple(remote_error.canceller) == our_uid | ||||
|                     or self.chan._cancel_called | ||||
|                     or self.canceller == our_uid | ||||
|                  ) | ||||
|             ) or ( | ||||
|             # TODO: ?potentially it is useful to emit certain | ||||
|             # warning/cancel logs for the cases where the | ||||
|             # cancellation is due to a lower level cancel | ||||
|             # request, such as `Portal.cancel_actor()`, since in | ||||
|             # that case it's not actually this specific ctx that | ||||
|             # made a `.cancel()` call, but it is the same | ||||
|             # actor-process? | ||||
|             # or self.chan._cancel_called | ||||
|             # XXX: ^ should we have a special separate case | ||||
|             # for this ^, NO right? | ||||
| 
 | ||||
|                 # NOTE: whenever this context is the cause of an | ||||
|                 # overrun on the remote side (aka we sent msgs too | ||||
|                 # fast that the remote task was overrun according | ||||
|                 # to `MsgStream` buffer settings) AND the caller | ||||
|                 # has requested to not raise overruns this side | ||||
|                 # caused, we also silently absorb any remotely | ||||
|                 # boxed `StreamOverrun`. This is mostly useful for | ||||
|                 # supressing such faults during | ||||
|                 # cancellation/error/final-result handling inside | ||||
|                 # `_drain_to_final_msg()` such that we do not | ||||
|                 # raise such errors particularly in the case where | ||||
|                 # `._cancel_called == True`. | ||||
|                 not raise_overrun_from_self | ||||
|                 and isinstance(remote_error, RemoteActorError) | ||||
|                 and remote_error.msgdata['type_str'] == 'StreamOverrun' | ||||
|                 and tuple(remote_error.msgdata['sender']) == our_uid | ||||
|             ) | ||||
|         ) or ( | ||||
|             # NOTE: whenever this context is the cause of an | ||||
|             # overrun on the remote side (aka we sent msgs too | ||||
|             # fast that the remote task was overrun according | ||||
|             # to `MsgStream` buffer settings) AND the caller | ||||
|             # has requested to not raise overruns this side | ||||
|             # caused, we also silently absorb any remotely | ||||
|             # boxed `StreamOverrun`. This is mostly useful for | ||||
|             # supressing such faults during | ||||
|             # cancellation/error/final-result handling inside | ||||
|             # `_drain_to_final_msg()` such that we do not | ||||
|             # raise such errors particularly in the case where | ||||
|             # `._cancel_called == True`. | ||||
|             not raise_overrun_from_self | ||||
|             and isinstance(remote_error, RemoteActorError) | ||||
|             and remote_error.msgdata['type_str'] == 'StreamOverrun' | ||||
|             and tuple(remote_error.msgdata['sender']) == our_uid | ||||
|         ): | ||||
|             # NOTE: we set the local scope error to any "self | ||||
|             # cancellation" error-response thus "absorbing" | ||||
|  | @ -1236,7 +1303,7 @@ class Context: | |||
|     # TODO: change  to `.wait_for_result()`? | ||||
|     async def result( | ||||
|         self, | ||||
|         hide_tb: bool = True, | ||||
|         hide_tb: bool = False, | ||||
| 
 | ||||
|     ) -> Any|Exception: | ||||
|         ''' | ||||
|  | @ -1378,7 +1445,20 @@ class Context: | |||
|         if error: | ||||
|             return error | ||||
| 
 | ||||
|         assert not self._cancel_msg | ||||
|         if cancmsg := self._cancel_msg: | ||||
|             # NOTE: means we're prolly in the process of | ||||
|             # processing the cancellation caused by | ||||
|             # this msg (eg. logging from `Actor._cancel_task()` | ||||
|             # method after receiving a `Context.cancel()` RPC) | ||||
|             # though there shouldn't ever be a `._cancel_msg` | ||||
|             # without it eventually resulting in this property | ||||
|             # delivering a value! | ||||
|             log.debug( | ||||
|                 '`Context._cancel_msg` is set but has not yet resolved to `.maybe_error`?\n\n' | ||||
|                 f'{cancmsg}\n' | ||||
|             ) | ||||
| 
 | ||||
|         # assert not self._cancel_msg | ||||
|         return None | ||||
| 
 | ||||
|     def _final_result_is_set(self) -> bool: | ||||
|  | @ -1411,6 +1491,7 @@ class Context: | |||
|     def repr_outcome( | ||||
|         self, | ||||
|         show_error_fields: bool = False, | ||||
|         type_only: bool = False, | ||||
| 
 | ||||
|     ) -> str: | ||||
|         ''' | ||||
|  | @ -1420,6 +1501,9 @@ class Context: | |||
|         ''' | ||||
|         merr: Exception|None = self.maybe_error | ||||
|         if merr: | ||||
|             if type_only: | ||||
|                 return type(merr).__name__ | ||||
| 
 | ||||
|             # if the error-type is one of ours and has the custom | ||||
|             # defined "repr-(in)-one-line" method call it, ow | ||||
|             # just deliver the type name. | ||||
|  | @ -1616,8 +1700,6 @@ class Context: | |||
| 
 | ||||
|                 f'{pformat(msg)}\n' | ||||
|             ) | ||||
|             # from .devx._debug import pause | ||||
|             # await pause() | ||||
| 
 | ||||
|             # NOTE: if an error is deteced we should always still | ||||
|             # send it through the feeder-mem-chan and expect | ||||
|  | @ -1666,7 +1748,7 @@ class Context: | |||
|             # overrun state and that msg isn't stuck in an | ||||
|             # overflow queue what happens?!? | ||||
| 
 | ||||
|             local_uid = current_actor().uid | ||||
|             local_uid = self._actor.uid | ||||
|             txt: str = ( | ||||
|                 'on IPC context:\n' | ||||
| 
 | ||||
|  | @ -1765,6 +1847,7 @@ def mk_context( | |||
|     ctx = Context( | ||||
|         chan=chan, | ||||
|         cid=cid, | ||||
|         _actor=current_actor(), | ||||
|         _send_chan=send_chan, | ||||
|         _recv_chan=recv_chan, | ||||
|         _nsf=nsf, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue