diff --git a/tractor/_portal.py b/tractor/_portal.py index cf13d9b..d53fc6b 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -15,8 +15,12 @@ # along with this program. If not, see . ''' -Memory boundary "Portals": an API for structured -concurrency linked tasks running in disparate memory domains. +Memory "portal" contruct. + +"Memory portals" are both an API and set of IPC wrapping primitives +for managing structured concurrency "cancel-scope linked" tasks +running in disparate virtual memory domains - at least in different +OS processes, possibly on different (hardware) hosts. ''' from __future__ import annotations @@ -47,6 +51,7 @@ from ._exceptions import ( ) from ._context import Context from ._streaming import MsgStream +from .devx._debug import maybe_wait_for_debugger log = get_logger(__name__) @@ -66,20 +71,21 @@ def _unwrap_msg( raise unpack_error(msg, channel) from None +# TODO: maybe move this to ._exceptions? class MessagingError(Exception): 'Some kind of unexpected SC messaging dialog issue' class Portal: ''' - A 'portal' to a(n) (remote) ``Actor``. + A 'portal' to a memory-domain-separated `Actor`. A portal is "opened" (and eventually closed) by one side of an inter-actor communication context. The side which opens the portal is equivalent to a "caller" in function parlance and usually is either the called actor's parent (in process tree hierarchy terms) or a client interested in scheduling work to be done remotely in a - far process. + process which has a separate (virtual) memory domain. The portal api allows the "caller" actor to invoke remote routines and receive results through an underlying ``tractor.Channel`` as @@ -89,9 +95,9 @@ class Portal: like having a "portal" between the seperate actor memory spaces. ''' - # the timeout for a remote cancel request sent to - # a(n) (peer) actor. - cancel_timeout = 0.5 + # global timeout for remote cancel requests sent to + # connected (peer) actors. + cancel_timeout: float = 0.5 def __init__(self, channel: Channel) -> None: self.channel = channel @@ -393,12 +399,32 @@ class Portal: ) -> AsyncGenerator[tuple[Context, Any], None]: ''' - Open an inter-actor task context. + Open an inter-actor "task context"; a remote task is + scheduled and cancel-scope-state-linked to a `trio.run()` across + memory boundaries in another actor's runtime. - This is a synchronous API which allows for deterministic - setup/teardown of a remote task. The yielded ``Context`` further - allows for opening bidirectional streams, explicit cancellation - and synchronized final result collection. See ``tractor.Context``. + This is an `@acm` API which allows for deterministic setup + and teardown of a remotely scheduled task in another remote + actor. Once opened, the 2 now "linked" tasks run completely + in parallel in each actor's runtime with their enclosing + `trio.CancelScope`s kept in a synced state wherein if + either side errors or cancels an equivalent error is + relayed to the other side via an SC-compat IPC protocol. + + The yielded `tuple` is a pair delivering a `tractor.Context` + and any first value "sent" by the "callee" task via a call + to `Context.started()`; this side of the + context does not unblock until the "callee" task calls + `.started()` in similar style to `trio.Nursery.start()`. + When the "callee" (side that is "called"/started by a call + to *this* method) returns, the caller side (this) unblocks + and any final value delivered from the other end can be + retrieved using the `Contex.result()` api. + + The yielded ``Context`` instance further allows for opening + bidirectional streams, explicit cancellation and + structurred-concurrency-synchronized final result-msg + collection. See ``tractor.Context`` for more details. ''' # conduct target func method structural checks @@ -431,47 +457,52 @@ class Portal: ) assert ctx._remote_func_type == 'context' - msg = await ctx._recv_chan.receive() + msg: dict = await ctx._recv_chan.receive() try: # the "first" value here is delivered by the callee's # ``Context.started()`` call. first = msg['started'] - ctx._started_called = True + ctx._started_called: bool = True except KeyError: - assert msg.get('cid'), ("Received internal error at context?") + if not (cid := msg.get('cid')): + raise MessagingError( + 'Received internal error at context?\n' + 'No call-id (cid) in startup msg?' + ) if msg.get('error'): - # raise kerr from unpack_error(msg, self.channel) + # NOTE: mask the key error with the remote one raise unpack_error(msg, self.channel) from None else: raise MessagingError( - f'Context for {ctx.cid} was expecting a `started` message' - f' but received a non-error msg:\n{pformat(msg)}' + f'Context for {cid} was expecting a `started` message' + ' but received a non-error msg:\n' + f'{pformat(msg)}' ) - _err: BaseException | None = None ctx._portal: Portal = self - uid: tuple = self.channel.uid cid: str = ctx.cid - etype: Type[BaseException] | None = None - # deliver context instance and .started() msg value in enter - # tuple. + # placeholder for any exception raised in the runtime + # or by user tasks which cause this context's closure. + scope_err: BaseException | None = None try: async with trio.open_nursery() as nurse: - ctx._scope_nursery = nurse - ctx._scope = nurse.cancel_scope + ctx._scope_nursery: trio.Nursery = nurse + ctx._scope: trio.CancelScope = nurse.cancel_scope + # deliver context instance and .started() msg value + # in enter tuple. yield ctx, first - # when in allow_ovveruns mode there may be lingering - # overflow sender tasks remaining? + # when in allow_overruns mode there may be + # lingering overflow sender tasks remaining? if nurse.child_tasks: - # ensure we are in overrun state with - # ``._allow_overruns=True`` bc otherwise + # XXX: ensure we are in overrun state + # with ``._allow_overruns=True`` bc otherwise # there should be no tasks in this nursery! if ( not ctx._allow_overruns @@ -479,47 +510,69 @@ class Portal: ): raise RuntimeError( 'Context has sub-tasks but is ' - 'not in `allow_overruns=True` Mode!?' + 'not in `allow_overruns=True` mode!?' ) + + # ensure cancel of all overflow sender tasks + # started in the ctx nursery. ctx._scope.cancel() - except ContextCancelled as err: - _err = err + # XXX: (maybe) shield/mask context-cancellations that were + # initiated by any of the context's 2 tasks. There are + # subsequently 2 operating cases for a "graceful cancel" + # of a `Context`: + # + # 1.*this* side's task called `Context.cancel()`, in + # which case we mask the `ContextCancelled` from bubbling + # to the opener (much like how `trio.Nursery` swallows + # any `trio.Cancelled` bubbled by a call to + # `Nursery.cancel_scope.cancel()`) + # + # 2.*the other* side's (callee/spawned) task cancelled due + # to a self or peer cancellation request in which case we + # DO let the error bubble to the opener. + except ContextCancelled as ctxc: + scope_err = ctxc - # swallow and mask cross-actor task context cancels that - # were initiated by *this* side's task. + # CASE 1: this context was never cancelled + # via a local task's call to `Context.cancel()`. if not ctx._cancel_called: - # XXX: this should NEVER happen! - # from .devx._debug import breakpoint - # await breakpoint() raise - # if the context was cancelled by client code - # then we don't need to raise since user code - # is expecting this and the block should exit. + # CASE 2: context was cancelled by local task calling + # `.cancel()`, we don't raise and the exit block should + # exit silently. else: - log.debug(f'Context {ctx} cancelled gracefully') + log.debug( + f'Context {ctx} cancelled gracefully with:\n' + f'{ctxc}' + ) except ( - BaseException, + # - a standard error in the caller/yieldee + Exception, - # more specifically, we need to handle these but not - # sure it's worth being pedantic: - # Exception, - # trio.Cancelled, - # KeyboardInterrupt, + # - a runtime teardown exception-group and/or + # cancellation request from a caller task. + BaseExceptionGroup, + trio.Cancelled, + KeyboardInterrupt, ) as err: - etype = type(err) + scope_err = err - # cancel ourselves on any error. + # XXX: request cancel of this context on any error. + # NOTE: `Context.cancel()` is conversely NOT called in + # the `ContextCancelled` "cancellation requested" case + # above. log.cancel( - 'Context cancelled for task, sending cancel request..\n' + 'Context cancelled for task due to\n' + f'{err}\n' + 'Sending cancel request..\n' f'task:{cid}\n' f'actor:{uid}' ) try: - await ctx.cancel() except trio.BrokenResourceError: log.warning( @@ -528,8 +581,9 @@ class Portal: f'actor:{uid}' ) - raise + raise # duh + # no scope error case else: if ctx.chan.connected(): log.info( @@ -537,10 +591,20 @@ class Portal: f'task: {cid}\n' f'actor: {uid}' ) + # XXX NOTE XXX: the below call to + # `Context.result()` will ALWAYS raise + # a `ContextCancelled` (via an embedded call to + # `Context._maybe_raise_remote_err()`) IFF + # a `Context._remote_error` was set by the runtime + # via a call to + # `Context._maybe_cancel_and_set_remote_error()` + # which IS SET any time the far end fails and + # causes "caller side" cancellation via + # a `ContextCancelled` here. result = await ctx.result() log.runtime( - f'Context {fn_name} returned ' - f'value from callee `{result}`' + f'Context {fn_name} returned value from callee:\n' + f'`{result}`' ) finally: @@ -548,22 +612,73 @@ class Portal: # operating *in* this scope to have survived # we tear down the runtime feeder chan last # to avoid premature stream clobbers. - if ctx._recv_chan is not None: - # should we encapsulate this in the context api? - await ctx._recv_chan.aclose() + rxchan: trio.ReceiveChannel = ctx._recv_chan + if ( + rxchan - if etype: + # maybe TODO: yes i know the below check is + # touching `trio` memchan internals..BUT, there are + # only a couple ways to avoid a `trio.Cancelled` + # bubbling from the `.aclose()` call below: + # + # - catch and mask it via the cancel-scope-shielded call + # as we are rn (manual and frowned upon) OR, + # - specially handle the case where `scope_err` is + # one of {`BaseExceptionGroup`, `trio.Cancelled`} + # and then presume that the `.aclose()` call will + # raise a `trio.Cancelled` and just don't call it + # in those cases.. + # + # that latter approach is more logic, LOC, and more + # convoluted so for now stick with the first + # psuedo-hack-workaround where we just try to avoid + # the shielded call as much as we can detect from + # the memchan's `._closed` state.. + # + # XXX MOTIVATION XXX-> we generally want to raise + # any underlying actor-runtime/internals error that + # surfaces from a bug in tractor itself so it can + # be easily detected/fixed AND, we also want to + # minimize noisy runtime tracebacks (normally due + # to the cross-actor linked task scope machinery + # teardown) displayed to user-code and instead only + # displaying `ContextCancelled` traces where the + # cause of crash/exit IS due to something in + # user/app code on either end of the context. + and not rxchan._closed + ): + # XXX NOTE XXX: and again as per above, we mask any + # `trio.Cancelled` raised here so as to NOT mask + # out any exception group or legit (remote) ctx + # error that sourced from the remote task or its + # runtime. + with trio.CancelScope(shield=True): + await ctx._recv_chan.aclose() + + # XXX: since we always (maybe) re-raise (and thus also + # mask runtime machinery related + # multi-`trio.Cancelled`s) any scope error which was + # the underlying cause of this context's exit, add + # different log msgs for each of the (2) cases. + if scope_err is not None: + etype: Type[BaseException] = type(scope_err) + + # CASE 2 if ctx._cancel_called: log.cancel( - f'Context {fn_name} cancelled by caller with\n{etype}' + f'Context {fn_name} cancelled by caller with\n' + f'{etype}' ) - elif _err is not None: + + # CASE 1 + else: log.cancel( - f'Context for task cancelled by callee with {etype}\n' + f'Context cancelled by callee with {etype}\n' f'target: `{fn_name}`\n' f'task:{cid}\n' f'actor:{uid}' ) + # XXX: (MEGA IMPORTANT) if this is a root opened process we # wait for any immediate child in debug before popping the # context from the runtime msg loop otherwise inside @@ -572,10 +687,10 @@ class Portal: # a "stop" msg for a stream), this can result in a deadlock # where the root is waiting on the lock to clear but the # child has already cleared it and clobbered IPC. - from .devx._debug import maybe_wait_for_debugger await maybe_wait_for_debugger() - # remove the context from runtime tracking + # FINALLY, remove the context from runtime tracking and + # exit Bo self.actor._contexts.pop( (self.channel.uid, ctx.cid), None,