diff --git a/tractor/_portal.py b/tractor/_portal.py index a4f2f61..04f613e 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -27,8 +27,9 @@ from __future__ import annotations import importlib import inspect from typing import ( - Any, Optional, - Callable, AsyncGenerator, + Any, + Callable, + AsyncGenerator, Type, ) from functools import partial @@ -52,6 +53,7 @@ from ._ipc import Channel from .log import get_logger from .msg import NamespacePath from ._exceptions import ( + InternalError, _raise_from_no_key_in_msg, unpack_error, NoResult, @@ -126,7 +128,7 @@ class Portal: def __init__(self, channel: Channel) -> None: self.chan = channel # during the portal's lifetime - self._result_msg: Optional[dict] = None + self._result_msg: dict|None = None # When set to a ``Context`` (when _submit_for_result is called) # it is expected that ``result()`` will be awaited at some @@ -171,7 +173,7 @@ class Portal: ) -> dict[str, Any]: assert ctx._remote_func_type == 'asyncfunc' # single response - msg = await ctx._recv_chan.receive() + msg: dict = await ctx._recv_chan.receive() return msg async def result(self) -> Any: @@ -255,11 +257,11 @@ class Portal: return False reminfo: str = ( - f'uid: {self.channel.uid}\n' - f' |_{chan}\n' + f'{self.channel.uid}\n' + f' |_{chan}\n' ) log.cancel( - f'Sending actor cancel request to peer\n' + f'Sending runtime `.cancel()` request to peer\n\n' f'{reminfo}' ) @@ -281,7 +283,9 @@ class Portal: return True if cs.cancelled_caught: - log.cancel( + # may timeout and we never get an ack (obvi racy) + # but that doesn't mean it wasn't cancelled. + log.debug( 'May have failed to cancel peer?\n' f'{reminfo}' ) @@ -293,9 +297,11 @@ class Portal: trio.ClosedResourceError, trio.BrokenResourceError, ): - log.cancel( - f"{self.channel} for {self.channel.uid} was already " - "closed or broken?") + log.debug( + 'IPC chan for actor already closed or broken?\n\n' + f'{self.channel.uid}\n' + f' |_{self.channel}\n' + ) return False async def run_from_ns( @@ -416,7 +422,8 @@ class Portal: try: # deliver receive only stream async with MsgStream( - ctx, ctx._recv_chan, + ctx=ctx, + rx_chan=ctx._recv_chan, ) as rchan: self._streams.add(rchan) yield rchan @@ -443,6 +450,11 @@ class Portal: # await recv_chan.aclose() self._streams.remove(rchan) + # TODO: move this impl to `._context` mod and + # instead just bind it here as a method so that the logic + # for ctx stuff stays all in one place (instead of frickin + # having to open this file in tandem every gd time!!! XD) + # @asynccontextmanager async def open_context( @@ -451,6 +463,11 @@ class Portal: allow_overruns: bool = False, + # TODO: if we set this the wrapping `@acm` body will + # still be shown (awkwardly) on pdb REPL entry. Ideally + # we can similarly annotate that frame to NOT show? + hide_tb: bool = False, + # proxied to RPC **kwargs, @@ -484,6 +501,8 @@ class Portal: collection. See ``tractor.Context`` for more details. ''' + __tracebackhide__: bool = hide_tb + # conduct target func method structural checks if not inspect.iscoroutinefunction(func) and ( getattr(func, '_tractor_contex_function', False) @@ -536,9 +555,12 @@ class Portal: # placeholder for any exception raised in the runtime # or by user tasks which cause this context's closure. - scope_err: BaseException | None = None + scope_err: BaseException|None = None + ctxc_from_callee: ContextCancelled|None = None try: async with trio.open_nursery() as nurse: + + # NOTE: used to start overrun queuing tasks ctx._scope_nursery: trio.Nursery = nurse ctx._scope: trio.CancelScope = nurse.cancel_scope @@ -546,14 +568,26 @@ class Portal: # in enter tuple. yield ctx, first - # between the caller exiting and arriving here the - # far end may have sent a ctxc-msg or other error, - # so check for it here immediately and maybe raise - # so as to engage the ctxc handling block below! + # ??TODO??: do we still want to consider this or is + # the `else:` block handling via a `.result()` + # call below enough?? + # -[ ] pretty sure `.result()` internals do the + # same as our ctxc handler below so it ended up + # being same (repeated?) behaviour, but ideally we + # wouldn't have that duplication either by somehow + # factoring the `.result()` handler impl in a way + # that we can re-use it around the `yield` ^ here + # or vice versa? + # + # NOTE: between the caller exiting and arriving + # here the far end may have sent a ctxc-msg or + # other error, so check for it here immediately + # and maybe raise so as to engage the ctxc + # handling block below! + # # if re := ctx._remote_error: # maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err( # re, - # # TODO: do we want this to always raise? # # - means that on self-ctxc, if/when the # # block is exited before the msg arrives @@ -571,7 +605,7 @@ class Portal: # # block? # raise_ctxc_from_self_call=True, # ) - # assert maybe_ctxc + # ctxc_from_callee = maybe_ctxc # when in allow_overruns mode there may be # lingering overflow sender tasks remaining? @@ -583,13 +617,18 @@ class Portal: not ctx._allow_overruns or len(nurse.child_tasks) > 1 ): - raise RuntimeError( + raise InternalError( 'Context has sub-tasks but is ' 'not in `allow_overruns=True` mode!?' ) - # ensure cancel of all overflow sender tasks - # started in the ctx nursery. + # ensure we cancel all overflow sender + # tasks started in the nursery when + # `._allow_overruns == True`. + # + # NOTE: this means `._scope.cancelled_caught` + # will prolly be set! not sure if that's + # non-ideal or not ??? ctx._scope.cancel() # XXX NOTE XXX: maybe shield against @@ -602,14 +641,15 @@ class Portal: # of a `Context`. In both cases any `ContextCancelled` # raised in this scope-block came from a transport msg # relayed from some remote-actor-task which our runtime set - # as a `Context._remote_error` + # as to `Context._remote_error` # # the CASES: # # - if that context IS THE SAME ONE that called # `Context.cancel()`, we want to absorb the error # silently and let this `.open_context()` block to exit - # without raising. + # without raising, ideally eventually receiving the ctxc + # ack msg thus resulting in `ctx.cancel_acked == True`. # # - if it is from some OTHER context (we did NOT call # `.cancel()`), we want to re-RAISE IT whilst also @@ -633,6 +673,7 @@ class Portal: # `Nursery.cancel_scope.cancel()`) except ContextCancelled as ctxc: scope_err = ctxc + ctxc_from_callee = ctxc # XXX TODO XXX: FIX THIS debug_mode BUGGGG!!! # using this code and then resuming the REPL will @@ -642,6 +683,7 @@ class Portal: # documenting it as a definittive example of # debugging the tractor-runtime itself using it's # own `.devx.` tooling! + # # await pause() # CASE 2: context was cancelled by local task calling @@ -649,15 +691,10 @@ class Portal: # exit silently. if ( ctx._cancel_called - and ( - ctxc is ctx._remote_error - # ctxc.msgdata == ctx._remote_error.msgdata - - # TODO: uhh `Portal.canceller` ain't a thangg - # dawg? (was `self.canceller` before?!?) - and - ctxc.canceller == self.actor.uid - ) + and + ctxc is ctx._remote_error + and + ctxc.canceller == self.actor.uid ): log.cancel( f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n' @@ -665,9 +702,9 @@ class Portal: ) # CASE 1: this context was never cancelled via a local # task (tree) having called `Context.cancel()`, raise - # the error since it was caused by someone else! + # the error since it was caused by someone else + # -> probably a remote peer! else: - # await pause() raise # the above `._scope` can be cancelled due to: @@ -680,19 +717,29 @@ class Portal: # CASE 3: standard local error in this caller/yieldee Exception, - # CASES 1 & 2: normally manifested as - # a `Context._scope_nursery` raised + # CASES 1 & 2: can manifest as a `ctx._scope_nursery` # exception-group of, + # # 1.-`trio.Cancelled`s, since - # `._scope.cancel()` will have been called and any - # `ContextCancelled` absorbed and thus NOT RAISED in - # any `Context._maybe_raise_remote_err()`, + # `._scope.cancel()` will have been called + # (transitively by the runtime calling + # `._deliver_msg()`) and any `ContextCancelled` + # eventually absorbed and thus absorbed/supressed in + # any `Context._maybe_raise_remote_err()` call. + # # 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]` - # from any error raised in the "callee" side with - # a group only raised if there was any more then one - # task started here in the "caller" in the - # `yield`-ed to task. - BaseExceptionGroup, # since overrun handler tasks may have been spawned + # from any error delivered from the "callee" side + # AND a group-exc is only raised if there was > 1 + # tasks started *here* in the "caller" / opener + # block. If any one of those tasks calls + # `.result()` or `MsgStream.receive()` + # `._maybe_raise_remote_err()` will be transitively + # called and the remote error raised causing all + # tasks to be cancelled. + # NOTE: ^ this case always can happen if any + # overrun handler tasks were spawned! + BaseExceptionGroup, + trio.Cancelled, # NOTE: NOT from inside the ctx._scope KeyboardInterrupt, @@ -702,69 +749,48 @@ class Portal: # XXX: ALWAYS request the context to CANCEL ON any ERROR. # NOTE: `Context.cancel()` is conversely NEVER CALLED in # the `ContextCancelled` "self cancellation absorbed" case - # handled in the block above! + # handled in the block above ^^^ !! log.cancel( - 'Context cancelled for task due to\n' + 'Context terminated due to\n\n' f'{caller_err}\n' - 'Sending cancel request..\n' - f'task:{cid}\n' - f'actor:{uid}' ) if debug_mode(): - log.pdb( - 'Delaying `ctx.cancel()` until debug lock ' - 'acquired..' - ) # async with acquire_debug_lock(self.actor.uid): # pass # TODO: factor ^ into below for non-root cases? - await maybe_wait_for_debugger() - log.pdb( - 'Acquired debug lock! ' - 'Calling `ctx.cancel()`!' + was_acquired: bool = await maybe_wait_for_debugger( + header_msg=( + 'Delaying `ctx.cancel()` until debug lock ' + 'acquired..\n' + ), ) + if was_acquired: + log.pdb( + 'Acquired debug lock! ' + 'Calling `ctx.cancel()`!\n' + ) - try: - await ctx.cancel() - except trio.BrokenResourceError: - log.warning( - 'IPC connection for context is broken?\n' - f'task:{cid}\n' - f'actor:{uid}' - ) + + # we don't need to cancel the callee if it already + # told us it's cancelled ;p + if ctxc_from_callee is None: + try: + await ctx.cancel() + except ( + trio.BrokenResourceError, + trio.ClosedResourceError, + ): + log.warning( + 'IPC connection for context is broken?\n' + f'task:{cid}\n' + f'actor:{uid}' + ) raise # duh # no local scope error, the "clean exit with a result" case. else: - # between the caller exiting and arriving here the - # far end may have sent a ctxc-msg or other error, - # so check for it here immediately and maybe raise - # so as to engage the ctxc handling block below! - # if re := ctx._remote_error: - # maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err( - # re, - - # # TODO: do we want this to always raise? - # # - means that on self-ctxc, if/when the - # # block is exited before the msg arrives - # # but then the msg during __exit__ - # # calling we may not activate the - # # ctxc-handler block below? should we - # # be? - # # - if there's a remote error that arrives - # # after the child has exited, we won't - # # handle until the `finally:` block - # # where `.result()` is always called, - # # again in which case we handle it - # # differently then in the handler block - # # that would normally engage from THIS - # # block? - # raise_ctxc_from_self_call=True, - # ) - # assert maybe_ctxc - if ctx.chan.connected(): log.runtime( 'Waiting on final context result for\n' @@ -794,16 +820,18 @@ class Portal: scope_err = berr raise + # yes! this worx Bp + # from .devx import _debug + # await _debug.pause() + # an exception type boxed in a `RemoteActorError` - # is returned (meaning it was obvi not raised). + # is returned (meaning it was obvi not raised) + # that we want to log-report on. msgdata: str|None = getattr( result_or_err, 'msgdata', None ) - # yes! this worx Bp - # from .devx import _debug - # await _debug.pause() match (msgdata, result_or_err): case ( {'tb_str': tbstr}, @@ -846,9 +874,8 @@ class Portal: # operating *in* this scope to have survived # we tear down the runtime feeder chan last # to avoid premature stream clobbers. - rxchan: trio.ReceiveChannel = ctx._recv_chan if ( - rxchan + (rxchan := ctx._recv_chan) # maybe TODO: yes i know the below check is # touching `trio` memchan internals..BUT, there are @@ -904,22 +931,27 @@ class Portal: etype: Type[BaseException] = type(scope_err) # CASE 2 - if ctx._cancel_called: + if ( + ctx._cancel_called + and ctx.cancel_acked + ): log.cancel( 'Context cancelled by caller task\n' f'|_{ctx._task}\n\n' - f'{etype}' + f'{repr(scope_err)}\n' ) + # TODO: should we add a `._cancel_req_received` + # flag to determine if the callee manually called + # `ctx.cancel()`? + # -[ ] going to need a cid check no? + # CASE 1 else: log.cancel( - f'Context cancelled by remote callee task\n' - f'peer: {uid}\n' - f'|_ {nsf}()\n\n' - - f'{etype}\n' + f'Context terminated due to local scope error:\n' + f'{etype.__name__}\n' ) # FINALLY, remove the context from runtime tracking and @@ -967,7 +999,7 @@ class LocalPortal: async def open_portal( channel: Channel, - nursery: Optional[trio.Nursery] = None, + nursery: trio.Nursery|None = None, start_msg_loop: bool = True, shield: bool = False, @@ -992,7 +1024,7 @@ async def open_portal( if channel.uid is None: await actor._do_handshake(channel) - msg_loop_cs: Optional[trio.CancelScope] = None + msg_loop_cs: trio.CancelScope|None = None if start_msg_loop: from ._runtime import process_messages msg_loop_cs = await nursery.start(