From 299429a2788635e3cba30839a1512d157137cf3a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 1 Mar 2024 22:37:32 -0500 Subject: [PATCH] Deep `Context` refinements Spanning from the pub API, to instance `repr()` customization (for logging/REPL content), to the impl details around the notion of a "final outcome" and surrounding IPC msg draining mechanics during teardown. A few API and field updates: - new `.cancel_acked: bool` to replace what we were mostly using `.cancelled_caught: bool` for but, for purposes of better mapping the semantics of remote cancellation of parallel executing tasks; it's set only when `.cancel_called` is set and a ctxc arrives with a `.canceller` field set to the current actor uid indicating we requested and received acknowledgement from the other side's task that is cancelled gracefully. - strongly document and delegate (and prolly eventually remove as a pub attr) the `.cancelled_caught` property entirely to the underlying `._scope: trio.CancelScope`; the `trio` semantics don't really map well to the "parallel with IPC msging" case in the sense that for us it breaks the concept of the ctx/scope closure having "caught" something instead of having "received" a msg that the other side has "acknowledged" (i.e. which for us is the completion of cancellation). - new `.__repr__()`/`.__str__()` format that tries to tersely yet comprehensively as possible display everything you need to know about the 3 main layers of an SC-linked-IPC-context: * ipc: the transport + runtime layers net-addressing and prot info. * rpc: the specific linked caller-callee task signature details including task and msg-stream instances. * state: current execution and final outcome state of the task pair. * a teensie extra `.repr_rpc` for a condensed rpc signature. - new `.dst_maddr` to get a `libp2p` style "multi-address" (though right now it's just showing the transport layers so maybe we should move to to our `Channel`?) - new public instance-var fields supporting more granular remote cancellation/result/error state: * `.maybe_error: Exception|None` for any final (remote) error/ctxc which computes logic on the values of `._remote_error`/`._local_error` to determine the "final error" (if any) on termination. * `.outcome` to the final error or result (or `None` if un-terminated) * `.repr_outcome()` for a console/logging friendly version of the final result or error as needed for the `.__str__()`. - new private interface bits to support all of ^: * a new "no result yet" sentinel value, `Unresolved`, using a module level class singleton that `._result` is set too (instead of `id(self)`) to both determine if and present when no final result from the callee has-yet-been/was delivered (ever). => really we should get rid of `.result()` and change it to `.wait_for_result()` (or something)u * `_final_result_is_set()` predicate to avoid waiting for an already delivered result. * `._maybe_raise()` proto-impl that we should use to replace all the `if re:` blocks it can XD * new `._stream: MsgStream|None` for when a stream is opened to aid with the state repr mentioned above. Tweaks to the termination drain loop `_drain_to_final_msg()`: - obviously (obvi) use all the changes above when determining whether or not a "final outcome" has arrived and thus breaking from the loop ;) * like the `.outcome` `.maybe_error` and `._final_ctx_is_set()` in the `while` pred expression. - drop the `_recv_chan.receive_nowait()` + guard logic since it seems with all the surrounding (and coming soon) changes to `Portal.open_context()` using all the new API stuff (mentioned in first bullet set above) we never hit the case of inf-block? Oh right and obviously a ton of (hopefully improved) logging msg content changes, commented code removal and detailed comment-docs strewn about! --- tractor/_context.py | 1048 ++++++++++++++++++++++++++++--------------- 1 file changed, 675 insertions(+), 373 deletions(-) diff --git a/tractor/_context.py b/tractor/_context.py index ee05a2b..f8aaf1c 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -43,19 +43,14 @@ import warnings import trio -# from .devx import ( -# maybe_wait_for_debugger, -# pause, -# ) from .msg import NamespacePath from ._exceptions import ( - # _raise_from_no_key_in_msg, - unpack_error, - pack_error, ContextCancelled, - # MessagingError, + InternalError, RemoteActorError, StreamOverrun, + pack_error, + unpack_error, ) from .log import get_logger from ._ipc import Channel @@ -65,6 +60,7 @@ from ._state import current_actor if TYPE_CHECKING: from ._portal import Portal from ._runtime import Actor + from ._ipc import MsgTransport log = get_logger(__name__) @@ -73,6 +69,7 @@ log = get_logger(__name__) async def _drain_to_final_msg( ctx: Context, + hide_tb: bool = True, msg_limit: int = 6, ) -> list[dict]: @@ -89,47 +86,72 @@ async def _drain_to_final_msg( `ctx.result()` cleanup and teardown sequence. ''' + __tracebackhide__: bool = hide_tb raise_overrun: bool = not ctx._allow_overruns # wait for a final context result by collecting (but # basically ignoring) any bi-dir-stream msgs still in transit # from the far end. pre_result_drained: list[dict] = [] - while not ctx._remote_error: + while not ( + ctx.maybe_error + and not ctx._final_result_is_set() + ): try: + # TODO: can remove? + # await trio.lowlevel.checkpoint() + # NOTE: this REPL usage actually works here dawg! Bo # from .devx._debug import pause # await pause() - if re := ctx._remote_error: - ctx._maybe_raise_remote_err( - re, - # NOTE: obvi we don't care if we - # overran the far end if we're already - # waiting on a final result (msg). - raise_overrun_from_self=raise_overrun, - ) # TODO: bad idea? + # -[ ] wrap final outcome channel wait in a scope so + # it can be cancelled out of band if needed? + # # with trio.CancelScope() as res_cs: # ctx._res_scope = res_cs # msg: dict = await ctx._recv_chan.receive() # if res_cs.cancelled_caught: + # TODO: ensure there's no more hangs, debugging the + # runtime pretty preaase! # from .devx._debug import pause # await pause() + + # TODO: can remove this finally? + # we have no more need for the sync draining right + # since we're can kinda guarantee the async + # `.receive()` below will never block yah? + # + # if ( + # ctx._cancel_called and ( + # ctx.cancel_acked + # # or ctx.chan._cancel_called + # ) + # # or not ctx._final_result_is_set() + # # ctx.outcome is not + # # or ctx.chan._closed + # ): + # try: + # msg: dict = await ctx._recv_chan.receive_nowait()() + # except trio.WouldBlock: + # log.warning( + # 'When draining already `.cancel_called` ctx!\n' + # 'No final msg arrived..\n' + # ) + # break + # else: + # msg: dict = await ctx._recv_chan.receive() + + # TODO: don't need it right jefe? + # with trio.move_on_after(1) as cs: + # if cs.cancelled_caught: + # from .devx._debug import pause + # await pause() + + # pray to the `trio` gawds that we're corrent with this msg: dict = await ctx._recv_chan.receive() - ctx._result: Any = msg['return'] - log.runtime( - 'Context delivered final draining msg:\n' - f'{pformat(msg)}' - ) - pre_result_drained.append(msg) - # NOTE: we don't need to do this right? - # XXX: only close the rx mem chan AFTER - # a final result is retreived. - # if ctx._recv_chan: - # await ctx._recv_chan.aclose() - break # NOTE: we get here if the far end was # `ContextCancelled` in 2 cases: @@ -150,7 +172,22 @@ async def _drain_to_final_msg( # continue to bubble up as normal. raise + try: + ctx._result: Any = msg['return'] + log.runtime( + 'Context delivered final draining msg:\n' + f'{pformat(msg)}' + ) + # XXX: only close the rx mem chan AFTER + # a final result is retreived. + # if ctx._recv_chan: + # await ctx._recv_chan.aclose() + # TODO: ^ we don't need it right? + break + except KeyError: + # always capture unexpected/non-result msgs + pre_result_drained.append(msg) if 'yield' in msg: # far end task is still streaming to us so discard @@ -159,12 +196,12 @@ async def _drain_to_final_msg( (ctx._stream.closed and (reason := 'stream was already closed') ) - or (ctx._cancel_called - and (reason := 'ctx called `.cancel()`') - ) or (ctx._cancelled_caught and (reason := 'ctx caught a cancel') ) + or (ctx._cancel_called + and (reason := 'ctx called `.cancel()`') + ) or (len(pre_result_drained) > msg_limit and (reason := f'"yield" limit={msg_limit}') ) @@ -193,7 +230,6 @@ async def _drain_to_final_msg( f'{pformat(msg)}\n' ) - pre_result_drained.append(msg) continue # TODO: work out edge cases here where @@ -206,13 +242,15 @@ async def _drain_to_final_msg( 'Remote stream terminated due to "stop" msg:\n\n' f'{pformat(msg)}\n' ) - pre_result_drained.append(msg) continue - # internal error should never get here - assert msg.get('cid'), ( - "Received internal error at portal?" - ) + # It's an internal error if any other msg type without + # a`'cid'` field arrives here! + if not msg.get('cid'): + raise InternalError( + 'Unexpected cid-missing msg?\n\n' + f'{msg}\n' + ) # XXX fallthrough to handle expected error XXX re: Exception|None = ctx._remote_error @@ -273,11 +311,27 @@ async def _drain_to_final_msg( else: # bubble the original src key error raise + else: + log.cancel( + 'Skipping `MsgStream` drain since final outcome is set\n\n' + f'{ctx.outcome}\n' + ) return pre_result_drained -# TODO: make this a msgspec.Struct! +class Unresolved: + ''' + Placeholder value for `Context._result` until + a final return value or raised error is resolved. + + ''' + ... + + +# TODO: make this a .msg.types.Struct! +# -[ ] ideally we can freeze it +# -[ ] let's us do field diffing nicely in tests Bo @dataclass class Context: ''' @@ -332,28 +386,38 @@ class Context: # NOTE: each side of the context has its own cancel scope # which is exactly the primitive that allows for # cross-actor-task-supervision and thus SC. - _scope: trio.CancelScope | None = None + _scope: trio.CancelScope|None = None _task: trio.lowlevel.Task|None = None + + # TODO: cs around result waiting so we can cancel any + # permanently blocking `._recv_chan.receive()` call in + # a drain loop? # _res_scope: trio.CancelScope|None = None # on a clean exit there should be a final value # delivered from the far end "callee" task, so # this value is only set on one side. - _result: Any | int = None + # _result: Any | int = None + _result: Any|Unresolved = Unresolved - # if the local "caller" task errors this - # value is always set to the error that was - # captured in the `Portal.open_context().__aexit__()` - # teardown. - _local_error: BaseException | None = None + # if the local "caller" task errors this value is always set + # to the error that was captured in the + # `Portal.open_context().__aexit__()` teardown block OR, in + # 2 special cases when an (maybe) expected remote error + # arrives that we purposely swallow silently: + # - `ContextCancelled` with `.canceller` set to our uid: + # a self-cancel, + # - `RemoteActorError[StreamOverrun]` which was caught during + # a self-cancellation teardown msg drain. + _local_error: BaseException|None = None # if the either side gets an error from the other # this value is set to that error unpacked from an # IPC msg. - _remote_error: BaseException | None = None + _remote_error: BaseException|None = None - # only set if the local task called `.cancel()` - _cancel_called: bool = False # did WE cancel the far end? + # only set if an actor-local task called `.cancel()` + _cancel_called: bool = False # did WE request cancel of the far end? # TODO: do we even need this? we can assume that if we're # cancelled that the other side is as well, so maybe we should @@ -379,61 +443,6 @@ class Context: # actors from being able to acquire the debugger lock. _enter_debugger_on_cancel: bool = True - @property - def cancel_called(self) -> bool: - ''' - Records whether cancellation has been requested for this context - by either an explicit call to ``.cancel()`` or an implicit call - due to an error caught inside the ``Portal.open_context()`` - block. - - ''' - return self._cancel_called - - @property - def canceller(self) -> tuple[str, str] | None: - ''' - ``Actor.uid: tuple[str, str]`` of the (remote) - actor-process who's task was cancelled thus causing this - (side of the) context to also be cancelled. - - ''' - return self._canceller - - @property - def cancelled_caught(self) -> bool: - return ( - # the local scope was cancelled either by - # remote error or self-request - self._scope.cancelled_caught - - # the local scope was never cancelled - # and instead likely we received a remote side - # cancellation that was raised inside `.result()` - or ( - (se := self._local_error) - and - isinstance(se, ContextCancelled) - and ( - se.canceller == self.canceller - or - se is self._remote_error - ) - ) - ) - - # @property - # def is_waiting_result(self) -> bool: - # return bool(self._res_scope) - - @property - def side(self) -> str: - ''' - Return string indicating which task this instance is wrapping. - - ''' - return 'caller' if self._portal else 'callee' - # init and streaming state _started_called: bool = False _stream_opened: bool = False @@ -450,10 +459,196 @@ class Context: maxlen=616, ) ) - _scope_nursery: trio.Nursery | None = None + + # NOTE: this was originally a legacy interface from when we + # were raising remote errors (set to `._remote_error`) by + # starting a task inside this nursery that simply raised the + # boxed exception. NOW, it's used for spawning overrun queuing + # tasks when `.allow_overruns == True` !!! + _scope_nursery: trio.Nursery|None = None + + # streaming overrun state tracking _in_overrun: bool = False _allow_overruns: bool = False + + def __str__(self) -> str: + ds: str = '=' + # ds: str = ': ' + + # only show if opened + maybe_stream_repr: str = '' + if stream := self._stream: + # TODO: a `MsgStream.reprol()` !! + # f' stream{ds}{self._stream}\n' + # f' {self._stream}\n' + maybe_stream_repr: str = ( + f' {stream}\n' + ) + + return ( + f'\n' + ) + # NOTE: making this return a value that can be passed to + # `eval()` is entirely **optional** dawggg B) + # https://docs.python.org/3/library/functions.html#repr + # https://docs.python.org/3/reference/datamodel.html#object.__repr__ + # + # XXX: Currently we target **readability** from a (console) + # logging perspective over `eval()`-ability since we do NOT + # target serializing non-struct instances! + # def __repr__(self) -> str: + __repr__ = __str__ + + @property + def cancel_called(self) -> bool: + ''' + Records whether cancellation has been requested for this context + by a call to `.cancel()` either due to, + - either an explicit call by some local task, + - or an implicit call due to an error caught inside + the ``Portal.open_context()`` block. + + ''' + return self._cancel_called + + @property + def canceller(self) -> tuple[str, str] | None: + ''' + ``Actor.uid: tuple[str, str]`` of the (remote) + actor-process who's task was cancelled thus causing this + (side of the) context to also be cancelled. + + ''' + return self._canceller + + @property + def cancel_acked(self) -> bool: + ''' + Records whether the task on the remote side of this IPC + context acknowledged a cancel request via a relayed + `ContextCancelled` with the `.canceller` attr set to the + `Actor.uid` of the local actor who's task entered + `Portal.open_context()`. + + This will only be `True` when `.cancel()` is called and + the ctxc response contains a `.canceller: tuple` field + equal to the uid of the calling task's actor. + + ''' + portal: Portal|None = self._portal + if portal: + our_uid: tuple = portal.actor.uid + + return bool( + self._cancel_called + and (re := self._remote_error) + and isinstance(re, ContextCancelled) + and ( + re.canceller + == + self.canceller + == + our_uid + ) + ) + + @property + def cancelled_caught(self) -> bool: + ''' + Exactly the value of `self._scope.cancelled_caught` + (delegation) and should only be (able to be read as) + `True` for a `.side == "caller"` ctx wherein the + `Portal.open_context()` block was exited due to a call to + `._scope.cancel()` - which should only ocurr in 2 cases: + + - a caller side calls `.cancel()`, the far side cancels + and delivers back a `ContextCancelled` (making + `.cancel_acked == True`) and `._scope.cancel()` is + called by `._maybe_cancel_and_set_remote_error()` which + in turn cancels all `.open_context()` started tasks + (including any overrun queuing ones). + => `._scope.cancelled_caught == True` by normal `trio` + cs semantics. + + - a caller side is delivered a `._remote_error: + RemoteActorError` via `._deliver_msg()` and a transitive + call to `_maybe_cancel_and_set_remote_error()` calls + `._scope.cancel()` and that cancellation eventually + results in `trio.Cancelled`(s) caught in the + `.open_context()` handling around the @acm's `yield`. + + Only as an FYI, in the "callee" side case it can also be + set but never is readable by any task outside the RPC + machinery in `._invoke()` since,: + - when a callee side calls `.cancel()`, `._scope.cancel()` + is called immediately and handled specially inside + `._invoke()` to raise a `ContextCancelled` which is then + sent to the caller side. + + However, `._scope.cancelled_caught` can NEVER be + accessed/read as `True` by any RPC invoked task since it + will have terminated before the cs block exit. + + ''' + return bool( + # the local scope was cancelled either by + # remote error or self-request + (self._scope and self._scope.cancelled_caught) + + # the local scope was never cancelled + # and instead likely we received a remote side + # # cancellation that was raised inside `.result()` + # or ( + # (se := self._local_error) + # and se is re + # ) + ) + + # @property + # def is_waiting_result(self) -> bool: + # return bool(self._res_scope) + + @property + def side(self) -> str: + ''' + Return string indicating which task this instance is wrapping. + + ''' + return 'caller' if self._portal else 'callee' + async def send_yield( self, data: Any, @@ -501,17 +696,20 @@ class Context: when called/closed by actor local task(s). - NOTEs & TODOs: + NOTEs: - It is expected that the caller has previously unwrapped the remote error using a call to `unpack_error()` and provides that output exception value as the input - `error` argument here. + `error` argument *here*. + + TODOs: - If this is an error message from a context opened by - `Portal.open_context()` we want to interrupt any - ongoing local tasks operating within that `Context`'s - cancel-scope so as to be notified ASAP of the remote - error and engage any caller handling (eg. for - cross-process task supervision). + `Portal.open_context()` (ideally) we want to interrupt + any ongoing local tasks operating within that + `Context`'s cancel-scope so as to be notified ASAP of + the remote error and engage any caller handling (eg. + for cross-process task supervision). + - In some cases we may want to raise the remote error immediately since there is no guarantee the locally operating task(s) will attempt to execute a checkpoint @@ -519,10 +717,13 @@ class Context: approaches depending on the current task's work and wrapping "thread" type: - - `trio`-native-and-graceful: only ever wait for tasks - to exec a next `trio.lowlevel.checkpoint()` assuming - that any such task must do so to interact with the - actor runtime and IPC interfaces. + - Currently we only support + a `trio`-native-and-graceful approach: we only ever + wait for local tasks to exec a next + `trio.lowlevel.checkpoint()` assuming that any such + task must do so to interact with the actor runtime + and IPC interfaces and will then be cancelled by + the internal `._scope` block. - (NOT IMPLEMENTED) system-level-aggressive: maybe we could eventually interrupt sync code (invoked using @@ -543,80 +744,106 @@ class Context: # do their own error checking at their own call points and # result processing. - # XXX: set the remote side's error so that after we cancel - # whatever task is the opener of this context it can raise - # that error as the reason. + # TODO: never do this right? # if self._remote_error: # return - # breakpoint() - log.cancel( - 'Setting remote error for ctx \n' + # XXX: denote and set the remote side's error so that + # after we cancel whatever task is the opener of this + # context, it can raise or swallow that error + # appropriately. + log.runtime( + 'Setting remote error for ctx\n\n' f'<= remote ctx uid: {self.chan.uid}\n' - f'=>\n{error}' + f'=>{error}' ) self._remote_error: BaseException = error - if ( - isinstance(error, ContextCancelled) - ): - log.cancel( - 'Remote task-context was cancelled for ' - f'actor: {self.chan.uid}\n' - f'task: {self.cid}\n' - f'canceller: {error.canceller}\n' - ) - # always record the cancelling actor's uid since its cancellation - # state is linked and we want to know which process was - # the cause / requester of the cancellation. - # if error.canceller is None: - # import pdbp; pdbp.set_trace() + # self-cancel (ack) or, + # peer propagated remote cancellation. + if isinstance(error, ContextCancelled): + ctxc_src: tuple = error.canceller - # breakpoint() - self._canceller = error.canceller + whom: str = ( + 'us' if ctxc_src == current_actor().uid + else 'peer' + ) + log.cancel( + f'IPC context cancelled by {whom}!\n\n' + f'{error}' + ) + # always record the cancelling actor's uid since its + # cancellation state is linked and we want to know + # which process was the cause / requester of the + # cancellation. + self._canceller = ctxc_src if self._cancel_called: - # this is an expected cancel request response message - # and we **don't need to raise it** in local cancel - # scope since it will potentially override a real error. + # this is an expected cancel request response + # message and we **don't need to raise it** in the + # local cancel `._scope` since it will potentially + # override a real error. After this returns + # `.cancel_acked == True`. return else: log.error( - f'Remote context error:\n' + f'Remote context error:\n\n' + f'{error}\n' f'{pformat(self)}\n' - # f'remote actor: {self.chan.uid}\n' - # f'cid: {self.cid}\n' ) self._canceller = self.chan.uid - # TODO: tempted to **not** do this by-reraising in a - # nursery and instead cancel a surrounding scope, detect - # the cancellation, then lookup the error that was set? - # YES! this is way better and simpler! + # Cancel the local `._scope`, catch that + # `._scope.cancelled_caught` and re-raise any remote error + # once exiting (or manually calling `.result()`) the + # `.open_context()` block. cs: trio.CancelScope = self._scope if ( cs and not cs.cancel_called and not cs.cancelled_caught ): - - # TODO: we can for sure drop this right? - # from trio.testing import wait_all_tasks_blocked - # await wait_all_tasks_blocked() - # TODO: it'd sure be handy to inject our own # `trio.Cancelled` subtype here ;) # https://github.com/goodboy/tractor/issues/368 self._scope.cancel() - # NOTE: this REPL usage actually works here dawg! Bo - # await pause() + # TODO: maybe we should also call `._res_scope.cancel()` if it + # exists to support cancelling any drain loop hangs? - # TODO: maybe we have to use `._res_scope.cancel()` if it - # exists? + # TODO: add to `Channel`? + @property + def dst_maddr(self) -> str: + chan: Channel = self.chan + dst_addr, dst_port = chan.raddr + trans: MsgTransport = chan.transport + # cid: str = self.cid + # cid_head, cid_tail = cid[:6], cid[-6:] + return ( + f'/ipv4/{dst_addr}' + f'/{trans.name_key}/{dst_port}' + # f'/{self.chan.uid[0]}' + # f'/{self.cid}' + + # f'/cid={cid_head}..{cid_tail}' + # TODO: ? not use this ^ right ? + ) + + dmaddr = dst_maddr + + @property + def repr_rpc( + self, + ) -> str: + # TODO: how to show the transport interchange fmt? + # codec: str = self.chan.transport.codec_key + return ( + # f'{self._nsf}() -{{{codec}}}-> {repr(self.outcome)}:' + f'{self._nsf}() -> {self.repr_outcome()}:' + ) async def cancel( self, @@ -633,13 +860,23 @@ class Context: side: str = self.side self._cancel_called: bool = True - header: str = f'Cancelling "{side.upper()}"-side of ctx with peer\n' + header: str = ( + f'Cancelling ctx with peer from {side.upper()} side\n\n' + ) reminfo: str = ( - f'uid: {self.chan.uid}\n' - f' |_ {self._nsf}()\n' + # ' =>\n' + f'Context.cancel() => {self.chan.uid}\n' + # f'{self.chan.uid}\n' + f' |_ @{self.dst_maddr}\n' + f' >> {self.repr_rpc}\n' + # f' >> {self._nsf}() -> {codec}[dict]:\n\n' + # TODO: pull msg-type from spec re #320 ) - # caller side who entered `Portal.open_context()` + # CALLER side task + # ------ - ------ + # Aka the one that entered `Portal.open_context()` + # # NOTE: on the call side we never manually call # `._scope.cancel()` since we expect the eventual # `ContextCancelled` from the other side to trigger this @@ -648,8 +885,9 @@ class Context: # `Portal.open_context().__aexit__()`) if side == 'caller': if not self._portal: - raise RuntimeError( - "No portal found, this is likely a callee side context" + raise InternalError( + 'No portal found!?\n' + 'Why is this supposed caller context missing it?' ) cid: str = self.cid @@ -686,10 +924,18 @@ class Context: f'{reminfo}' ) - # callee side remote task - # NOTE: on this side we ALWAYS cancel the local scope since - # the caller expects a `ContextCancelled` to be sent from - # `._runtime._invoke()` back to the other side. + # CALLEE side task + # ------ - ------ + # Aka the one that DID NOT EVER enter a `Portal.open_context()` + # and instead was constructed and scheduled as an + # `_invoke()` RPC task. + # + # NOTE: on this side we ALWAYS cancel the local scope + # since the caller expects a `ContextCancelled` to be sent + # from `._runtime._invoke()` back to the other side. The + # logic for catching the result of the below + # `._scope.cancel()` is inside the `._runtime._invoke()` + # context RPC handling block. else: log.cancel( header @@ -750,7 +996,7 @@ class Context: # sent to the other side! if self._remote_error: # NOTE: this is diff then calling - # `._maybe_raise_from_remote_msg()` specifically + # `._maybe_raise_remote_err()` specifically # because any task entering this `.open_stream()` # AFTER cancellation has already been requested, # we DO NOT want to absorb any ctxc ACK silently! @@ -876,53 +1122,105 @@ class Context: f'ctx id: {self.cid}' ) + # TODO: replace all the instances of this!! XD + def maybe_raise( + self, + **kwargs, + ) -> Exception|None: + if re := self._remote_error: + return self._maybe_raise_remote_err( + re, + **kwargs, + ) + def _maybe_raise_remote_err( self, - err: Exception, + remote_error: Exception, raise_ctxc_from_self_call: bool = False, raise_overrun_from_self: bool = True, - ) -> ContextCancelled|None: + ) -> ( + ContextCancelled # `.cancel()` request to far side + |RemoteActorError # stream overrun caused and ignored by us + ): ''' - Maybe raise a remote error depending on who (which task from - which actor) requested a cancellation (if any). + Maybe raise a remote error depending on the type of error + and *who* (i.e. which task from which actor) requested + a cancellation (if any). ''' - # NOTE: whenever the context's "opener" side (task) **is** - # the side which requested the cancellation (likekly via - # ``Context.cancel()``), we don't want to re-raise that - # cancellation signal locally (would be akin to - # a ``trio.Nursery`` nursery raising ``trio.Cancelled`` - # whenever ``CancelScope.cancel()`` was called) and - # instead silently reap the expected cancellation - # "error"-msg. - our_uid: tuple[str, str] = current_actor().uid - if ( - (not raise_ctxc_from_self_call - and isinstance(err, ContextCancelled) - and ( - self._cancel_called - or self.chan._cancel_called - or self.canceller == our_uid - or tuple(err.canceller) == our_uid) - ) - or - (not raise_overrun_from_self - and isinstance(err, RemoteActorError) - and err.msgdata['type_str'] == 'StreamOverrun' - and tuple(err.msgdata['sender']) == our_uid - ) + if (( + # NOTE: whenever the context's "opener" side (task) **is** + # the side which requested the cancellation (likekly via + # ``Context.cancel()``), we don't want to re-raise that + # cancellation signal locally (would be akin to + # a ``trio.Nursery`` nursery raising ``trio.Cancelled`` + # whenever ``CancelScope.cancel()`` was called) and + # instead silently reap the expected cancellation + # "error"-msg-as-ack. In this case the `err: + # ContextCancelled` must have a `.canceller` set to the + # uid of the requesting task's actor and we only do NOT + # raise that error locally if WE ARE THAT ACTOR which + # requested the cancellation. + not raise_ctxc_from_self_call + and isinstance(remote_error, ContextCancelled) + and ( + self._cancel_called + # or self.chan._cancel_called + # TODO: ^ should we have a special separate case + # for this ^ ? + ) + and ( # one of, + + (portal := self._portal) + and (our_uid := portal.actor.uid) + # TODO: ?potentially it is useful to emit certain + # warning/cancel logs for the cases where the + # cancellation is due to a lower level cancel + # request, such as `Portal.cancel_actor()`, since in + # that case it's not actually this specific ctx that + # made a `.cancel()` call, but it is the same + # actor-process? + and tuple(remote_error.canceller) == our_uid + or self.chan._cancel_called + or self.canceller == our_uid + ) + ) or ( + + # NOTE: whenever this context is the cause of an + # overrun on the remote side (aka we sent msgs too + # fast that the remote task was overrun according + # to `MsgStream` buffer settings) AND the caller + # has requested to not raise overruns this side + # caused, we also silently absorb any remotely + # boxed `StreamOverrun`. This is mostly useful for + # supressing such faults during + # cancellation/error/final-result handling inside + # `_drain_to_final_msg()` such that we do not + # raise such errors particularly in the case where + # `._cancel_called == True`. + not raise_overrun_from_self + and isinstance(remote_error, RemoteActorError) + and remote_error.msgdata['type_str'] == 'StreamOverrun' + and tuple(remote_error.msgdata['sender']) == our_uid + ) ): # NOTE: we set the local scope error to any "self # cancellation" error-response thus "absorbing" # the error silently B) if self._local_error is None: - self._local_error = err + self._local_error = remote_error - return err + else: + log.warning( + 'Local error already set for ctx?\n' + f'{self._local_error}\n' + ) - # NOTE: currently we are masking underlying runtime errors + return remote_error + + # NOTE: currently we are hiding underlying runtime errors # which are often superfluous to user handler code. not # sure if this is still needed / desired for all operation? # TODO: maybe we can only NOT mask if: @@ -932,10 +1230,15 @@ class Context: # runtime frames from the tb explicitly? # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement # https://stackoverflow.com/a/24752607 - # __tracebackhide__: bool = True - raise err from None + __tracebackhide__: bool = True + raise remote_error from None - async def result(self) -> Any | Exception: + # TODO: change to `.wait_for_result()`? + async def result( + self, + hide_tb: bool = True, + + ) -> Any|Exception: ''' From some (caller) side task, wait for and return the final result from the remote (callee) side's task. @@ -961,182 +1264,53 @@ class Context: of the remote cancellation. ''' - assert self._portal, "Context.result() can not be called from callee!" + __tracebackhide__ = hide_tb + assert self._portal, ( + "Context.result() can not be called from callee side!" + ) + if self._final_result_is_set(): + return self._result + assert self._recv_chan - raise_overrun: bool = not self._allow_overruns - # if re := self._remote_error: - # return self._maybe_raise_remote_err( - # re, - # # NOTE: obvi we don't care if we - # # overran the far end if we're already - # # waiting on a final result (msg). - # raise_overrun_from_self=raise_overrun, - # ) - - res_placeholder: int = id(self) + # res_placeholder: int = id(self) if ( - self._result == res_placeholder - and not self._remote_error + # self._result == res_placeholder + # and not self._remote_error + self.maybe_error is None + # not self._remote_error + # and not self._local_error and not self._recv_chan._closed # type: ignore ): - # wait for a final context result by collecting (but - # basically ignoring) any bi-dir-stream msgs still in transit - # from the far end. - drained_msgs: list[dict] = await _drain_to_final_msg(ctx=self) - log.runtime( + # wait for a final context result/error by "draining" + # (by more or less ignoring) any bi-dir-stream "yield" + # msgs still in transit from the far end. + drained_msgs: list[dict] = await _drain_to_final_msg( + ctx=self, + hide_tb=hide_tb, + ) + for msg in drained_msgs: + + # TODO: mask this by default.. + if 'return' in msg: + # from .devx import pause + # await pause() + raise InternalError( + 'Final `return` msg should never be drained !?!?\n\n' + f'{msg}\n' + ) + + log.cancel( 'Ctx drained pre-result msgs:\n' f'{drained_msgs}' ) - # TODO: implement via helper func ^^^^ - # pre_result_drained: list[dict] = [] - # while not self._remote_error: - # try: - # # NOTE: this REPL usage actually works here dawg! Bo - # # from .devx._debug import pause - # # await pause() - # # if re := self._remote_error: - # # self._maybe_raise_remote_err( - # # re, - # # # NOTE: obvi we don't care if we - # # # overran the far end if we're already - # # # waiting on a final result (msg). - # # raise_overrun_from_self=raise_overrun, - # # ) - - # # TODO: bad idea? - # # with trio.CancelScope() as res_cs: - # # self._res_scope = res_cs - # # msg: dict = await self._recv_chan.receive() - # # if res_cs.cancelled_caught: - - # # from .devx._debug import pause - # # await pause() - # msg: dict = await self._recv_chan.receive() - # self._result: Any = msg['return'] - # log.runtime( - # 'Context delivered final result msg:\n' - # f'{pformat(msg)}' - # ) - # # NOTE: we don't need to do this right? - # # XXX: only close the rx mem chan AFTER - # # a final result is retreived. - # # if self._recv_chan: - # # await self._recv_chan.aclose() - # break - - # # NOTE: we get here if the far end was - # # `ContextCancelled` in 2 cases: - # # 1. we requested the cancellation and thus - # # SHOULD NOT raise that far end error, - # # 2. WE DID NOT REQUEST that cancel and thus - # # SHOULD RAISE HERE! - # except trio.Cancelled: - - # # CASE 2: mask the local cancelled-error(s) - # # only when we are sure the remote error is - # # the source cause of this local task's - # # cancellation. - # if re := self._remote_error: - # self._maybe_raise_remote_err(re) - - # # CASE 1: we DID request the cancel we simply - # # continue to bubble up as normal. - # raise - - # except KeyError: - - # if 'yield' in msg: - # # far end task is still streaming to us so discard - # log.warning(f'Discarding std "yield"\n{msg}') - # pre_result_drained.append(msg) - # continue - - # # TODO: work out edge cases here where - # # a stream is open but the task also calls - # # this? - # # -[ ] should be a runtime error if a stream is open - # # right? - # elif 'stop' in msg: - # log.cancel( - # 'Remote stream terminated due to "stop" msg:\n' - # f'{msg}' - # ) - # pre_result_drained.append(msg) - # continue - - # # internal error should never get here - # assert msg.get('cid'), ( - # "Received internal error at portal?" - # ) - - # # XXX fallthrough to handle expected error XXX - # re: Exception|None = self._remote_error - # if re: - # log.critical( - # 'Remote ctx terminated due to "error" msg:\n' - # f'{re}' - # ) - # assert msg is self._cancel_msg - # # NOTE: this solved a super dupe edge case XD - # # this was THE super duper edge case of: - # # - local task opens a remote task, - # # - requests remote cancellation of far end - # # ctx/tasks, - # # - needs to wait for the cancel ack msg - # # (ctxc) or some result in the race case - # # where the other side's task returns - # # before the cancel request msg is ever - # # rxed and processed, - # # - here this surrounding drain loop (which - # # iterates all ipc msgs until the ack or - # # an early result arrives) was NOT exiting - # # since we are the edge case: local task - # # does not re-raise any ctxc it receives - # # IFF **it** was the cancellation - # # requester.. - # # will raise if necessary, ow break from - # # loop presuming any error terminates the - # # context! - # self._maybe_raise_remote_err( - # re, - # # NOTE: obvi we don't care if we - # # overran the far end if we're already - # # waiting on a final result (msg). - # # raise_overrun_from_self=False, - # raise_overrun_from_self=raise_overrun, - # ) - - # break # OOOOOF, yeah obvi we need this.. - - # # XXX we should never really get here - # # right! since `._deliver_msg()` should - # # always have detected an {'error': ..} - # # msg and already called this right!?! - # elif error := unpack_error( - # msg=msg, - # chan=self._portal.channel, - # hide_tb=False, - # ): - # log.critical('SHOULD NEVER GET HERE!?') - # assert msg is self._cancel_msg - # assert error.msgdata == self._remote_error.msgdata - # from .devx._debug import pause - # await pause() - # self._maybe_cancel_and_set_remote_error(error) - # self._maybe_raise_remote_err(error) - - # else: - # # bubble the original src key error - # raise - if ( (re := self._remote_error) - and self._result == res_placeholder + # and self._result == res_placeholder ): - maybe_err: Exception|None = self._maybe_raise_remote_err( + self._maybe_raise_remote_err( re, # NOTE: obvi we don't care if we # overran the far end if we're already @@ -1152,10 +1326,126 @@ class Context: (not self._cancel_called) ), ) - if maybe_err: - self._result = maybe_err + # if maybe_err: + # self._result = maybe_err - return self._result + return self.outcome + # None if self._result == res_placeholder + # else self._result + # ) + + # TODO: switch this with above which should be named + # `.wait_for_outcome()` and instead do + # a `.outcome.Outcome.unwrap()` ? + # @property + # def result(self) -> Any|None: + # if self._final_result_is_set(): + # return self._result + + # raise RuntimeError('No result is available!') + + @property + def maybe_error(self) -> BaseException|None: + le: Exception|None = self._local_error + re: RemoteActorError|ContextCancelled|None = self._remote_error + + match (le, re): + # NOTE: remote errors always get precedence since even + # in the cases where a local error was the cause, the + # received boxed ctxc should include the src info + # caused by us right? + case ( + _, + RemoteActorError(), + ): + # give precedence to remote error if it's + # NOT a cancel ack (ctxc). + return ( + re or le + ) + + # TODO: extra logic to handle ctxc ack case(s)? + # -[ ] eg. we error, call .cancel(), rx ack but should + # raise the _local_error instead? + # -[ ] are there special error conditions where local vs. + # remote should take precedence? + # case ( + # _, + # ContextCancelled(canceller=), + # ): + + error: Exception|None = le or re + if error: + return error + + assert not self._cancel_msg + return None + + def _final_result_is_set(self) -> bool: + # return not (self._result == id(self)) + return self._result is not Unresolved + + # def get_result_nowait(self) -> Any|None: + # TODO: use `outcome.Outcome` here instead? + @property + def outcome(self) -> ( + Any| + RemoteActorError| + ContextCancelled + ): + ''' + The final "outcome" from an IPC context which can either be + some Value returned from the target `@context`-decorated + remote task-as-func, or an `Error` wrapping an exception + raised from an RPC task fault or cancellation. + + Note that if the remote task has not terminated then this + field always resolves to the module defined `Unresolved` handle. + + TODO: implement this using `outcome.Outcome` types? + + ''' + return self.maybe_error or self._result + + # @property + def repr_outcome( + self, + show_error_fields: bool = False, + + ) -> str: + ''' + Deliver a (simplified) `str` representation (as in + `.__repr__()`) of the final `.outcome` + + ''' + merr: Exception|None = self.maybe_error + if merr: + # if the error-type is one of ours and has the custom + # defined "repr-(in)-one-line" method call it, ow + # just deliver the type name. + if ( + (reprol := getattr(merr, 'reprol', False)) + and show_error_fields + ): + return reprol() + + elif isinstance(merr, BaseExceptionGroup): + # TODO: maybe for multis we should just show + # a one-line count per error type, like with + # `collections.Counter`? + # + # just the type name for now to avoid long lines + # when tons of cancels.. + return type(merr).__name__ + + # just the type name + # else: # but wen? + # return type(merr).__name__ + + # for all other errors show their regular output + return str(merr) + + return str(self._result) async def started( self, @@ -1261,8 +1551,14 @@ class Context: msg, self.chan, ): - log.error( - f'Delivering error-msg to caller\n' + if not isinstance(re, ContextCancelled): + log_meth = log.error + else: + log_meth = log.runtime + + log_meth( + f'Delivering error-msg to caller\n\n' + f'<= peer: {from_uid}\n' f' |_ {nsf}()\n\n' @@ -1276,7 +1572,7 @@ class Context: # NOTE: this will not raise an error, merely set # `._remote_error` and maybe cancel any task currently # entered in `Portal.open_context()` presuming the - # error is "cancel causing" (i.e. `ContextCancelled` + # error is "cancel causing" (i.e. a `ContextCancelled` # or `RemoteActorError`). self._maybe_cancel_and_set_remote_error(re) @@ -1288,6 +1584,10 @@ class Context: # return True # # XXX ALSO NO!! XXX + # => NEVER raise remote errors from the calling + # runtime task, they should always be raised by + # consumer side tasks operating on the + # `Portal`/`Context` APIs. # if self._remote_error: # self._maybe_raise_remote_err(error) @@ -1471,7 +1771,9 @@ def mk_context( _task=trio.lowlevel.current_task(), **kwargs, ) - ctx._result: int | Any = id(ctx) + # TODO: we can drop the old placeholder yah? + # ctx._result: int | Any = id(ctx) + ctx._result = Unresolved return ctx