diff --git a/tractor/_context.py b/tractor/_context.py index f333c9e..3dcf815 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -37,8 +37,9 @@ import inspect from pprint import pformat from typing import ( Any, - Callable, AsyncGenerator, + Callable, + Mapping, Type, TYPE_CHECKING, Union, @@ -59,7 +60,10 @@ from ._exceptions import ( pack_from_raise, unpack_error, ) -from .log import get_logger +from .log import ( + get_logger, + at_least_level, +) from .msg import ( Error, MsgType, @@ -83,6 +87,7 @@ from ._streaming import MsgStream from ._state import ( current_actor, debug_mode, + _ctxvar_Context, ) if TYPE_CHECKING: @@ -204,7 +209,7 @@ class Context: # cancelled that the other side is as well, so maybe we should # instead just have a `.canceller` pulled from the # `ContextCancelled`? - _canceller: tuple[str, str] | None = None + _canceller: tuple[str, str]|None = None # NOTE: we try to ensure assignment of a "cancel msg" since # there's always going to be an "underlying reason" that any @@ -384,8 +389,12 @@ class Context: re: BaseException|None = ( remote_error - or self._remote_error + or + self._remote_error ) + # XXX we only report "this context" as self-cancelled + # once we've received a ctxc from our direct-peer task + # (aka we're `.cancel_acked`). if not re: return False @@ -396,10 +405,10 @@ class Context: our_canceller = self.canceller return bool( - isinstance(re, ContextCancelled) + isinstance((ctxc := re), ContextCancelled) and from_uid == self.chan.uid - and re.canceller == our_uid - and our_canceller == from_uid + and ctxc.canceller == our_uid + and our_canceller == our_uid ) @property @@ -619,15 +628,27 @@ class Context: ) self._remote_error: BaseException = error + msgerr: bool = False + # self-cancel (ack) or, # peer propagated remote cancellation. - msgerr: bool = False if isinstance(error, ContextCancelled): + # NOTE in the case error is a ctxc the canceller will + # either be another peer or us. in the case where it's us + # we mark ourself as the canceller of ourselves (a ctx + # "self cancel" from this side's perspective), if instead + # the far end was cancelled by some other (inter-) peer, + # we want to mark our canceller as the actor that was + # cancelled, NOT their reported canceller. IOW in the + # latter case we're cancelled by someone else getting + # cancelled. + if (canc := error.canceller) == self._actor.uid: + whom: str = 'us' + self._canceller = canc + else: + whom = 'a remote peer (not us)' + self._canceller = error.src_uid - whom: str = ( - 'us' if error.canceller == self._actor.uid - else 'a remote peer (not us)' - ) log.cancel( f'IPC context was cancelled by {whom}!\n\n' f'{error}' @@ -635,6 +656,7 @@ class Context: elif isinstance(error, MsgTypeError): msgerr = True + self._canceller = error.src_uid log.error( f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n' f'{error}\n' @@ -642,28 +664,25 @@ class Context: ) else: + # always record the cancelling actor's uid since its + # cancellation state is linked and we want to know + # which process was the cause / requester of the + # cancellation. + maybe_error_src_uid: tuple = getattr( + error, + 'src_uid', + None, + ) + # we mark the source actor as our canceller + self._canceller = maybe_error_src_uid log.error( f'Remote context error:\n\n' # f'{pformat(self)}\n' f'{error}\n' ) - # always record the cancelling actor's uid since its - # cancellation state is linked and we want to know - # which process was the cause / requester of the - # cancellation. - maybe_error_src: tuple = getattr( - error, - 'src_uid', - None, - ) - self._canceller = ( - maybe_error_src - or - # XXX: in the case we get a non-boxed error? - # -> wait but this should never happen right? - self.chan.uid - ) + if self._canceller is None: + log.error('Ctx has no canceller set!?') # Cancel the local `._scope`, catch that # `._scope.cancelled_caught` and re-raise any remote error @@ -707,27 +726,34 @@ class Context: message: str = 'NOT cancelling `Context._scope` !\n\n' fmt_str: str = 'No `self._scope: CancelScope` was set/used ?' - if cs: + if ( + cs + and + at_least_level(log=log, level='cancel') + ): fmt_str: str = self.pformat( extra_fields={ '._is_self_cancelled()': self._is_self_cancelled(), '._cancel_on_msgerr': self._cancel_on_msgerr, - - '._scope': cs, - '._scope.cancel_called': cs.cancel_called, - '._scope.cancelled_caught': cs.cancelled_caught, - '._scope._cancel_status': cs._cancel_status, } ) + from .devx.pformat import pformat_cs + cs_fmt: str = pformat_cs( + cs, + var_name='Context._scope', + ) + fmt_str += ( + '\n' + + + cs_fmt + ) log.cancel( message + fmt_str ) - # TODO: maybe we should also call `._res_scope.cancel()` if it - # exists to support cancelling any drain loop hangs? - # TODO: add to `Channel`? + # TODO: also add to `Channel`? @property def dst_maddr(self) -> str: chan: Channel = self.chan @@ -1100,7 +1126,8 @@ class Context: f'ctx id: {self.cid}' ) - # TODO: replace all the instances of this!! XD + # TODO: replace all the `._maybe_raise_remote_err()` usage + # with instances of this!! def maybe_raise( self, hide_tb: bool = True, @@ -1111,6 +1138,7 @@ class Context: if re := self._remote_error: return self._maybe_raise_remote_err( re, + hide_tb=hide_tb, **kwargs, ) @@ -1212,7 +1240,6 @@ class Context: # runtime frames from the tb explicitly? # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement # https://stackoverflow.com/a/24752607 - __tracebackhide__: bool = True raise remote_error # from None # TODO: change to `.wait_for_result()`? @@ -1263,8 +1290,15 @@ class Context: # wait for a final context result/error by "draining" # (by more or less ignoring) any bi-dir-stream "yield" # msgs still in transit from the far end. + # + # XXX NOTE XXX: this call shouldn't really ever raise + # (other then internal error), instead delivering an + # `Error`-msg and that being `.maybe_raise()`-ed below + # since every message should be delivered via the normal + # `._deliver_msg()` route which will appropriately set + # any `.maybe_error`. ( - return_msg, + outcome_msg, drained_msgs, ) = await msgops.drain_to_final_msg( ctx=self, @@ -1282,13 +1316,18 @@ class Context: f'{msg}\n' ) - log.cancel( - 'Ctx drained to final result msgs\n' - f'{return_msg}\n\n' - - f'pre-result drained msgs:\n' - f'{pformat(drained_msgs)}\n' + drained_status: str = ( + 'Ctx drained to final outcome msg\n\n' + f'{outcome_msg}\n' ) + if drained_msgs: + drained_status += ( + '\n' + f'The pre-drained msgs are\n' + f'{pformat(drained_msgs)}\n' + ) + + log.cancel(drained_status) self.maybe_raise( # NOTE: obvi we don't care if we @@ -1319,7 +1358,7 @@ class Context: @property def maybe_error(self) -> BaseException|None: - le: Exception|None = self._local_error + le: BaseException|None = self._local_error re: RemoteActorError|ContextCancelled|None = self._remote_error match (le, re): @@ -1347,7 +1386,7 @@ class Context: # ContextCancelled(canceller=), # ): - error: Exception|None = le or re + error: BaseException|None = le or re if error: return error @@ -1462,52 +1501,63 @@ class Context: ''' merr: Exception|None = self.maybe_error outcome: Unresolved|Exception|Any = self.outcome - + status: str|None = None match ( outcome, merr, ): + # "graceful" ctx cancellation case ( Unresolved, ContextCancelled(), - ) if self.cancel_acked: - status = 'self-cancelled' - - case ( - Unresolved, - ContextCancelled(), - ) if ( - self.canceller - and not self._cancel_called ): - status = 'peer-cancelled' + if self._is_self_cancelled(): + status = 'self-cancelled' + elif ( + self.canceller + and not self._cancel_called + ): + status = 'peer-cancelled' + # (remote) error condition case ( Unresolved, - BaseException(), - ) if self.canceller: + BaseException(), # any error-type + ): status = 'errored' + # result already returned case ( _, # any non-unresolved value None, ) if self._final_result_is_set(): status = 'returned' + # normal operation but still in a pre-`Return`-result + # dialog phase case ( - Unresolved, # noqa (weird.. ruff) - None, + Unresolved, # noqa (ruff, you so weird..) + None, # no (remote) error set ): if stream := self._stream: if stream.closed: status = 'streaming-finished' else: status = 'streaming' + elif self._started_called: status = 'started' - case _: - status = 'unknown!?' + else: + if self.side == 'child': + status = 'pre-started' + else: + status = 'syncing-to-child' + + if status is None: + status = '??unknown??' + # from tractor.devx import mk_pdb + # mk_pdb().set_trace() return status @@ -1738,7 +1788,6 @@ class Context: f'Delivering IPC ctx error from {self.peer_side!r} to {side!r} task\n\n' f'{flow_body}' - f'{pformat(re)}\n' ) self._cancel_msg: dict = msg @@ -2003,6 +2052,7 @@ async def open_context_from_portal( ) assert ctx._remote_func_type == 'context' assert ctx._caller_info + _ctxvar_Context.set(ctx) # XXX NOTE since `._scope` is NOT set BEFORE we retreive the # `Started`-msg any cancellation triggered @@ -2156,7 +2206,7 @@ async def open_context_from_portal( # CASE 2: context was cancelled by local task calling # `.cancel()`, we don't raise and the exit block should - # exit silently. + # finish silently. if ( ctx._cancel_called and @@ -2281,6 +2331,11 @@ async def open_context_from_portal( try: result_or_err: Exception|Any = await ctx.result() except BaseException as berr: + # cancelled before (or maybe during?) final result capture + # if isinstance(trio.Cancelled, berr): + # from .devx import mk_pdb + # mk_pdb.set_trace() + # on normal teardown, if we get some error # raised in `Context.result()` we still want to # save that error on the ctx's state to @@ -2476,12 +2531,12 @@ def mk_context( _caller_info=caller_info, **kwargs, ) - # TODO: we can drop the old placeholder yah? - # ctx._result: int | Any = id(ctx) ctx._result = Unresolved return ctx +# TODO: use the new type-parameters to annotate this in 3.13? +# -[ ] https://peps.python.org/pep-0718/#unknown-types def context(func: Callable) -> Callable: ''' Mark an (async) function as an SC-supervised, inter-`Actor`, @@ -2495,8 +2550,8 @@ def context(func: Callable) -> Callable: # https://github.com/python/mypy/issues/2087#issuecomment-769266912 func._tractor_context_function = True # type: ignore - sig = inspect.signature(func) - params = sig.parameters + sig: inspect.Signature = inspect.signature(func) + params: Mapping = sig.parameters if 'ctx' not in params: raise TypeError( "The first argument to the context function "