From 131674eabd76a5b73d45259c9af4bd3d03832133 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 23 Oct 2023 14:35:36 -0400
Subject: [PATCH] Be mega-pedantic with `ContextCancelled` semantics

As part of extremely detailed inter-peer-actor testing, add much more
granular `Context` cancellation state tracking via the following (new)
fields:

- `.canceller: tuple[str, str]`: the uid of the actor responsible for
  the cancellation condition - always set by
  `Context._maybe_cancel_and_set_remote_error()` and replaces
  `._cancelled_remote` and `.cancel_called_remote`. If set, this value
  should normally always match a value from some `ContextCancelled`
  raised or caught by one side of the context.

- `._local_error`: always set to the locally raised (and caller or
  callee task's scope-internal) error which caused any eventual
  cancellation/error condition and thus any closure of the context's
  per-task-side `trio.Nursery`.

- `.cancelled_caught: bool`: now always `True` whenever the local task
  catches (or "silently absorbs") a `ContextCancelled` (a `ctxc`) that
  indeed originated from one of the context's linked tasks or any other
  context which raised its own `ctxc` in the current `.open_context()`
  scope.
  => whenever there is a case where no `ContextCancelled` was raised
  **in** the `.open_context().__aexit__()` (eg. `ctx.result()` called
  after a call to `ctx.cancel()`), we still consider the context as
  having "caught a cancellation" since the `ctxc` was indeed silently
  handled by the cancel requester; all other error cases are already
  represented by mirroring the state of the `._scope: trio.CancelScope`.
  => IOW there should be **no case** where an error is **not raised** in
  the context's scope and `.cancelled_caught: bool == False`, i.e. no
  case where `._scope.cancelled_caught == False and ._local_error is
  not None`!

- always raise any `ctxc` from `.open_stream()` if `._cancel_called ==
  True` - if the cancellation request has not already resulted in a
  `._remote_error: ContextCancelled` we raise a `RuntimeError` to
  indicate improper usage to the guilty side's task code.

- make `._maybe_raise_remote_err()` a sync func and don't raise any
  `ctxc` which is matched against a `.canceller` determined to be the
  current actor, aka a "self cancel", and always set the `._local_error`
  to any such `ctxc`.

- `.side: str`: factored out of `.cancel()` and otherwise unused as of
  now since it might be better re-written as a similar
  `.is_opener() -> bool`?

- drop unused `._started_received: bool`..

- TONS and TONS of detailed comments/docs to attempt to explain all the
  possible cancellation/exit cases and how they should exhibit as either
  silent closes or raises from the `Context` API!

Adjust the `._runtime._invoke()` code to match:
- use `ctx._maybe_raise_remote_err()` in `._invoke()`.
- adjust to the new `.canceller` property.
- more type hints.
- better `log.cancel()` msging around self-cancels vs. peer-cancels.
- always set the `._local_error: BaseException` for the "callee" task
  just like `Portal.open_context()` now will do B)

Prior we were raising any `Context._remote_error` directly and doing
(more or less) the same `ContextCancelled` "absorbing" logic (well
kinda) inline in the `except` block; instead we now delegate to the
method.
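As a rough usage sketch only (the `sleep_forever()` endpoint and the
'sleeper' actor name below are made up purely for illustration), the new
caller-side semantics for a self-requested cancel should look something
like:

    import trio
    import tractor


    @tractor.context
    async def sleep_forever(
        ctx: tractor.Context,
    ) -> None:
        # hypothetical "callee" task: signal readiness then block
        # until cancelled from the "caller" side.
        await ctx.started()
        await trio.sleep_forever()


    async def main():
        async with tractor.open_nursery() as an:
            portal = await an.start_actor(
                'sleeper',
                enable_modules=[__name__],
            )
            async with portal.open_context(sleep_forever) as (ctx, first):
                # request cancellation of the callee task; the
                # `ContextCancelled` sent back should be silently
                # absorbed during `.open_context().__aexit__()` since
                # *we* are the requester.
                await ctx.cancel()

            # expected post-exit state per this patch: the cancel was
            # "caught" and the canceller is recorded as *our* uid since
            # this was a self-cancel.
            assert ctx.cancelled_caught
            assert ctx.canceller == tractor.current_actor().uid

            await portal.cancel_actor()


    if __name__ == '__main__':
        trio.run(main)

IOW the requester's own uid ends up as the `.canceller` and the absorbed
`ctxc` still flips `.cancelled_caught` even though nothing is raised in
the caller's scope.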
---
 tractor/_context.py | 232 +++++++++++++++++++++++++++++++++-----------
 tractor/_runtime.py | 119 ++++++++++++++---------
 2 files changed, 248 insertions(+), 103 deletions(-)

diff --git a/tractor/_context.py b/tractor/_context.py
index c14f16b..117092a 100644
--- a/tractor/_context.py
+++ b/tractor/_context.py
@@ -56,6 +56,7 @@ from ._state import current_actor
 
 if TYPE_CHECKING:
     from ._portal import Portal
+    from ._runtime import Actor
 
 
 log = get_logger(__name__)
@@ -64,20 +65,26 @@ log = get_logger(__name__)
 @dataclass
 class Context:
     '''
-    An inter-actor, ``trio``-task communication context.
+    An inter-actor, SC transitive, `trio.Task` communication context.
 
-    NB: This class should never be instatiated directly, it is delivered
-    by either,
-     - runtime machinery to a remotely started task or,
-     - by entering ``Portal.open_context()``.
+    NB: This class should **never be instantiated directly**, it is allocated
+    by the runtime in 2 ways:
+     - by entering ``Portal.open_context()`` which is the primary
+       public API for any "caller" task or,
+     - by the RPC machinery's `._runtime._invoke()` as a `ctx` arg
+       to a remotely scheduled "callee" function.
 
-    and is always constructed using ``mkt_context()``.
+    AND is always constructed using the below ``mk_context()``.
 
     Allows maintaining task or protocol specific state between
-    2 communicating, parallel executing actor tasks. A unique context is
-    allocated on each side of any task RPC-linked msg dialog, for
-    every request to a remote actor from a portal. On the "callee"
-    side a context is always allocated inside ``._runtime._invoke()``.
+    2 cancel-scope-linked, communicating and parallel executing
+    `trio.Task`s. Contexts are allocated on each side of any task
+    RPC-linked msg dialog, i.e. for every request to a remote
+    actor from a `Portal`. On the "callee" side a context is
+    always allocated inside ``._runtime._invoke()``.
+
+    # TODO: more detailed writeup on cancellation, error and
+    # streaming semantics..
 
     A context can be cancelled and (possibly eventually restarted) from
     either side of the underlying IPC channel, it can also open task
@@ -108,12 +115,31 @@ class Context:
     # which is exactly the primitive that allows for
     # cross-actor-task-supervision and thus SC.
     _scope: trio.CancelScope | None = None
+
+    # on a clean exit there should be a final value
+    # delivered from the far end "callee" task, so
+    # this value is only set on one side.
     _result: Any | int = None
+
+    # if the local "caller" task errors this
+    # value is always set to the error that was
+    # captured in the `Portal.open_context().__aexit__()`
+    # teardown.
+    _local_error: BaseException | None = None
+
+    # if either side gets an error from the other
+    # this value is set to that error unpacked from an
+    # IPC msg.
    _remote_error: BaseException | None = None
 
-    # cancellation state
+    # only set if the local task called `.cancel()`
     _cancel_called: bool = False  # did WE cancel the far end?
-    _cancelled_remote: tuple[str, str] | None = None
+
+    # TODO: do we even need this? we can assume that if we're
+    # cancelled that the other side is as well, so maybe we should
+    # instead just have a `.canceller` pulled from the
+    # `ContextCancelled`?
+    _canceller: tuple[str, str] | None = None
 
     # NOTE: we try to ensure assignment of a "cancel msg" since
     # there's always going to be an "underlying reason" that any
@@ -145,23 +171,47 @@ class Context:
         return self._cancel_called
 
     @property
-    def cancel_called_remote(self) -> tuple[str, str] | None:
+    def canceller(self) -> tuple[str, str] | None:
         '''
-        ``Actor.uid`` of the remote actor who's task was cancelled
-        causing this side of the context to also be cancelled.
+        ``Actor.uid: tuple[str, str]`` of the (remote)
+        actor-process whose task was cancelled thus causing this
+        (side of the) context to also be cancelled.
 
         '''
-        remote_uid = self._cancelled_remote
-        if remote_uid:
-            return tuple(remote_uid)
+        return self._canceller
 
     @property
     def cancelled_caught(self) -> bool:
-        return self._scope.cancelled_caught
+        return (
+            # the local scope was cancelled either by
+            # remote error or self-request
+            self._scope.cancelled_caught
+
+            # the local scope was never cancelled
+            # and instead likely we received a remote side
+            # cancellation that was raised inside `.result()`
+            or (
+                (se := self._local_error)
+                and
+                isinstance(se, ContextCancelled)
+                and (
+                    se.canceller == self.canceller
+                    or
+                    se is self._remote_error
+                )
+            )
+        )
+
+    @property
+    def side(self) -> str:
+        '''
+        Return string indicating which task this instance is wrapping.
+
+        '''
+        return 'caller' if self._portal else 'callee'
 
     # init and streaming state
     _started_called: bool = False
-    _started_received: bool = False
     _stream_opened: bool = False
 
     # overrun handling machinery
@@ -196,7 +246,7 @@ class Context:
     async def send_stop(self) -> None:
         await self.chan.send({'stop': True, 'cid': self.cid})
 
-    async def _maybe_cancel_and_set_remote_error(
+    def _maybe_cancel_and_set_remote_error(
         self,
         error: BaseException,
 
@@ -269,16 +319,19 @@ class Context:
         # that error as the reason.
         self._remote_error: BaseException = error
 
-        # always record the remote actor's uid since its cancellation
-        # state is directly linked to ours (the local one).
-        self._cancelled_remote = self.chan.uid
-
         if (
             isinstance(error, ContextCancelled)
         ):
+            # always record the cancelling actor's uid since its cancellation
+            # state is linked and we want to know which process was
+            # the cause / requester of the cancellation.
+            self._canceller = error.canceller
+
             log.cancel(
-                'Remote task-context sucessfully cancelled for '
-                f'{self.chan.uid}:{self.cid}'
+                'Remote task-context was cancelled for '
+                f'actor: {self.chan.uid}\n'
+                f'task: {self.cid}\n'
+                f'canceller: {error.canceller}\n'
             )
 
             if self._cancel_called:
@@ -289,22 +342,37 @@ class Context:
                 # and we **don't need to raise it** in local cancel
                 # scope since it will potentially override a real error.
                 return
+
         else:
             log.error(
-                f'Remote context error for {self.chan.uid}:{self.cid}:\n'
+                f'Remote context error,\n'
+                f'remote actor: {self.chan.uid}\n'
+                f'task: {self.cid}\n'
                 f'{error}'
             )
+            self._canceller = self.chan.uid
+
         # TODO: tempted to **not** do this by-reraising in a
         # nursery and instead cancel a surrounding scope, detect
        # the cancellation, then lookup the error that was set?
         # YES! this is way better and simpler!
-        if self._scope:
+        cs: trio.CancelScope = self._scope
+        if (
+            cs
+            and not cs.cancel_called
+            and not cs.cancelled_caught
+        ):
+
+            # TODO: we can for sure drop this right?
             # from trio.testing import wait_all_tasks_blocked
             # await wait_all_tasks_blocked()
-            # self._cancelled_remote = self.chan.uid
+
+            # TODO: it'd sure be handy to inject our own
+            # `trio.Cancelled` subtype here ;)
+            # https://github.com/goodboy/tractor/issues/368
             self._scope.cancel()
 
-        # this REPL usage actually works here BD
+        # NOTE: this REPL usage actually works here dawg! Bo
         # from .devx._debug import pause
         # await pause()
@@ -320,13 +388,19 @@ class Context:
         Timeout quickly in an attempt to sidestep 2-generals...
 
         '''
-        side: str = 'caller' if self._portal else 'callee'
+        side: str = self.side
         log.cancel(
             f'Cancelling {side} side of context to {self.chan.uid}'
         )
-
         self._cancel_called: bool = True
 
+        # caller side who entered `Portal.open_context()`
+        # NOTE: on the call side we never manually call
+        # `._scope.cancel()` since we expect the eventual
+        # `ContextCancelled` from the other side to trigger this
+        # when the runtime finally receives it during teardown
+        # (normally in `.result()` called from
+        # `Portal.open_context().__aexit__()`)
         if side == 'caller':
             if not self._portal:
                 raise RuntimeError(
@@ -349,7 +423,6 @@ class Context:
                     '_cancel_task',
                     cid=cid,
                 )
-                # print("EXITING CANCEL CALL")
 
             if cs.cancelled_caught:
                 # XXX: there's no way to know if the remote task was indeed
@@ -368,6 +441,9 @@ class Context:
             )
 
         # callee side remote task
+        # NOTE: on this side we ALWAYS cancel the local scope since
+        # the caller expects a `ContextCancelled` to be sent from
+        # `._runtime._invoke()` back to the other side.
         else:
             # TODO: should we have an explicit cancel message
            # or is relaying the local `trio.Cancelled` as an
@@ -403,7 +479,7 @@ class Context:
         ``trio``'s cancellation system.
 
         '''
-        actor = current_actor()
+        actor: Actor = current_actor()
 
         # here we create a mem chan that corresponds to the
         # far end caller / callee.
@@ -413,12 +489,34 @@ class Context:
         # killed
 
         if self._cancel_called:
+
+            # XXX NOTE: ALWAYS RAISE any remote error here even if
+            # it's an expected `ContextCancelled` (after some local
+            # task having called `.cancel()`) !
+            #
+            # WHY: we expect the error to always bubble up to the
+            # surrounding `Portal.open_context()` call and be
+            # absorbed there (silently) and we DO NOT want to
+            # actually try to stream - a cancel msg was already
+            # sent to the other side!
+            if re := self._remote_error:
+                raise self._remote_error
+
+            # XXX NOTE: if no `ContextCancelled` has been responded
+            # back from the other side (yet), we raise a different
+            # runtime error indicating that this task's usage of
+            # `Context.cancel()` and then `.open_stream()` is WRONG!
+            task: str = trio.lowlevel.current_task().name
+            raise RuntimeError(
+                'Stream opened after `Context.cancel()` called..?\n'
+                f'task: {actor.uid[0]}:{task}\n'
+                f'{self}'
             )
 
-        if not self._portal and not self._started_called:
+        if (
+            not self._portal
+            and not self._started_called
+        ):
             raise RuntimeError(
                 'Context.started()` must be called before opening a stream'
             )
@@ -434,7 +532,7 @@ class Context:
             msg_buffer_size=msg_buffer_size,
             allow_overruns=allow_overruns,
         )
-        ctx._allow_overruns = allow_overruns
+        ctx._allow_overruns: bool = allow_overruns
         assert ctx is self
 
         # XXX: If the underlying channel feeder receive mem chan has
@@ -444,27 +542,32 @@ class Context:
 
         if ctx._recv_chan._closed:
             raise trio.ClosedResourceError(
-                'The underlying channel for this stream was already closed!?')
+                'The underlying channel for this stream was already closed!?'
+            )
 
         async with MsgStream(
             ctx=self,
             rx_chan=ctx._recv_chan,
         ) as stream:
 
+            # NOTE: we track all existing streams per portal for
+            # the purposes of attempting graceful closes on runtime
+            # cancel requests.
            if self._portal:
                 self._portal._streams.add(stream)
 
             try:
-                self._stream_opened = True
+                self._stream_opened: bool = True
 
                 # XXX: do we need this?
                 # ensure we aren't cancelled before yielding the stream
                 # await trio.lowlevel.checkpoint()
                 yield stream
 
-                # NOTE: Make the stream "one-shot use". On exit, signal
-                # ``trio.EndOfChannel``/``StopAsyncIteration`` to the
-                # far end.
+                # NOTE: Make the stream "one-shot use". On exit,
+                # signal
+                # ``trio.EndOfChannel``/``StopAsyncIteration`` to
+                # the far end.
                 await stream.aclose()
 
             finally:
@@ -495,14 +598,22 @@ class Context:
         # whenever ``CancelScope.cancel()`` was called) and
         # instead silently reap the expected cancellation
         # "error"-msg.
+        our_uid: tuple[str, str] = current_actor().uid
         if (
             isinstance(err, ContextCancelled)
             and (
                 self._cancel_called
                 or self.chan._cancel_called
-                or tuple(err.canceller) == current_actor().uid
+                or self.canceller == our_uid
+                or tuple(err.canceller) == our_uid
             )
         ):
+            # NOTE: we set the local scope error to any "self
+            # cancellation" error-response thus "absorbing"
+            # the error silently B)
+            if self._local_error is None:
+                self._local_error = err
+
             return err
 
         # NOTE: currently we are masking underlying runtime errors
@@ -515,7 +626,7 @@ class Context:
         # runtime frames from the tb explicitly?
         # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
         # https://stackoverflow.com/a/24752607
-        __tracebackhide__: bool = True
+        # __tracebackhide__: bool = True
         raise err from None
 
     async def result(self) -> Any | Exception:
@@ -544,7 +655,6 @@ class Context:
         of the remote cancellation.
 
         '''
-        __tracebackhide__: bool = True
         assert self._portal, "Context.result() can not be called from callee!"
         assert self._recv_chan
 
@@ -607,13 +717,15 @@ class Context:
                     "Received internal error at portal?"
                 )
 
-                err = unpack_error(
+                if err := unpack_error(
                     msg,
                     self._portal.channel
-                )  # from msgerr
+                ):  # from msgerr
+                    self._maybe_cancel_and_set_remote_error(err)
+                    self._maybe_raise_remote_err(err)
 
-                err = self._maybe_raise_remote_err(err)
-                self._remote_error = err
+                else:
+                    raise
 
         if re := self._remote_error:
             return self._maybe_raise_remote_err(re)
@@ -724,13 +836,17 @@ class Context:
            f"Delivering {msg} from {uid} to caller {cid}"
         )
 
-        error = msg.get('error')
-        if error := unpack_error(
-            msg,
-            self.chan,
+        if (
+            msg.get('error')  # check for field
+            and (
+                error := unpack_error(
+                    msg,
+                    self.chan,
+                )
+            )
         ):
             self._cancel_msg = msg
-            await self._maybe_cancel_and_set_remote_error(error)
+            self._maybe_cancel_and_set_remote_error(error)
 
         if (
             self._in_overrun
@@ -765,7 +881,7 @@ class Context:
 
             # XXX: always push an error even if the local
             # receiver is in overrun state.
-            # await self._maybe_cancel_and_set_remote_error(msg)
+            # self._maybe_cancel_and_set_remote_error(msg)
 
             local_uid = current_actor().uid
             lines = [
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index 5f4da96..fee14c4 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -86,12 +86,14 @@ async def _invoke(
     ] = trio.TASK_STATUS_IGNORED,
 ):
     '''
-    Invoke local func and deliver result(s) over provided channel.
+    Schedule a `trio` task-as-func and deliver result(s) over
+    the connected IPC channel.
 
-    This is the core "RPC task" starting machinery.
+    This is the core "RPC" `trio.Task` scheduling machinery used to start every
+    remotely invoked function, normally in `Actor._service_n: trio.Nursery`.
 
     '''
-    __tracebackhide__ = True
+    __tracebackhide__: bool = True
     treat_as_gen: bool = False
     failed_resp: bool = False
 
@@ -209,6 +211,8 @@ async def _invoke(
             # far end async gen to tear down
             await chan.send({'stop': True, 'cid': cid})
 
+    # TODO: every other "func type" should be implemented from
+    # a special case of a context eventually!
     elif context:
         # context func with support for bi-dir streaming
         await chan.send({'functype': 'context', 'cid': cid})
@@ -219,21 +223,30 @@ async def _invoke(
                 ctx._scope = nurse.cancel_scope
                 task_status.started(ctx)
                 res = await coro
-                await chan.send({'return': res, 'cid': cid})
+                await chan.send({
+                    'return': res,
+                    'cid': cid
+                })
 
         # XXX: do we ever trigger this block any more?
         except (
             BaseExceptionGroup,
             trio.Cancelled,
-        ):
-            # if a context error was set then likely
-            # thei multierror was raised due to that
-            if ctx._remote_error is not None:
-                raise ctx._remote_error
+        ) as scope_error:
+
+            # always set this (callee) side's exception as the
+            # local error on the context
+            ctx._local_error: BaseException = scope_error
+
+            # if a remote error was set then likely the
+            # exception group was raised due to that, so
+            # we instead raise that error immediately!
+            if re := ctx._remote_error:
+                ctx._maybe_raise_remote_err(re)
 
-            # maybe TODO: pack in ``trio.Cancelled.__traceback__`` here
-            # so they can be unwrapped and displayed on the caller
-            # side?
+            # maybe TODO: pack in
+            # ``trio.Cancelled.__traceback__`` here so they can
+            # be unwrapped and displayed on the caller side?
             raise
 
         finally:
@@ -244,11 +257,11 @@ async def _invoke(
             # don't pop the local context until we know the
             # associated child isn't in debug any more
             await _debug.maybe_wait_for_debugger()
-            ctx = actor._contexts.pop((chan.uid, cid))
-            if ctx:
-                log.runtime(
-                    f'Context entrypoint {func} was terminated:\n{ctx}'
-                )
+            ctx: Context = actor._contexts.pop((chan.uid, cid))
+            log.runtime(
+                f'Context entrypoint {func} was terminated:\n'
+                f'{ctx}'
+            )
 
         if ctx.cancelled_caught:
 
@@ -256,43 +269,43 @@ async def _invoke(
            # first check for and raise any remote error
            # before raising any context cancelled case
            # so that real remote errors don't get masked as
            # ``ContextCancelled``s.
-            re = ctx._remote_error
-            if re:
+            if re := ctx._remote_error:
                ctx._maybe_raise_remote_err(re)
 
-            fname = func.__name__
+            fname: str = func.__name__
            cs: trio.CancelScope = ctx._scope
            if cs.cancel_called:
-                canceller = ctx._cancelled_remote
+                canceller: tuple = ctx.canceller
+                msg: str = (
+                    f'`{fname}()`@{actor.uid} cancelled by '
+                )
                # NOTE / TODO: if we end up having
                # ``Actor._cancel_task()`` call
                # ``Context.cancel()`` directly, we're going to
-                # need to change this logic branch since it will
-                # always enter..
+                # need to change this logic branch since it
+                # will always enter..
                if ctx._cancel_called:
-                    msg = f'`{fname}()`@{actor.uid} cancelled itself'
-
-                else:
-                    msg = (
-                        f'`{fname}()`@{actor.uid} '
-                        'was remotely cancelled by '
-                    )
+                    msg += 'itself '
 
                # if the channel which spawned the ctx is the
                # one that cancelled it then we report that, vs.
                # it being some other random actor that for ex.
                # some actor who calls `Portal.cancel_actor()`
                # and by side-effect cancels this ctx.
-                if canceller == ctx.chan.uid:
-                    msg += f'its caller {canceller}'
+                elif canceller == ctx.chan.uid:
+                    msg += f'its caller {canceller} '
+
                else:
                    msg += f'remote actor {canceller}'
 
                # TODO: does this ever get set any more or can
                # we remove it?
                if ctx._cancel_msg:
-                    msg += f' with msg:\n{ctx._cancel_msg}'
+                    msg += (
+                        ' with msg:\n'
+                        f'{ctx._cancel_msg}'
+                    )
 
                # task-contex was either cancelled by request using
                # ``Portal.cancel_actor()`` or ``Context.cancel()``
@@ -305,10 +318,13 @@ async def _invoke(
                    canceller=canceller,
                )
 
+    # regular async function
     else:
-        # regular async function
         try:
-            await chan.send({'functype': 'asyncfunc', 'cid': cid})
+            await chan.send({
+                'functype': 'asyncfunc',
+                'cid': cid
+            })
         except trio.BrokenResourceError:
             failed_resp = True
             if is_rpc:
@@ -322,7 +338,7 @@ async def _invoke(
             ctx._scope = cs
             task_status.started(ctx)
             result = await coro
-            fname = func.__name__
+            fname: str = func.__name__
             log.runtime(f'{fname}() result: {result}')
             if not failed_resp:
                 # only send result if we know IPC isn't down
@@ -1162,7 +1178,12 @@ class Actor:
         - return control the parent channel message loop
 
         '''
-        log.cancel(f"{self.uid} is trying to cancel")
+        log.cancel(
+            f'{self.uid} requested to cancel by:\n'
+            f'{requesting_uid}'
+        )
+
+        # TODO: what happens here when we self-cancel tho?
         self._cancel_called_by_remote: tuple = requesting_uid
         self._cancel_called = True
 
@@ -1177,7 +1198,9 @@ class Actor:
             dbcs.cancel()
 
         # kill all ongoing tasks
-        await self.cancel_rpc_tasks(requesting_uid=requesting_uid)
+        await self.cancel_rpc_tasks(
+            requesting_uid=requesting_uid,
+        )
 
         # stop channel server
         self.cancel_server()
@@ -1207,8 +1230,8 @@ class Actor:
         self,
         cid: str,
         chan: Channel,
-
         requesting_uid: tuple[str, str] | None = None,
+
     ) -> bool:
         '''
         Cancel a local task by call-id / channel.
@@ -1225,7 +1248,7 @@ class Actor:
             # this ctx based lookup ensures the requested task to
             # be cancelled was indeed spawned by a request from this channel
             ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
-            scope = ctx._scope
+            scope: trio.CancelScope = ctx._scope
         except KeyError:
             log.cancel(f"{cid} has already completed/terminated?")
             return True
@@ -1235,10 +1258,10 @@ class Actor:
             f"peer: {chan.uid}\n")
 
         if (
-            ctx._cancelled_remote is None
+            ctx._canceller is None
             and requesting_uid
         ):
-            ctx._cancelled_remote: tuple = requesting_uid
+            ctx._canceller: tuple = requesting_uid
 
         # don't allow cancelling this function mid-execution
         # (is this necessary?)
@@ -1248,6 +1271,7 @@ class Actor:
         # TODO: shouldn't we eventually be calling ``Context.cancel()``
         # directly here instead (since that method can handle both
         # side's calls into it?
+        # await ctx.cancel()
         scope.cancel()
 
         # wait for _invoke to mark the task complete
@@ -1275,9 +1299,12 @@ class Actor:
         registered for each.
 
         '''
-        tasks = self._rpc_tasks
+        tasks: dict = self._rpc_tasks
         if tasks:
-            log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
+            log.cancel(
+                f'Cancelling all {len(tasks)} rpc tasks:\n'
+                f'{tasks}'
+            )
             for (
                 (chan, cid),
                 (ctx, func, is_complete),
@@ -1295,7 +1322,9 @@ class Actor:
             )
 
         log.cancel(
-            f"Waiting for remaining rpc tasks to complete {tasks}")
+            'Waiting for remaining rpc tasks to complete:\n'
+            f'{tasks}'
+        )
         await self._ongoing_rpc_tasks.wait()
 
     def cancel_server(self) -> None: