forked from goodboy/tractor
				
commit 63b1488ab6 (parent 7eb31f3fea, branch: multihomed)

Get mega-pedantic in `Portal.open_context()`

Specifically in the `.__aexit__()` phase to ensure remote,
runtime-internal, and locally raised error-during-cancelled-handling
exceptions are NEVER masked by a local `ContextCancelled` or any
exception group of `trio.Cancelled`s.

Also adds a ton of details to doc strings including extreme detail
surrounding the `ContextCancelled` raising cases and their processing
inside `.open_context()`'s exception handler blocks.

Details, details:

- internal rename of `err`/`_err` stuff to just be `scope_err` since
  it's effectively the error bubbled up from the context's surrounding
  (and cross-actor) "scope".

- always shield `._recv_chan.aclose()` to avoid any `Cancelled` from
  masking the `scope_err` with a runtime related `trio.Cancelled`.

- explicitly catch the specific set of `scope_err: BaseException` that
  we can reasonably expect to handle instead of the catch-all parent
  type including exception groups, cancels and KBIs.
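To make the masking concern concrete, here is a minimal standalone `trio` sketch (illustrative only, not code from this commit): with a cancellation already pending, an unshielded cleanup `await` raises `trio.Cancelled` and clobbers the original in-flight error, whereas running the cleanup inside a shielded cancel scope lets the original error propagate.

# Illustrative-only sketch of the error-masking problem (not tractor code).
import trio


async def worker(recv_chan: trio.abc.ReceiveChannel) -> None:
    try:
        raise RuntimeError('the "real" scope error we care about')
    finally:
        # A cancellation is already pending here (the nursery below was
        # cancelled), so WITHOUT the shield the checkpoint inside
        # `.aclose()` would raise `trio.Cancelled` and mask the
        # `RuntimeError` above.
        with trio.CancelScope(shield=True):
            await recv_chan.aclose()


async def main() -> None:
    send_chan, recv_chan = trio.open_memory_channel(0)
    try:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(worker, recv_chan)
            # self-cancel the whole nursery before the worker hits a
            # checkpoint; its `finally:` block then runs under a
            # pending cancellation.
            nursery.cancel_scope.cancel()
    except BaseException as err:
        # the `RuntimeError` (possibly wrapped in an exception group)
        # surfaces; it is NOT masked by a `trio.Cancelled`.
        print(f'surfaced: {err!r}')


if __name__ == '__main__':
    trio.run(main)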
@@ -15,8 +15,12 @@
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
 '''
-Memory boundary "Portals": an API for structured
-concurrency linked tasks running in disparate memory domains.
+Memory "portal" construct.
+
+"Memory portals" are both an API and set of IPC wrapping primitives
+for managing structured concurrency "cancel-scope linked" tasks
+running in disparate virtual memory domains - at least in different
+OS processes, possibly on different (hardware) hosts.
 
 '''
 from __future__ import annotations
@@ -47,6 +51,7 @@ from ._exceptions import (
 )
 from ._context import Context
 from ._streaming import MsgStream
+from .devx._debug import maybe_wait_for_debugger
 
 
 log = get_logger(__name__)
@@ -66,20 +71,21 @@ def _unwrap_msg(
         raise unpack_error(msg, channel) from None
 
 
+# TODO: maybe move this to ._exceptions?
 class MessagingError(Exception):
     'Some kind of unexpected SC messaging dialog issue'
 
 
 class Portal:
     '''
-    A 'portal' to a(n) (remote) ``Actor``.
+    A 'portal' to a memory-domain-separated `Actor`.
 
     A portal is "opened" (and eventually closed) by one side of an
     inter-actor communication context. The side which opens the portal
     is equivalent to a "caller" in function parlance and usually is
     either the called actor's parent (in process tree hierarchy terms)
     or a client interested in scheduling work to be done remotely in a
-    far process.
+    process which has a separate (virtual) memory domain.
 
     The portal api allows the "caller" actor to invoke remote routines
     and receive results through an underlying ``tractor.Channel`` as
@@ -89,9 +95,9 @@ class Portal:
     like having a "portal" between the seperate actor memory spaces.
 
     '''
-    # the timeout for a remote cancel request sent to
-    # a(n) (peer) actor.
-    cancel_timeout = 0.5
+    # global timeout for remote cancel requests sent to
+    # connected (peer) actors.
+    cancel_timeout: float = 0.5
 
     def __init__(self, channel: Channel) -> None:
        self.channel = channel
@@ -393,12 +399,32 @@ class Portal:
 
     ) -> AsyncGenerator[tuple[Context, Any], None]:
         '''
-        Open an inter-actor task context.
+        Open an inter-actor "task context"; a remote task is
+        scheduled and cancel-scope-state-linked to a `trio.run()` across
+        memory boundaries in another actor's runtime.
 
-        This is a synchronous API which allows for deterministic
-        setup/teardown of a remote task. The yielded ``Context`` further
-        allows for opening bidirectional streams, explicit cancellation
-        and synchronized final result collection. See ``tractor.Context``.
+        This is an `@acm` API which allows for deterministic setup
+        and teardown of a remotely scheduled task in another remote
+        actor. Once opened, the 2 now "linked" tasks run completely
+        in parallel in each actor's runtime with their enclosing
+        `trio.CancelScope`s kept in a synced state wherein if
+        either side errors or cancels an equivalent error is
+        relayed to the other side via an SC-compat IPC protocol.
+
+        The yielded `tuple` is a pair delivering a `tractor.Context`
+        and any first value "sent" by the "callee" task via a call
+        to `Context.started(<value: Any>)`; this side of the
+        context does not unblock until the "callee" task calls
+        `.started()` in similar style to `trio.Nursery.start()`.
+        When the "callee" (side that is "called"/started by a call
+        to *this* method) returns, the caller side (this) unblocks
+        and any final value delivered from the other end can be
+        retrieved using the `Context.result()` api.
+
+        The yielded ``Context`` instance further allows for opening
+        bidirectional streams, explicit cancellation and
+        structured-concurrency-synchronized final result-msg
+        collection. See ``tractor.Context`` for more details.
 
         '''
         # conduct target func method structural checks
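As a rough end-to-end usage sketch of the API described in this docstring (the `greeter` function, the actor name and the nursery setup are illustrative assumptions, not part of this diff):

# Illustrative usage sketch of `Portal.open_context()` (names assumed).
import tractor
import trio


@tractor.context
async def greeter(
    ctx: tractor.Context,
    name: str,
) -> str:
    # unblocks the caller's `open_context()` enter, delivering `first`
    await ctx.started(f'hi {name}')
    # the return value is collected by the caller via `ctx.result()`
    return f'bye {name}'


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'greeter',
            enable_modules=[__name__],
        )
        async with portal.open_context(
            greeter,
            name='bob',
        ) as (ctx, first):
            assert first == 'hi bob'

            # blocks until the remote task returns its final value
            assert await ctx.result() == 'bye bob'

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)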
				
			
@@ -431,47 +457,52 @@ class Portal:
         )
 
         assert ctx._remote_func_type == 'context'
-        msg = await ctx._recv_chan.receive()
+        msg: dict = await ctx._recv_chan.receive()
 
         try:
             # the "first" value here is delivered by the callee's
             # ``Context.started()`` call.
             first = msg['started']
-            ctx._started_called = True
+            ctx._started_called: bool = True
 
         except KeyError:
-            assert msg.get('cid'), ("Received internal error at context?")
+            if not (cid := msg.get('cid')):
+                raise MessagingError(
+                    'Received internal error at context?\n'
+                    'No call-id (cid) in startup msg?'
+                )
+
             if msg.get('error'):
                 # raise kerr from unpack_error(msg, self.channel)
                 # NOTE: mask the key error with the remote one
                 raise unpack_error(msg, self.channel) from None
             else:
                 raise MessagingError(
-                    f'Context for {ctx.cid} was expecting a `started` message'
-                    f' but received a non-error msg:\n{pformat(msg)}'
+                    f'Context for {cid} was expecting a `started` message'
+                    ' but received a non-error msg:\n'
+                    f'{pformat(msg)}'
                 )
 
-        _err: BaseException | None = None
         ctx._portal: Portal = self
 
         uid: tuple = self.channel.uid
         cid: str = ctx.cid
-        etype: Type[BaseException] | None = None
 
-        # deliver context instance and .started() msg value in enter
-        # tuple.
+        # placeholder for any exception raised in the runtime
+        # or by user tasks which cause this context's closure.
+        scope_err: BaseException | None = None
         try:
             async with trio.open_nursery() as nurse:
-                ctx._scope_nursery = nurse
-                ctx._scope = nurse.cancel_scope
+                ctx._scope_nursery: trio.Nursery = nurse
+                ctx._scope: trio.CancelScope = nurse.cancel_scope
+
+                # deliver context instance and .started() msg value
+                # in enter tuple.
                 yield ctx, first
 
-                # when in allow_ovveruns mode there may be lingering
-                # overflow sender tasks remaining?
+                # when in allow_overruns mode there may be
+                # lingering overflow sender tasks remaining?
                 if nurse.child_tasks:
-                    # ensure we are in overrun state with
-                    # ``._allow_overruns=True`` bc otherwise
+                    # XXX: ensure we are in overrun state
+                    # with ``._allow_overruns=True`` bc otherwise
                     # there should be no tasks in this nursery!
                     if (
                         not ctx._allow_overruns
@@ -479,47 +510,69 @@ class Portal:
                     ):
                         raise RuntimeError(
                             'Context has sub-tasks but is '
-                            'not in `allow_overruns=True` Mode!?'
+                            'not in `allow_overruns=True` mode!?'
                         )
 
                     # ensure cancel of all overflow sender tasks
                     # started in the ctx nursery.
                     ctx._scope.cancel()
 
-        except ContextCancelled as err:
-            _err = err
+        # XXX: (maybe) shield/mask context-cancellations that were
+        # initiated by any of the context's 2 tasks. There are
+        # subsequently 2 operating cases for a "graceful cancel"
+        # of a `Context`:
+        #
+        # 1.*this* side's task called `Context.cancel()`, in
+        #   which case we mask the `ContextCancelled` from bubbling
+        #   to the opener (much like how `trio.Nursery` swallows
+        #   any `trio.Cancelled` bubbled by a call to
+        #   `Nursery.cancel_scope.cancel()`)
+        #
+        # 2.*the other* side's (callee/spawned) task cancelled due
+        #   to a self or peer cancellation request in which case we
+        #   DO let the error bubble to the opener.
+        except ContextCancelled as ctxc:
+            scope_err = ctxc
+
-            # swallow and mask cross-actor task context cancels that
-            # were initiated by *this* side's task.
+            # CASE 1: this context was never cancelled
+            # via a local task's call to `Context.cancel()`.
             if not ctx._cancel_called:
                 # XXX: this should NEVER happen!
                 # from .devx._debug import breakpoint
                 # await breakpoint()
                 raise
 
-            # if the context was cancelled by client code
-            # then we don't need to raise since user code
-            # is expecting this and the block should exit.
+            # CASE 2: context was cancelled by local task calling
+            # `.cancel()`, we don't raise and the exit block should
+            # exit silently.
             else:
-                log.debug(f'Context {ctx} cancelled gracefully')
+                log.debug(
+                    f'Context {ctx} cancelled gracefully with:\n'
+                    f'{ctxc}'
+                )
 
         except (
-            BaseException,
+            # - a standard error in the caller/yieldee
+            Exception,
+
-            # more specifically, we need to handle these but not
-            # sure it's worth being pedantic:
-            # Exception,
-            # trio.Cancelled,
-            # KeyboardInterrupt,
+            # - a runtime teardown exception-group and/or
+            #   cancellation request from a caller task.
+            BaseExceptionGroup,
+            trio.Cancelled,
+            KeyboardInterrupt,
+
         ) as err:
-            etype = type(err)
+            scope_err = err
 
-            # cancel ourselves on any error.
+            # XXX: request cancel of this context on any error.
+            # NOTE: `Context.cancel()` is conversely NOT called in
+            # the `ContextCancelled` "cancellation requested" case
+            # above.
             log.cancel(
-                'Context cancelled for task, sending cancel request..\n'
+                'Context cancelled for task due to\n'
+                f'{err}\n'
+                'Sending cancel request..\n'
                 f'task:{cid}\n'
                 f'actor:{uid}'
             )
             try:
 
                 await ctx.cancel()
             except trio.BrokenResourceError:
                 log.warning(
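The `trio.Nursery` analogy referenced in the comments above can be shown with a tiny standalone sketch (illustrative only, not tractor code): a self-requested `cancel_scope.cancel()` is swallowed at nursery exit, just as a self-requested `ctx.cancel()` results in the `ContextCancelled` being swallowed on exit of `open_context()`.

# Illustrative trio-only sketch of the "self-cancel is swallowed" analogy.
import trio


async def main() -> None:
    async with trio.open_nursery() as nursery:
        nursery.start_soon(trio.sleep_forever)

        # self-requested cancel: the resulting `trio.Cancelled` is
        # absorbed by the nursery's own cancel scope on exit, so
        # nothing propagates past this block - analogous to
        # `open_context()` masking a `ContextCancelled` after a local
        # `ctx.cancel()` call (CASE 2 above).
        nursery.cancel_scope.cancel()

    print('nursery exited cleanly after self-cancel')


if __name__ == '__main__':
    trio.run(main)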
				
			
@@ -528,8 +581,9 @@ class Portal:
                     f'actor:{uid}'
                 )
 
-            raise
+            raise  # duh
 
+        # no scope error case
         else:
             if ctx.chan.connected():
                 log.info(
@@ -537,10 +591,20 @@ class Portal:
                     f'task: {cid}\n'
                     f'actor: {uid}'
                 )
+                # XXX NOTE XXX: the below call to
+                # `Context.result()` will ALWAYS raise
+                # a `ContextCancelled` (via an embedded call to
+                # `Context._maybe_raise_remote_err()`) IFF
+                # a `Context._remote_error` was set by the runtime
+                # via a call to
+                # `Context._maybe_cancel_and_set_remote_error()`
+                # which IS SET any time the far end fails and
+                # causes "caller side" cancellation via
+                # a `ContextCancelled` here.
                 result = await ctx.result()
                 log.runtime(
-                    f'Context {fn_name} returned '
-                    f'value from callee `{result}`'
+                    f'Context {fn_name} returned value from callee:\n'
+                    f'`{result}`'
                 )
 
         finally:
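A hedged caller-side sketch of what the note above implies in practice (illustrative names; assumes a callee crash is relayed to the opener as a `tractor.RemoteActorError`, of which `ContextCancelled` is a subtype):

# Illustrative-only sketch (not code from this diff): a crash in the
# callee task re-surfaces on the opener side around result collection,
# assumed to arrive boxed as a `tractor.RemoteActorError`.
import tractor
import trio


@tractor.context
async def faulty(ctx: tractor.Context) -> None:
    await ctx.started()
    raise ValueError('boom from the callee side')


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'crasher',
            enable_modules=[__name__],
        )
        try:
            async with portal.open_context(faulty) as (ctx, _first):
                await ctx.result()
        except tractor.RemoteActorError as rae:
            print(f'remote task errored: {rae}')

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)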
				
			
@@ -548,22 +612,73 @@ class Portal:
             # operating *in* this scope to have survived
             # we tear down the runtime feeder chan last
             # to avoid premature stream clobbers.
-            if ctx._recv_chan is not None:
-                # should we encapsulate this in the context api?
+            rxchan: trio.ReceiveChannel = ctx._recv_chan
+            if (
+                 rxchan
+
+                # maybe TODO: yes i know the below check is
+                # touching `trio` memchan internals..BUT, there are
+                # only a couple ways to avoid a `trio.Cancelled`
+                # bubbling from the `.aclose()` call below:
+                #
+                # - catch and mask it via the cancel-scope-shielded call
+                #   as we are rn (manual and frowned upon) OR,
+                # - specially handle the case where `scope_err` is
+                #   one of {`BaseExceptionGroup`, `trio.Cancelled`}
+                #   and then presume that the `.aclose()` call will
+                #   raise a `trio.Cancelled` and just don't call it
+                #   in those cases..
+                #
+                # that latter approach is more logic, LOC, and more
+                # convoluted so for now stick with the first
+                # pseudo-hack-workaround where we just try to avoid
+                # the shielded call as much as we can detect from
+                # the memchan's `._closed` state..
+                #
+                # XXX MOTIVATION XXX-> we generally want to raise
+                # any underlying actor-runtime/internals error that
+                # surfaces from a bug in tractor itself so it can
+                # be easily detected/fixed AND, we also want to
+                # minimize noisy runtime tracebacks (normally due
+                # to the cross-actor linked task scope machinery
+                # teardown) displayed to user-code and instead only
+                # displaying `ContextCancelled` traces where the
+                # cause of crash/exit IS due to something in
+                # user/app code on either end of the context.
+                and not rxchan._closed
+            ):
+                # XXX NOTE XXX: and again as per above, we mask any
+                # `trio.Cancelled` raised here so as to NOT mask
+                # out any exception group or legit (remote) ctx
+                # error that sourced from the remote task or its
+                # runtime.
+                with trio.CancelScope(shield=True):
+                    await ctx._recv_chan.aclose()
 
-            if etype:
+            # XXX: since we always (maybe) re-raise (and thus also
+            # mask runtime machinery related
+            # multi-`trio.Cancelled`s) any scope error which was
+            # the underlying cause of this context's exit, add
+            # different log msgs for each of the (2) cases.
+            if scope_err is not None:
+                etype: Type[BaseException] = type(scope_err)
+
+                # CASE 2
                 if ctx._cancel_called:
                     log.cancel(
-                        f'Context {fn_name} cancelled by caller with\n{etype}'
+                        f'Context {fn_name} cancelled by caller with\n'
+                        f'{etype}'
                     )
-                elif _err is not None:
+
+                # CASE 1
+                else:
                     log.cancel(
-                        f'Context for task cancelled by callee with {etype}\n'
+                        f'Context cancelled by callee with {etype}\n'
                         f'target: `{fn_name}`\n'
                         f'task:{cid}\n'
                         f'actor:{uid}'
                     )
 
             # XXX: (MEGA IMPORTANT) if this is a root opened process we
             # wait for any immediate child in debug before popping the
             # context from the runtime msg loop otherwise inside
@@ -572,10 +687,10 @@ class Portal:
             # a "stop" msg for a stream), this can result in a deadlock
             # where the root is waiting on the lock to clear but the
             # child has already cleared it and clobbered IPC.
-            from .devx._debug import maybe_wait_for_debugger
             await maybe_wait_for_debugger()
 
-            # remove the context from runtime tracking
+            # FINALLY, remove the context from runtime tracking and
+            # exit Bo
             self.actor._contexts.pop(
                 (self.channel.uid, ctx.cid),
                 None,