Be mega-pedantic with `ContextCancelled` semantics
As part of extremely detailed inter-peer-actor testing, add much more granular `Context` cancellation state tracking via the following (new) fields: - `.canceller: tuple[str, str]` the uuid of the actor responsible for the cancellation condition - always set by `Context._maybe_cancel_and_set_remote_error()` and replaces `._cancelled_remote` and `.cancel_called_remote`. If set, this value should normally always match a value from some `ContextCancelled` raised or caught by one side of the context. - `._local_error` which is always set to the locally raised (and caller or callee task's scope-internal) error which caused any eventual cancellation/error condition and thus any closure of the context's per-task-side-`trio.Nursery`. - `.cancelled_caught: bool` is now always `True` whenever the local task catches (or "silently absorbs") a `ContextCancelled` (a `ctxc`) that indeed originated from one of the context's linked tasks or any other context which raised its own `ctxc` in the current `.open_context()` scope. => whenever there is a case that no `ContextCancelled` was raised **in** the `.open_context().__aexit__()` (eg. `ctx.result()` called after a call `ctx.cancel()`), we still consider the context's as having "caught a cancellation" since the `ctxc` was indeed silently handled by the cancel requester; all other error cases are already represented by mirroring the state of the `._scope: trio.CancelScope` => IOW there should be **no case** where an error is **not raised** in the context's scope and `.cancelled_caught: bool == False`, i.e. no case where `._scope.cancelled_caught == False and ._local_error is not None`! - always raise any `ctxc` from `.open_stream()` if `._cancel_called == True` - if the cancellation request has not already resulted in a `._remote_error: ContextCancelled` we raise a `RuntimeError` to indicate improper usage to the guilty side's task code. - make `._maybe_raise_remote_err()` a sync func and don't raise any `ctxc` which is matched against a `.canceller` determined to be the current actor, aka a "self cancel", and always set the `._local_error` to any such `ctxc`. - `.side: str` taken from inside `.cancel()` and unused as of now since it might be better re-written as a similar `.is_opener() -> bool`? - drop unused `._started_received: bool`.. - TONS and TONS of detailed comments/docs to attempt to explain all the possible cancellation/exit cases and how they should exhibit as either silent closes or raises from the `Context` API! Adjust the `._runtime._invoke()` code to match: - use `ctx._maybe_raise_remote_err()` in `._invoke()`. - adjust to new `.canceller` property. - more type hints. - better `log.cancel()` msging around self-cancels vs. peer-cancels. - always set the `._local_error: BaseException` for the "callee" task just like `Portal.open_context()` now will do B) Prior we were raising any `Context._remote_error` directly and doing (more or less) the same `ContextCancelled` "absorbing" logic (well kinda) in block; instead delegate to the methodshielded_ctx_cancel
							parent
							
								
									5a94e8fb5b
								
							
						
					
					
						commit
						131674eabd
					
				| 
						 | 
				
			
			@ -56,6 +56,7 @@ from ._state import current_actor
 | 
			
		|||
 | 
			
		||||
if TYPE_CHECKING:
 | 
			
		||||
    from ._portal import Portal
 | 
			
		||||
    from ._runtime import Actor
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = get_logger(__name__)
 | 
			
		||||
| 
						 | 
				
			
			@ -64,20 +65,26 @@ log = get_logger(__name__)
 | 
			
		|||
@dataclass
 | 
			
		||||
class Context:
 | 
			
		||||
    '''
 | 
			
		||||
    An inter-actor, ``trio``-task communication context.
 | 
			
		||||
    An inter-actor, SC transitive, `trio.Task` communication context.
 | 
			
		||||
 | 
			
		||||
    NB: This class should never be instatiated directly, it is delivered
 | 
			
		||||
    by either,
 | 
			
		||||
     - runtime machinery to a remotely started task or,
 | 
			
		||||
     - by entering ``Portal.open_context()``.
 | 
			
		||||
    NB: This class should **never be instatiated directly**, it is allocated
 | 
			
		||||
    by the runtime in 2 ways:
 | 
			
		||||
     - by entering ``Portal.open_context()`` which is the primary
 | 
			
		||||
       public API for any "caller" task or,
 | 
			
		||||
     - by the RPC machinery's `._runtime._invoke()` as a `ctx` arg
 | 
			
		||||
       to a remotely scheduled "callee" function.
 | 
			
		||||
 | 
			
		||||
     and is always constructed using ``mkt_context()``.
 | 
			
		||||
    AND is always constructed using the below ``mk_context()``.
 | 
			
		||||
 | 
			
		||||
    Allows maintaining task or protocol specific state between
 | 
			
		||||
    2 communicating, parallel executing actor tasks. A unique context is
 | 
			
		||||
    allocated on each side of any task RPC-linked msg dialog, for
 | 
			
		||||
    every request to a remote actor from a portal. On the "callee"
 | 
			
		||||
    side a context is always allocated inside ``._runtime._invoke()``.
 | 
			
		||||
    2 cancel-scope-linked, communicating and parallel executing
 | 
			
		||||
    `trio.Task`s. Contexts are allocated on each side of any task
 | 
			
		||||
    RPC-linked msg dialog, i.e. for every request to a remote
 | 
			
		||||
    actor from a `Portal`. On the "callee" side a context is
 | 
			
		||||
    always allocated inside ``._runtime._invoke()``.
 | 
			
		||||
 | 
			
		||||
    # TODO: more detailed writeup on cancellation, error and
 | 
			
		||||
    # streaming semantics..
 | 
			
		||||
 | 
			
		||||
    A context can be cancelled and (possibly eventually restarted) from
 | 
			
		||||
    either side of the underlying IPC channel, it can also open task
 | 
			
		||||
| 
						 | 
				
			
			@ -108,12 +115,31 @@ class Context:
 | 
			
		|||
    # which is exactly the primitive that allows for
 | 
			
		||||
    # cross-actor-task-supervision and thus SC.
 | 
			
		||||
    _scope: trio.CancelScope | None = None
 | 
			
		||||
 | 
			
		||||
    # on a clean exit there should be a final value
 | 
			
		||||
    # delivered from the far end "callee" task, so
 | 
			
		||||
    # this value is only set on one side.
 | 
			
		||||
    _result: Any | int = None
 | 
			
		||||
 | 
			
		||||
    # if the local "caller"  task errors this
 | 
			
		||||
    # value is always set to the error that was
 | 
			
		||||
    # captured in the `Portal.open_context().__aexit__()`
 | 
			
		||||
    # teardown.
 | 
			
		||||
    _local_error: BaseException | None = None
 | 
			
		||||
 | 
			
		||||
    # if the either side gets an error from the other
 | 
			
		||||
    # this value is set to that error unpacked from an
 | 
			
		||||
    # IPC msg.
 | 
			
		||||
    _remote_error: BaseException | None = None
 | 
			
		||||
 | 
			
		||||
    # cancellation state
 | 
			
		||||
    # only set if the local task called `.cancel()`
 | 
			
		||||
    _cancel_called: bool = False  # did WE cancel the far end?
 | 
			
		||||
    _cancelled_remote: tuple[str, str] | None = None
 | 
			
		||||
 | 
			
		||||
    # TODO: do we even need this? we can assume that if we're
 | 
			
		||||
    # cancelled that the other side is as well, so maybe we should
 | 
			
		||||
    # instead just have a `.canceller` pulled from the
 | 
			
		||||
    # `ContextCancelled`?
 | 
			
		||||
    _canceller: tuple[str, str] | None = None
 | 
			
		||||
 | 
			
		||||
    # NOTE: we try to ensure assignment of a "cancel msg" since
 | 
			
		||||
    # there's always going to be an "underlying reason" that any
 | 
			
		||||
| 
						 | 
				
			
			@ -145,23 +171,47 @@ class Context:
 | 
			
		|||
        return self._cancel_called
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def cancel_called_remote(self) -> tuple[str, str] | None:
 | 
			
		||||
    def canceller(self) -> tuple[str, str] | None:
 | 
			
		||||
        '''
 | 
			
		||||
        ``Actor.uid`` of the remote actor who's task was cancelled
 | 
			
		||||
        causing this side of the context to also be cancelled.
 | 
			
		||||
        ``Actor.uid: tuple[str, str]`` of the (remote)
 | 
			
		||||
        actor-process who's task was cancelled thus causing this
 | 
			
		||||
        (side of the) context to also be cancelled.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        remote_uid = self._cancelled_remote
 | 
			
		||||
        if remote_uid:
 | 
			
		||||
            return tuple(remote_uid)
 | 
			
		||||
        return self._canceller
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def cancelled_caught(self) -> bool:
 | 
			
		||||
        return self._scope.cancelled_caught
 | 
			
		||||
        return (
 | 
			
		||||
            # the local scope was cancelled either by
 | 
			
		||||
            # remote error or self-request
 | 
			
		||||
            self._scope.cancelled_caught
 | 
			
		||||
 | 
			
		||||
            # the local scope was never cancelled
 | 
			
		||||
            # and instead likely we received a remote side
 | 
			
		||||
            # cancellation that was raised inside `.result()`
 | 
			
		||||
            or (
 | 
			
		||||
                (se := self._local_error)
 | 
			
		||||
                and
 | 
			
		||||
                isinstance(se, ContextCancelled)
 | 
			
		||||
                and (
 | 
			
		||||
                    se.canceller == self.canceller
 | 
			
		||||
                    or
 | 
			
		||||
                    se is self._remote_error
 | 
			
		||||
                )
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def side(self) -> str:
 | 
			
		||||
        '''
 | 
			
		||||
        Return string indicating which task this instance is wrapping.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        return 'caller' if self._portal else 'callee'
 | 
			
		||||
 | 
			
		||||
    # init and streaming state
 | 
			
		||||
    _started_called: bool = False
 | 
			
		||||
    _started_received: bool = False
 | 
			
		||||
    _stream_opened: bool = False
 | 
			
		||||
 | 
			
		||||
    # overrun handling machinery
 | 
			
		||||
| 
						 | 
				
			
			@ -196,7 +246,7 @@ class Context:
 | 
			
		|||
    async def send_stop(self) -> None:
 | 
			
		||||
        await self.chan.send({'stop': True, 'cid': self.cid})
 | 
			
		||||
 | 
			
		||||
    async def _maybe_cancel_and_set_remote_error(
 | 
			
		||||
    def _maybe_cancel_and_set_remote_error(
 | 
			
		||||
        self,
 | 
			
		||||
        error: BaseException,
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -269,16 +319,19 @@ class Context:
 | 
			
		|||
        # that error as the reason.
 | 
			
		||||
        self._remote_error: BaseException = error
 | 
			
		||||
 | 
			
		||||
        # always record the remote actor's uid since its cancellation
 | 
			
		||||
        # state is directly linked to ours (the local one).
 | 
			
		||||
        self._cancelled_remote = self.chan.uid
 | 
			
		||||
 | 
			
		||||
        if (
 | 
			
		||||
            isinstance(error, ContextCancelled)
 | 
			
		||||
        ):
 | 
			
		||||
            # always record the cancelling actor's uid since its cancellation
 | 
			
		||||
            # state is linked and we want to know which process was
 | 
			
		||||
            # the cause / requester of the cancellation.
 | 
			
		||||
            self._canceller = error.canceller
 | 
			
		||||
 | 
			
		||||
            log.cancel(
 | 
			
		||||
                'Remote task-context sucessfully cancelled for '
 | 
			
		||||
                f'{self.chan.uid}:{self.cid}'
 | 
			
		||||
                'Remote task-context was cancelled for '
 | 
			
		||||
                f'actor: {self.chan.uid}\n'
 | 
			
		||||
                f'task: {self.cid}\n'
 | 
			
		||||
                f'canceller: {error.canceller}\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            if self._cancel_called:
 | 
			
		||||
| 
						 | 
				
			
			@ -289,22 +342,37 @@ class Context:
 | 
			
		|||
                # and we **don't need to raise it** in local cancel
 | 
			
		||||
                # scope since it will potentially override a real error.
 | 
			
		||||
                return
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            log.error(
 | 
			
		||||
                f'Remote context error for {self.chan.uid}:{self.cid}:\n'
 | 
			
		||||
                f'Remote context error,\n'
 | 
			
		||||
                f'remote actor: {self.chan.uid}\n'
 | 
			
		||||
                f'task: {self.cid}\n'
 | 
			
		||||
                f'{error}'
 | 
			
		||||
            )
 | 
			
		||||
            self._canceller = self.chan.uid
 | 
			
		||||
 | 
			
		||||
        # TODO: tempted to **not** do this by-reraising in a
 | 
			
		||||
        # nursery and instead cancel a surrounding scope, detect
 | 
			
		||||
        # the cancellation, then lookup the error that was set?
 | 
			
		||||
        # YES! this is way better and simpler!
 | 
			
		||||
        if self._scope:
 | 
			
		||||
        cs: trio.CancelScope = self._scope
 | 
			
		||||
        if (
 | 
			
		||||
            cs
 | 
			
		||||
            and not cs.cancel_called
 | 
			
		||||
            and not cs.cancelled_caught
 | 
			
		||||
        ):
 | 
			
		||||
 | 
			
		||||
            # TODO: we can for sure drop this right?
 | 
			
		||||
            # from trio.testing import wait_all_tasks_blocked
 | 
			
		||||
            # await wait_all_tasks_blocked()
 | 
			
		||||
            # self._cancelled_remote = self.chan.uid
 | 
			
		||||
 | 
			
		||||
            # TODO: it'd sure be handy to inject our own
 | 
			
		||||
            # `trio.Cancelled` subtype here ;)
 | 
			
		||||
            # https://github.com/goodboy/tractor/issues/368
 | 
			
		||||
            self._scope.cancel()
 | 
			
		||||
 | 
			
		||||
            # this REPL usage actually works here BD
 | 
			
		||||
            # NOTE: this REPL usage actually works here dawg! Bo
 | 
			
		||||
            # from .devx._debug import pause
 | 
			
		||||
            # await pause()
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -320,13 +388,19 @@ class Context:
 | 
			
		|||
        Timeout quickly in an attempt to sidestep 2-generals...
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        side: str = 'caller' if self._portal else 'callee'
 | 
			
		||||
        side: str = self.side
 | 
			
		||||
        log.cancel(
 | 
			
		||||
            f'Cancelling {side} side of context to {self.chan.uid}'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self._cancel_called: bool = True
 | 
			
		||||
 | 
			
		||||
        # caller side who entered `Portal.open_context()`
 | 
			
		||||
        # NOTE: on the call side we never manually call
 | 
			
		||||
        # `._scope.cancel()` since we expect the eventual
 | 
			
		||||
        # `ContextCancelled` from the other side to trigger this
 | 
			
		||||
        # when the runtime finally receives it during teardown
 | 
			
		||||
        # (normally in `.result()` called from
 | 
			
		||||
        # `Portal.open_context().__aexit__()`)
 | 
			
		||||
        if side == 'caller':
 | 
			
		||||
            if not self._portal:
 | 
			
		||||
                raise RuntimeError(
 | 
			
		||||
| 
						 | 
				
			
			@ -349,7 +423,6 @@ class Context:
 | 
			
		|||
                    '_cancel_task',
 | 
			
		||||
                    cid=cid,
 | 
			
		||||
                )
 | 
			
		||||
                # print("EXITING CANCEL CALL")
 | 
			
		||||
 | 
			
		||||
            if cs.cancelled_caught:
 | 
			
		||||
                # XXX: there's no way to know if the remote task was indeed
 | 
			
		||||
| 
						 | 
				
			
			@ -368,6 +441,9 @@ class Context:
 | 
			
		|||
                    )
 | 
			
		||||
 | 
			
		||||
        # callee side remote task
 | 
			
		||||
        # NOTE: on this side we ALWAYS cancel the local scope since
 | 
			
		||||
        # the caller expects a `ContextCancelled` to be sent from
 | 
			
		||||
        # `._runtime._invoke()` back to the other side.
 | 
			
		||||
        else:
 | 
			
		||||
            # TODO: should we have an explicit cancel message
 | 
			
		||||
            # or is relaying the local `trio.Cancelled` as an
 | 
			
		||||
| 
						 | 
				
			
			@ -403,7 +479,7 @@ class Context:
 | 
			
		|||
            ``trio``'s cancellation system.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        actor = current_actor()
 | 
			
		||||
        actor: Actor = current_actor()
 | 
			
		||||
 | 
			
		||||
        # here we create a mem chan that corresponds to the
 | 
			
		||||
        # far end caller / callee.
 | 
			
		||||
| 
						 | 
				
			
			@ -413,12 +489,34 @@ class Context:
 | 
			
		|||
        # killed
 | 
			
		||||
 | 
			
		||||
        if self._cancel_called:
 | 
			
		||||
            task = trio.lowlevel.current_task().name
 | 
			
		||||
            raise ContextCancelled(
 | 
			
		||||
                f'Context around {actor.uid[0]}:{task} was already cancelled!'
 | 
			
		||||
 | 
			
		||||
            # XXX NOTE: ALWAYS RAISE any remote error here even if
 | 
			
		||||
            # it's an expected `ContextCancelled` (after some local
 | 
			
		||||
            # task having called `.cancel()` !
 | 
			
		||||
            #
 | 
			
		||||
            # WHY: we expect the error to always bubble up to the
 | 
			
		||||
            # surrounding `Portal.open_context()` call and be
 | 
			
		||||
            # absorbed there (silently) and we DO NOT want to
 | 
			
		||||
            # actually try to stream - a cancel msg was already
 | 
			
		||||
            # sent to the other side!
 | 
			
		||||
            if re := self._remote_error:
 | 
			
		||||
                raise self._remote_error
 | 
			
		||||
 | 
			
		||||
            # XXX NOTE: if no `ContextCancelled` has been responded
 | 
			
		||||
            # back from the other side (yet), we raise a different
 | 
			
		||||
            # runtime error indicating that this task's usage of
 | 
			
		||||
            # `Context.cancel()` and then `.open_stream()` is WRONG!
 | 
			
		||||
            task: str = trio.lowlevel.current_task().name
 | 
			
		||||
            raise RuntimeError(
 | 
			
		||||
                'Stream opened after `Context.cancel()` called..?\n'
 | 
			
		||||
                f'task: {actor.uid[0]}:{task}\n'
 | 
			
		||||
                f'{self}'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        if not self._portal and not self._started_called:
 | 
			
		||||
        if (
 | 
			
		||||
            not self._portal
 | 
			
		||||
            and not self._started_called
 | 
			
		||||
        ):
 | 
			
		||||
            raise RuntimeError(
 | 
			
		||||
                'Context.started()` must be called before opening a stream'
 | 
			
		||||
            )
 | 
			
		||||
| 
						 | 
				
			
			@ -434,7 +532,7 @@ class Context:
 | 
			
		|||
            msg_buffer_size=msg_buffer_size,
 | 
			
		||||
            allow_overruns=allow_overruns,
 | 
			
		||||
        )
 | 
			
		||||
        ctx._allow_overruns = allow_overruns
 | 
			
		||||
        ctx._allow_overruns: bool = allow_overruns
 | 
			
		||||
        assert ctx is self
 | 
			
		||||
 | 
			
		||||
        # XXX: If the underlying channel feeder receive mem chan has
 | 
			
		||||
| 
						 | 
				
			
			@ -444,27 +542,32 @@ class Context:
 | 
			
		|||
 | 
			
		||||
        if ctx._recv_chan._closed:
 | 
			
		||||
            raise trio.ClosedResourceError(
 | 
			
		||||
                'The underlying channel for this stream was already closed!?')
 | 
			
		||||
                'The underlying channel for this stream was already closed!?'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        async with MsgStream(
 | 
			
		||||
            ctx=self,
 | 
			
		||||
            rx_chan=ctx._recv_chan,
 | 
			
		||||
        ) as stream:
 | 
			
		||||
 | 
			
		||||
            # NOTE: we track all existing streams per portal for
 | 
			
		||||
            # the purposes of attempting graceful closes on runtime
 | 
			
		||||
            # cancel requests.
 | 
			
		||||
            if self._portal:
 | 
			
		||||
                self._portal._streams.add(stream)
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                self._stream_opened = True
 | 
			
		||||
                self._stream_opened: bool = True
 | 
			
		||||
 | 
			
		||||
                # XXX: do we need this?
 | 
			
		||||
                # ensure we aren't cancelled before yielding the stream
 | 
			
		||||
                # await trio.lowlevel.checkpoint()
 | 
			
		||||
                yield stream
 | 
			
		||||
 | 
			
		||||
                # NOTE: Make the stream "one-shot use".  On exit, signal
 | 
			
		||||
                # ``trio.EndOfChannel``/``StopAsyncIteration`` to the
 | 
			
		||||
                # far end.
 | 
			
		||||
                # NOTE: Make the stream "one-shot use".  On exit,
 | 
			
		||||
                # signal
 | 
			
		||||
                # ``trio.EndOfChannel``/``StopAsyncIteration`` to
 | 
			
		||||
                # the far end.
 | 
			
		||||
                await stream.aclose()
 | 
			
		||||
 | 
			
		||||
            finally:
 | 
			
		||||
| 
						 | 
				
			
			@ -495,14 +598,22 @@ class Context:
 | 
			
		|||
        # whenever  ``CancelScope.cancel()`` was called) and
 | 
			
		||||
        # instead silently reap the expected cancellation
 | 
			
		||||
        # "error"-msg.
 | 
			
		||||
        our_uid: tuple[str, str] = current_actor().uid
 | 
			
		||||
        if (
 | 
			
		||||
            isinstance(err, ContextCancelled)
 | 
			
		||||
            and (
 | 
			
		||||
                self._cancel_called
 | 
			
		||||
                or self.chan._cancel_called
 | 
			
		||||
                or tuple(err.canceller) == current_actor().uid
 | 
			
		||||
                or self.canceller == our_uid
 | 
			
		||||
                or tuple(err.canceller) == our_uid
 | 
			
		||||
            )
 | 
			
		||||
        ):
 | 
			
		||||
            # NOTE: we set the local scope error to any "self
 | 
			
		||||
            # cancellation" error-response thus "absorbing"
 | 
			
		||||
            # the error silently B)
 | 
			
		||||
            if self._local_error is None:
 | 
			
		||||
                self._local_error = err
 | 
			
		||||
 | 
			
		||||
            return err
 | 
			
		||||
 | 
			
		||||
        # NOTE: currently we are masking underlying runtime errors
 | 
			
		||||
| 
						 | 
				
			
			@ -515,7 +626,7 @@ class Context:
 | 
			
		|||
        #       runtime frames from the tb explicitly?
 | 
			
		||||
        # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
 | 
			
		||||
        # https://stackoverflow.com/a/24752607
 | 
			
		||||
        __tracebackhide__: bool = True
 | 
			
		||||
        # __tracebackhide__: bool = True
 | 
			
		||||
        raise err from None
 | 
			
		||||
 | 
			
		||||
    async def result(self) -> Any | Exception:
 | 
			
		||||
| 
						 | 
				
			
			@ -544,7 +655,6 @@ class Context:
 | 
			
		|||
        of the remote cancellation.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        __tracebackhide__: bool = True
 | 
			
		||||
        assert self._portal, "Context.result() can not be called from callee!"
 | 
			
		||||
        assert self._recv_chan
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -607,13 +717,15 @@ class Context:
 | 
			
		|||
                        "Received internal error at portal?"
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                    err = unpack_error(
 | 
			
		||||
                    if err:= unpack_error(
 | 
			
		||||
                        msg,
 | 
			
		||||
                        self._portal.channel
 | 
			
		||||
                    )  # from msgerr
 | 
			
		||||
                    ):  # from msgerr
 | 
			
		||||
                        self._maybe_cancel_and_set_remote_error(err)
 | 
			
		||||
                        self._maybe_raise_remote_err(err)
 | 
			
		||||
 | 
			
		||||
                    err = self._maybe_raise_remote_err(err)
 | 
			
		||||
                    self._remote_error = err
 | 
			
		||||
                    else:
 | 
			
		||||
                        raise
 | 
			
		||||
 | 
			
		||||
        if re := self._remote_error:
 | 
			
		||||
            return self._maybe_raise_remote_err(re)
 | 
			
		||||
| 
						 | 
				
			
			@ -724,13 +836,17 @@ class Context:
 | 
			
		|||
            f"Delivering {msg} from {uid} to caller {cid}"
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        error = msg.get('error')
 | 
			
		||||
        if error := unpack_error(
 | 
			
		||||
        if (
 | 
			
		||||
            msg.get('error')  # check for field
 | 
			
		||||
            and (
 | 
			
		||||
                error := unpack_error(
 | 
			
		||||
                    msg,
 | 
			
		||||
                    self.chan,
 | 
			
		||||
                )
 | 
			
		||||
            )
 | 
			
		||||
        ):
 | 
			
		||||
            self._cancel_msg = msg
 | 
			
		||||
            await self._maybe_cancel_and_set_remote_error(error)
 | 
			
		||||
            self._maybe_cancel_and_set_remote_error(error)
 | 
			
		||||
 | 
			
		||||
        if (
 | 
			
		||||
            self._in_overrun
 | 
			
		||||
| 
						 | 
				
			
			@ -765,7 +881,7 @@ class Context:
 | 
			
		|||
 | 
			
		||||
            # XXX: always push an error even if the local
 | 
			
		||||
            # receiver is in overrun state.
 | 
			
		||||
            # await self._maybe_cancel_and_set_remote_error(msg)
 | 
			
		||||
            # self._maybe_cancel_and_set_remote_error(msg)
 | 
			
		||||
 | 
			
		||||
            local_uid = current_actor().uid
 | 
			
		||||
            lines = [
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -86,12 +86,14 @@ async def _invoke(
 | 
			
		|||
    ] = trio.TASK_STATUS_IGNORED,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Invoke local func and deliver result(s) over provided channel.
 | 
			
		||||
    Schedule a `trio` task-as-func and deliver result(s) over
 | 
			
		||||
    connected IPC channel.
 | 
			
		||||
 | 
			
		||||
    This is the core "RPC task" starting machinery.
 | 
			
		||||
    This is the core "RPC" `trio.Task` scheduling machinery used to start every
 | 
			
		||||
    remotely invoked function, normally in `Actor._service_n: trio.Nursery`.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    __tracebackhide__ = True
 | 
			
		||||
    __tracebackhide__: bool = True
 | 
			
		||||
    treat_as_gen: bool = False
 | 
			
		||||
    failed_resp: bool = False
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -209,6 +211,8 @@ async def _invoke(
 | 
			
		|||
                # far end async gen to tear down
 | 
			
		||||
                await chan.send({'stop': True, 'cid': cid})
 | 
			
		||||
 | 
			
		||||
        # TODO: every other "func type" should be implemented from
 | 
			
		||||
        # a special case of a context eventually!
 | 
			
		||||
        elif context:
 | 
			
		||||
            # context func with support for bi-dir streaming
 | 
			
		||||
            await chan.send({'functype': 'context', 'cid': cid})
 | 
			
		||||
| 
						 | 
				
			
			@ -219,21 +223,30 @@ async def _invoke(
 | 
			
		|||
                    ctx._scope = nurse.cancel_scope
 | 
			
		||||
                    task_status.started(ctx)
 | 
			
		||||
                    res = await coro
 | 
			
		||||
                    await chan.send({'return': res, 'cid': cid})
 | 
			
		||||
                    await chan.send({
 | 
			
		||||
                        'return': res,
 | 
			
		||||
                        'cid': cid
 | 
			
		||||
                    })
 | 
			
		||||
 | 
			
		||||
            # XXX: do we ever trigger this block any more?
 | 
			
		||||
            except (
 | 
			
		||||
                BaseExceptionGroup,
 | 
			
		||||
                trio.Cancelled,
 | 
			
		||||
            ):
 | 
			
		||||
                # if a context error was set then likely
 | 
			
		||||
                # thei multierror was raised due to that
 | 
			
		||||
                if ctx._remote_error is not None:
 | 
			
		||||
                    raise ctx._remote_error
 | 
			
		||||
            ) as scope_error:
 | 
			
		||||
 | 
			
		||||
                # maybe TODO: pack in ``trio.Cancelled.__traceback__`` here
 | 
			
		||||
                # so they can be unwrapped and displayed on the caller
 | 
			
		||||
                # side?
 | 
			
		||||
                # always set this (callee) side's exception as the
 | 
			
		||||
                # local error on the context
 | 
			
		||||
                ctx._local_error: BaseException = scope_error
 | 
			
		||||
 | 
			
		||||
                # if a remote error was set then likely the
 | 
			
		||||
                # exception group was raised due to that, so
 | 
			
		||||
                # and we instead raise that error immediately!
 | 
			
		||||
                if re := ctx._remote_error:
 | 
			
		||||
                    ctx._maybe_raise_remote_err(re)
 | 
			
		||||
 | 
			
		||||
                # maybe TODO: pack in
 | 
			
		||||
                # ``trio.Cancelled.__traceback__`` here so they can
 | 
			
		||||
                # be unwrapped and displayed on the caller side?
 | 
			
		||||
                raise
 | 
			
		||||
 | 
			
		||||
            finally:
 | 
			
		||||
| 
						 | 
				
			
			@ -244,10 +257,10 @@ async def _invoke(
 | 
			
		|||
                # don't pop the local context until we know the
 | 
			
		||||
                # associated child isn't in debug any more
 | 
			
		||||
                await _debug.maybe_wait_for_debugger()
 | 
			
		||||
                ctx = actor._contexts.pop((chan.uid, cid))
 | 
			
		||||
                if ctx:
 | 
			
		||||
                ctx: Context = actor._contexts.pop((chan.uid, cid))
 | 
			
		||||
                log.runtime(
 | 
			
		||||
                        f'Context entrypoint {func} was terminated:\n{ctx}'
 | 
			
		||||
                    f'Context entrypoint {func} was terminated:\n'
 | 
			
		||||
                    f'{ctx}'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            if ctx.cancelled_caught:
 | 
			
		||||
| 
						 | 
				
			
			@ -256,43 +269,43 @@ async def _invoke(
 | 
			
		|||
                # before raising any context cancelled case
 | 
			
		||||
                # so that real remote errors don't get masked as
 | 
			
		||||
                # ``ContextCancelled``s.
 | 
			
		||||
                re = ctx._remote_error
 | 
			
		||||
                if re:
 | 
			
		||||
                if re := ctx._remote_error:
 | 
			
		||||
                    ctx._maybe_raise_remote_err(re)
 | 
			
		||||
 | 
			
		||||
                fname = func.__name__
 | 
			
		||||
                fname: str = func.__name__
 | 
			
		||||
                cs: trio.CancelScope = ctx._scope
 | 
			
		||||
                if cs.cancel_called:
 | 
			
		||||
                    canceller = ctx._cancelled_remote
 | 
			
		||||
                    canceller: tuple = ctx.canceller
 | 
			
		||||
                    msg: str = (
 | 
			
		||||
                        f'`{fname}()`@{actor.uid} cancelled by '
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                    # NOTE / TODO: if we end up having
 | 
			
		||||
                    # ``Actor._cancel_task()`` call
 | 
			
		||||
                    # ``Context.cancel()`` directly, we're going to
 | 
			
		||||
                    # need to change this logic branch since it will
 | 
			
		||||
                    # always enter..
 | 
			
		||||
                    # need to change this logic branch since it
 | 
			
		||||
                    # will always enter..
 | 
			
		||||
                    if ctx._cancel_called:
 | 
			
		||||
                        msg = f'`{fname}()`@{actor.uid} cancelled itself'
 | 
			
		||||
 | 
			
		||||
                    else:
 | 
			
		||||
                        msg = (
 | 
			
		||||
                            f'`{fname}()`@{actor.uid} '
 | 
			
		||||
                            'was remotely cancelled by '
 | 
			
		||||
                        )
 | 
			
		||||
                        msg += 'itself '
 | 
			
		||||
 | 
			
		||||
                    # if the channel which spawned the ctx is the
 | 
			
		||||
                    # one that cancelled it then we report that, vs.
 | 
			
		||||
                    # it being some other random actor that for ex.
 | 
			
		||||
                    # some actor who calls `Portal.cancel_actor()`
 | 
			
		||||
                    # and by side-effect cancels this ctx.
 | 
			
		||||
                    if canceller == ctx.chan.uid:
 | 
			
		||||
                        msg += f'its caller {canceller}'
 | 
			
		||||
                    elif canceller == ctx.chan.uid:
 | 
			
		||||
                        msg += f'its caller {canceller} '
 | 
			
		||||
 | 
			
		||||
                    else:
 | 
			
		||||
                        msg += f'remote actor {canceller}'
 | 
			
		||||
 | 
			
		||||
                    # TODO: does this ever get set any more or can
 | 
			
		||||
                    # we remove it?
 | 
			
		||||
                    if ctx._cancel_msg:
 | 
			
		||||
                        msg += f' with msg:\n{ctx._cancel_msg}'
 | 
			
		||||
                        msg += (
 | 
			
		||||
                            ' with msg:\n'
 | 
			
		||||
                            f'{ctx._cancel_msg}'
 | 
			
		||||
                        )
 | 
			
		||||
 | 
			
		||||
                    # task-contex was either cancelled by request using
 | 
			
		||||
                    # ``Portal.cancel_actor()`` or ``Context.cancel()``
 | 
			
		||||
| 
						 | 
				
			
			@ -305,10 +318,13 @@ async def _invoke(
 | 
			
		|||
                        canceller=canceller,
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
        # regular async function
 | 
			
		||||
        else:
 | 
			
		||||
            try:
 | 
			
		||||
                await chan.send({'functype': 'asyncfunc', 'cid': cid})
 | 
			
		||||
                await chan.send({
 | 
			
		||||
                    'functype': 'asyncfunc',
 | 
			
		||||
                    'cid': cid
 | 
			
		||||
                })
 | 
			
		||||
            except trio.BrokenResourceError:
 | 
			
		||||
                failed_resp = True
 | 
			
		||||
                if is_rpc:
 | 
			
		||||
| 
						 | 
				
			
			@ -322,7 +338,7 @@ async def _invoke(
 | 
			
		|||
                ctx._scope = cs
 | 
			
		||||
                task_status.started(ctx)
 | 
			
		||||
                result = await coro
 | 
			
		||||
                fname = func.__name__
 | 
			
		||||
                fname: str = func.__name__
 | 
			
		||||
                log.runtime(f'{fname}() result: {result}')
 | 
			
		||||
                if not failed_resp:
 | 
			
		||||
                    # only send result if we know IPC isn't down
 | 
			
		||||
| 
						 | 
				
			
			@ -1162,7 +1178,12 @@ class Actor:
 | 
			
		|||
            - return control the parent channel message loop
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        log.cancel(f"{self.uid} is trying to cancel")
 | 
			
		||||
        log.cancel(
 | 
			
		||||
            f'{self.uid} requested to cancel by:\n'
 | 
			
		||||
            f'{requesting_uid}'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # TODO: what happens here when we self-cancel tho?
 | 
			
		||||
        self._cancel_called_by_remote: tuple = requesting_uid
 | 
			
		||||
        self._cancel_called = True
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1177,7 +1198,9 @@ class Actor:
 | 
			
		|||
                dbcs.cancel()
 | 
			
		||||
 | 
			
		||||
            # kill all ongoing tasks
 | 
			
		||||
            await self.cancel_rpc_tasks(requesting_uid=requesting_uid)
 | 
			
		||||
            await self.cancel_rpc_tasks(
 | 
			
		||||
                requesting_uid=requesting_uid,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # stop channel server
 | 
			
		||||
            self.cancel_server()
 | 
			
		||||
| 
						 | 
				
			
			@ -1207,8 +1230,8 @@ class Actor:
 | 
			
		|||
        self,
 | 
			
		||||
        cid: str,
 | 
			
		||||
        chan: Channel,
 | 
			
		||||
 | 
			
		||||
        requesting_uid: tuple[str, str] | None = None,
 | 
			
		||||
 | 
			
		||||
    ) -> bool:
 | 
			
		||||
        '''
 | 
			
		||||
        Cancel a local task by call-id / channel.
 | 
			
		||||
| 
						 | 
				
			
			@ -1225,7 +1248,7 @@ class Actor:
 | 
			
		|||
            # this ctx based lookup ensures the requested task to
 | 
			
		||||
            # be cancelled was indeed spawned by a request from this channel
 | 
			
		||||
            ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
 | 
			
		||||
            scope = ctx._scope
 | 
			
		||||
            scope: trio.CancelScope = ctx._scope
 | 
			
		||||
        except KeyError:
 | 
			
		||||
            log.cancel(f"{cid} has already completed/terminated?")
 | 
			
		||||
            return True
 | 
			
		||||
| 
						 | 
				
			
			@ -1235,10 +1258,10 @@ class Actor:
 | 
			
		|||
            f"peer: {chan.uid}\n")
 | 
			
		||||
 | 
			
		||||
        if (
 | 
			
		||||
            ctx._cancelled_remote is None
 | 
			
		||||
            ctx._canceller is None
 | 
			
		||||
            and requesting_uid
 | 
			
		||||
        ):
 | 
			
		||||
            ctx._cancelled_remote: tuple = requesting_uid
 | 
			
		||||
            ctx._canceller: tuple = requesting_uid
 | 
			
		||||
 | 
			
		||||
        # don't allow cancelling this function mid-execution
 | 
			
		||||
        # (is this necessary?)
 | 
			
		||||
| 
						 | 
				
			
			@ -1248,6 +1271,7 @@ class Actor:
 | 
			
		|||
        # TODO: shouldn't we eventually be calling ``Context.cancel()``
 | 
			
		||||
        # directly here instead (since that method can handle both
 | 
			
		||||
        # side's calls into it?
 | 
			
		||||
        # await ctx.cancel()
 | 
			
		||||
        scope.cancel()
 | 
			
		||||
 | 
			
		||||
        # wait for _invoke to mark the task complete
 | 
			
		||||
| 
						 | 
				
			
			@ -1275,9 +1299,12 @@ class Actor:
 | 
			
		|||
        registered for each.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        tasks = self._rpc_tasks
 | 
			
		||||
        tasks: dict = self._rpc_tasks
 | 
			
		||||
        if tasks:
 | 
			
		||||
            log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
 | 
			
		||||
            log.cancel(
 | 
			
		||||
                f'Cancelling all {len(tasks)} rpc tasks:\n'
 | 
			
		||||
                f'{tasks}'
 | 
			
		||||
            )
 | 
			
		||||
            for (
 | 
			
		||||
                (chan, cid),
 | 
			
		||||
                (ctx, func, is_complete),
 | 
			
		||||
| 
						 | 
				
			
			@ -1295,7 +1322,9 @@ class Actor:
 | 
			
		|||
                    )
 | 
			
		||||
 | 
			
		||||
            log.cancel(
 | 
			
		||||
                f"Waiting for remaining rpc tasks to complete {tasks}")
 | 
			
		||||
                'Waiting for remaining rpc tasks to complete:\n'
 | 
			
		||||
                f'{tasks}'
 | 
			
		||||
            )
 | 
			
		||||
            await self._ongoing_rpc_tasks.wait()
 | 
			
		||||
 | 
			
		||||
    def cancel_server(self) -> None:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue