Move `Portal.open_context()` impl to `._context`
Finally, since you normally need the content of `._context.Context` and its surroundings in order to effectively grok `Portal.open_context()` anyway, might as well move the impl to the ctx module as `open_context_from_portal()` and just bind it on the `Portal` class def.

Associated/required tweaks:
- avoid a circular import on `.devx` by only importing `.maybe_wait_for_debugger()` when debug mode is set.
- drop `async_generator` usage; not sure why this hadn't already been changed to `contextlib`?
- use the `@acm` alias throughout `._portal`.
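For orientation, here is a minimal, illustrative sketch (not part of the diff) of the API whose caller-side impl is being moved. The actor/function names and payload are made up; it assumes the usual `@tractor.context` callee convention where the remote task calls `Context.started()` before returning its final result.

```python
import tractor
import trio


@tractor.context
async def echo_once(
    ctx: tractor.Context,
    msg: str,
) -> str:
    # callee side: unblock the opener with a "started" value..
    await ctx.started('ready')
    # ..then return the final result collected via `ctx.result()`.
    return msg


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'echoer',
            enable_modules=[__name__],
        )
        # after this change `Portal.open_context()` is just the
        # module-level `open_context_from_portal()` bound as a method.
        async with portal.open_context(
            echo_once,
            msg='hello',
        ) as (ctx, first):
            assert first == 'ready'
            assert await ctx.result() == 'hello'

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)
```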
parent 544cb40533, commit 4f7823cf55

tractor/_context.py
				|  | @ -43,7 +43,6 @@ import warnings | |||
| 
 | ||||
| import trio | ||||
| 
 | ||||
| from .msg import NamespacePath | ||||
| from ._exceptions import ( | ||||
|     ContextCancelled, | ||||
|     InternalError, | ||||
|  | @ -51,11 +50,16 @@ from ._exceptions import ( | |||
|     StreamOverrun, | ||||
|     pack_error, | ||||
|     unpack_error, | ||||
|     _raise_from_no_key_in_msg, | ||||
| ) | ||||
| from .log import get_logger | ||||
| from .msg import NamespacePath | ||||
| from ._ipc import Channel | ||||
| from ._streaming import MsgStream | ||||
| from ._state import current_actor | ||||
| from ._state import ( | ||||
|     current_actor, | ||||
|     debug_mode, | ||||
| ) | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from ._portal import Portal | ||||
|  | @ -1021,6 +1025,8 @@ class Context: | |||
|             assert self._scope | ||||
|             self._scope.cancel() | ||||
| 
 | ||||
|     # TODO? should we move this to `._streaming` much like we | ||||
|     # moved `Portal.open_context()`'s def to this mod? | ||||
|     @acm | ||||
|     async def open_stream( | ||||
|         self, | ||||
|  | @ -1848,6 +1854,541 @@ class Context: | |||
|             return False | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_context_from_portal( | ||||
|     portal: Portal, | ||||
|     func: Callable, | ||||
| 
 | ||||
|     allow_overruns: bool = False, | ||||
| 
 | ||||
|     # TODO: if we set this the wrapping `@acm` body will | ||||
|     # still be shown (awkwardly) on pdb REPL entry. Ideally | ||||
|     # we can similarly annotate that frame to NOT show? | ||||
|     hide_tb: bool = True, | ||||
| 
 | ||||
|     # proxied to RPC | ||||
|     **kwargs, | ||||
| 
 | ||||
| ) -> AsyncGenerator[tuple[Context, Any], None]: | ||||
|     ''' | ||||
|     Open an inter-actor "task context"; a remote task is | ||||
|     scheduled and cancel-scope-state-linked to a `trio.run()` across | ||||
|     memory boundaries in another actor's runtime. | ||||
| 
 | ||||
|     This is an `@acm` API bound as `Portal.open_context()` which | ||||
|     allows for deterministic setup and teardown of a remotely | ||||
|     scheduled task in another remote actor. Once opened, the 2 now | ||||
|     "linked" tasks run completely in parallel in each actor's | ||||
|     runtime with their enclosing `trio.CancelScope`s kept in | ||||
|     a synced state wherein if either side errors or cancels an | ||||
|     equivalent error is relayed to the other side via an SC-compat | ||||
|     IPC protocol. | ||||
| 
 | ||||
|     The yielded `tuple` is a pair delivering a `tractor.Context` | ||||
|     and any first value "sent" by the "callee" task via a call | ||||
|     to `Context.started(<value: Any>)`; this side of the | ||||
|     context does not unblock until the "callee" task calls | ||||
|     `.started()` in similar style to `trio.Nursery.start()`. | ||||
|     When the "callee" (side that is "called"/started by a call | ||||
|     to *this* method) returns, the caller side (this) unblocks | ||||
|     and any final value delivered from the other end can be | ||||
|     retrieved using the `Context.result()` api. | ||||
| 
 | ||||
|     The yielded ``Context`` instance further allows for opening | ||||
|     bidirectional streams, explicit cancellation and | ||||
|     structured-concurrency-synchronized final result-msg | ||||
|     collection. See ``tractor.Context`` for more details. | ||||
| 
 | ||||
|     ''' | ||||
|     __tracebackhide__: bool = hide_tb | ||||
| 
 | ||||
|     # conduct target func method structural checks | ||||
|     if not inspect.iscoroutinefunction(func) and ( | ||||
|         getattr(func, '_tractor_contex_function', False) | ||||
|     ): | ||||
|         raise TypeError( | ||||
|             f'{func} must be an async generator function!') | ||||
| 
 | ||||
|     # TODO: i think from here onward should probably | ||||
|     # just be factored into an `@acm` inside a new | ||||
|     # `_context.py` mod. | ||||
|     nsf = NamespacePath.from_ref(func) | ||||
| 
 | ||||
|     # XXX NOTE XXX: currently we do NOT allow opening a context | ||||
|     # with "self" since the local feeder mem-chan processing | ||||
|     # is not built for it. | ||||
|     if portal.channel.uid == portal.actor.uid: | ||||
|         raise RuntimeError( | ||||
|             '** !! Invalid Operation !! **\n' | ||||
|             'Can not open an IPC ctx with the local actor!\n' | ||||
|             f'|_{portal.actor}\n' | ||||
|         ) | ||||
| 
 | ||||
|     ctx: Context = await portal.actor.start_remote_task( | ||||
|         portal.channel, | ||||
|         nsf=nsf, | ||||
|         kwargs=kwargs, | ||||
| 
 | ||||
|         # NOTE: it's important to expose this since you might | ||||
|         # get the case where the parent who opened the context does | ||||
|         # not open a stream until after some slow startup/init | ||||
|         # period, in which case when the first msg is read from | ||||
|         # the feeder mem chan, say when first calling | ||||
|         # `Context.open_stream(allow_overruns=True)`, the overrun condition will be | ||||
|         # raised before any ignoring of overflow msgs can take | ||||
|         # place.. | ||||
|         allow_overruns=allow_overruns, | ||||
|     ) | ||||
| 
 | ||||
|     assert ctx._remote_func_type == 'context' | ||||
|     msg: dict = await ctx._recv_chan.receive() | ||||
| 
 | ||||
|     try: | ||||
|         # the "first" value here is delivered by the callee's | ||||
|         # ``Context.started()`` call. | ||||
|         first: Any = msg['started'] | ||||
|         ctx._started_called: bool = True | ||||
| 
 | ||||
|     except KeyError as src_error: | ||||
|         _raise_from_no_key_in_msg( | ||||
|             ctx=ctx, | ||||
|             msg=msg, | ||||
|             src_err=src_error, | ||||
|             log=log, | ||||
|             expect_key='started', | ||||
|         ) | ||||
| 
 | ||||
|     ctx._portal: Portal = portal | ||||
|     uid: tuple = portal.channel.uid | ||||
|     cid: str = ctx.cid | ||||
| 
 | ||||
|     # placeholder for any exception raised in the runtime | ||||
|     # or by user tasks which cause this context's closure. | ||||
|     scope_err: BaseException|None = None | ||||
|     ctxc_from_callee: ContextCancelled|None = None | ||||
|     try: | ||||
|         async with trio.open_nursery() as nurse: | ||||
| 
 | ||||
|             # NOTE: used to start overrun queuing tasks | ||||
|             ctx._scope_nursery: trio.Nursery = nurse | ||||
|             ctx._scope: trio.CancelScope = nurse.cancel_scope | ||||
| 
 | ||||
|             # deliver context instance and .started() msg value | ||||
|             # in enter tuple. | ||||
|             yield ctx, first | ||||
| 
 | ||||
|             # ??TODO??: do we still want to consider this or is | ||||
|             # the `else:` block handling via a `.result()` | ||||
|             # call below enough?? | ||||
|             # -[ ] pretty sure `.result()` internals do the | ||||
|             # same as our ctxc handler below so it ended up | ||||
|             # being same (repeated?) behaviour, but ideally we | ||||
|             # wouldn't have that duplication either by somehow | ||||
|             # factoring the `.result()` handler impl in a way | ||||
|             # that we can re-use it around the `yield` ^ here | ||||
|             # or vice versa? | ||||
|             # | ||||
|             # NOTE: between the caller exiting and arriving | ||||
|             # here the far end may have sent a ctxc-msg or | ||||
|             # other error, so check for it here immediately | ||||
|             # and maybe raise so as to engage the ctxc | ||||
|             # handling block below! | ||||
|             # | ||||
|             # if re := ctx._remote_error: | ||||
|             #     maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err( | ||||
|             #         re, | ||||
|             #         # TODO: do we want this to always raise? | ||||
|             #         # - means that on self-ctxc, if/when the | ||||
|             #         #   block is exited before the msg arrives | ||||
|             #         #   but then the msg during __exit__ | ||||
|             #         #   calling we may not activate the | ||||
|             #         #   ctxc-handler block below? should we | ||||
|             #         #   be? | ||||
|             #         # - if there's a remote error that arrives | ||||
|             #         #   after the child has exited, we won't | ||||
|             #         #   handle until the `finally:` block | ||||
|             #         #   where `.result()` is always called, | ||||
|             #         #   again in which case we handle it | ||||
|             #         #   differently then in the handler block | ||||
|             #         #   that would normally engage from THIS | ||||
|             #         #   block? | ||||
|             #         raise_ctxc_from_self_call=True, | ||||
|             #     ) | ||||
|             #     ctxc_from_callee = maybe_ctxc | ||||
| 
 | ||||
|             # when in allow_overruns mode there may be | ||||
|             # lingering overflow sender tasks remaining? | ||||
|             if nurse.child_tasks: | ||||
|                 # XXX: ensure we are in overrun state | ||||
|                 # with ``._allow_overruns=True`` bc otherwise | ||||
|                 # there should be no tasks in this nursery! | ||||
|                 if ( | ||||
|                     not ctx._allow_overruns | ||||
|                     or len(nurse.child_tasks) > 1 | ||||
|                 ): | ||||
|                     raise InternalError( | ||||
|                         'Context has sub-tasks but is ' | ||||
|                         'not in `allow_overruns=True` mode!?' | ||||
|                     ) | ||||
| 
 | ||||
|                 # ensure we cancel all overflow sender | ||||
|                 # tasks started in the nursery when | ||||
|                 # `._allow_overruns == True`. | ||||
|                 # | ||||
|                 # NOTE: this means `._scope.cancelled_caught` | ||||
|                 # will prolly be set! not sure if that's | ||||
|                 # non-ideal or not ??? | ||||
|                 ctx._scope.cancel() | ||||
| 
 | ||||
|     # XXX NOTE XXX: maybe shield against | ||||
|     # self-context-cancellation (which raises a local | ||||
|     # `ContextCancelled`) when requested (via | ||||
|     # `Context.cancel()`) by the same task (tree) which entered | ||||
|     # THIS `.open_context()`. | ||||
|     # | ||||
|     # NOTE: There are 2 operating cases for a "graceful cancel" | ||||
|     # of a `Context`. In both cases any `ContextCancelled` | ||||
|     # raised in this scope-block came from a transport msg | ||||
|     # relayed from some remote-actor-task which our runtime set | ||||
|     # as `Context._remote_error` | ||||
|     # | ||||
|     # the CASES: | ||||
|     # | ||||
|     # - if that context IS THE SAME ONE that called | ||||
|     #   `Context.cancel()`, we want to absorb the error | ||||
|     #   silently and let this `.open_context()` block exit | ||||
|     #   without raising, ideally eventually receiving the ctxc | ||||
|     #   ack msg thus resulting in `ctx.cancel_acked == True`. | ||||
|     # | ||||
|     # - if it is from some OTHER context (we did NOT call | ||||
|     #   `.cancel()`), we want to re-RAISE IT whilst also | ||||
|     #   setting our own ctx's "reason for cancel" to be that | ||||
|     #   other context's cancellation condition; we set our | ||||
|     #   `.canceller: tuple[str, str]` to be same value as | ||||
|     #   caught here in a `ContextCancelled.canceller`. | ||||
|     # | ||||
|     # AGAIN to restate the above, there are 2 cases: | ||||
|     # | ||||
|     # 1-some other context opened in this `.open_context()` | ||||
|     #   block cancelled due to a self or peer cancellation | ||||
|     #   request in which case we DO let the error bubble to the | ||||
|     #   opener. | ||||
|     # | ||||
|     # 2-THIS "caller" task somewhere invoked `Context.cancel()` | ||||
|     #   and received a `ContextCancelled` from the "callee" | ||||
|     #   task, in which case we mask the `ContextCancelled` from | ||||
|     #   bubbling to this "caller" (much like how `trio.Nursery` | ||||
|     #   swallows any `trio.Cancelled` bubbled by a call to | ||||
|     #   `Nursery.cancel_scope.cancel()`) | ||||
|     except ContextCancelled as ctxc: | ||||
|         scope_err = ctxc | ||||
|         ctx._local_error: BaseException = scope_err | ||||
|         ctxc_from_callee = ctxc | ||||
| 
 | ||||
|         # XXX TODO XXX: FIX THIS debug_mode BUGGGG!!! | ||||
|         # using this code and then resuming the REPL will | ||||
|         # cause a SIGINT-ignoring HANG! | ||||
|         # -> prolly due to a stale debug lock entry.. | ||||
|         # -[ ] USE `.stackscope` to demonstrate that (possibly | ||||
|         #   documenting it as a definitive example of | ||||
|         #   debugging the tractor-runtime itself using its | ||||
|         #   own `.devx.` tooling! | ||||
|         #  | ||||
|         # await _debug.pause() | ||||
| 
 | ||||
|         # CASE 2: context was cancelled by local task calling | ||||
|         # `.cancel()`, we don't raise and the exit block should | ||||
|         # exit silently. | ||||
|         if ( | ||||
|             ctx._cancel_called | ||||
|             and | ||||
|             ctxc is ctx._remote_error | ||||
|             and | ||||
|             ctxc.canceller == portal.actor.uid | ||||
|         ): | ||||
|             log.cancel( | ||||
|                 f'Context (cid=[{ctx.cid[-6:]}..]) cancelled gracefully with:\n' | ||||
|                 f'{ctxc}' | ||||
|             ) | ||||
|         # CASE 1: this context was never cancelled via a local | ||||
|         # task (tree) having called `Context.cancel()`, raise | ||||
|         # the error since it was caused by someone else | ||||
|         # -> probably a remote peer! | ||||
|         else: | ||||
|             raise | ||||
| 
 | ||||
|     # the above `._scope` can be cancelled due to: | ||||
|     # 1. an explicit self cancel via `Context.cancel()` or | ||||
|     #    `Actor.cancel()`, | ||||
|     # 2. any "callee"-side remote error, possibly also a cancellation | ||||
|     #    request by some peer, | ||||
|     # 3. any "caller" (aka THIS scope's) local error raised in the above `yield` | ||||
|     except ( | ||||
|         # CASE 3: standard local error in this caller/yieldee | ||||
|         Exception, | ||||
| 
 | ||||
|         # CASES 1 & 2: can manifest as a `ctx._scope_nursery` | ||||
|         # exception-group of, | ||||
|         # | ||||
|         # 1.-`trio.Cancelled`s, since | ||||
|         #   `._scope.cancel()` will have been called | ||||
|         #   (transitively by the runtime calling | ||||
|         #   `._deliver_msg()`) and any `ContextCancelled` | ||||
|         #   eventually absorbed and thus suppressed in | ||||
|         #   any `Context._maybe_raise_remote_err()` call. | ||||
|         # | ||||
|         # 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]` | ||||
|         #    from any error delivered from the "callee" side | ||||
|         #    AND a group-exc is only raised if there were > 1 | ||||
|         #    tasks started *here* in the "caller" / opener | ||||
|         #    block. If any one of those tasks calls | ||||
|         #    `.result()` or `MsgStream.receive()` | ||||
|         #    `._maybe_raise_remote_err()` will be transitively | ||||
|         #    called and the remote error raised causing all | ||||
|         #    tasks to be cancelled. | ||||
|         #    NOTE: ^ this case always can happen if any | ||||
|         #    overrun handler tasks were spawned! | ||||
|         BaseExceptionGroup, | ||||
| 
 | ||||
|         trio.Cancelled,  # NOTE: NOT from inside the ctx._scope | ||||
|         KeyboardInterrupt, | ||||
| 
 | ||||
|     ) as caller_err: | ||||
|         scope_err = caller_err | ||||
|         ctx._local_error: BaseException = scope_err | ||||
| 
 | ||||
|         # XXX: ALWAYS request the context to CANCEL ON any ERROR. | ||||
|         # NOTE: `Context.cancel()` is conversely NEVER CALLED in | ||||
|         # the `ContextCancelled` "self cancellation absorbed" case | ||||
|         # handled in the block above ^^^ !! | ||||
|         # await _debug.pause() | ||||
|         log.cancel( | ||||
|             'Context terminated due to\n\n' | ||||
|             f'.outcome => {ctx.repr_outcome()}\n' | ||||
|         ) | ||||
| 
 | ||||
|         if debug_mode(): | ||||
|             # async with _debug.acquire_debug_lock(portal.actor.uid): | ||||
|             #     pass | ||||
|             # TODO: factor ^ into below for non-root cases? | ||||
|             # | ||||
|             from .devx import maybe_wait_for_debugger | ||||
|             was_acquired: bool = await maybe_wait_for_debugger( | ||||
|                 header_msg=( | ||||
|                     'Delaying `ctx.cancel()` until debug lock ' | ||||
|                     'acquired..\n' | ||||
|                 ), | ||||
|             ) | ||||
|             if was_acquired: | ||||
|                 log.pdb( | ||||
|                     'Acquired debug lock! ' | ||||
|                     'Calling `ctx.cancel()`!\n' | ||||
|                 ) | ||||
| 
 | ||||
|         # we don't need to cancel the callee if it already | ||||
|         # told us it's cancelled ;p | ||||
|         if ctxc_from_callee is None: | ||||
|             try: | ||||
|                 await ctx.cancel() | ||||
|             except ( | ||||
|                 trio.BrokenResourceError, | ||||
|                 trio.ClosedResourceError, | ||||
|             ): | ||||
|                 log.warning( | ||||
|                     'IPC connection for context is broken?\n' | ||||
|                     f'task:{cid}\n' | ||||
|                     f'actor:{uid}' | ||||
|                 ) | ||||
| 
 | ||||
|         raise  # duh | ||||
| 
 | ||||
|     # no local scope error, the "clean exit with a result" case. | ||||
|     else: | ||||
|         if ctx.chan.connected(): | ||||
|             log.runtime( | ||||
|                 'Waiting on final context result for\n' | ||||
|                 f'peer: {uid}\n' | ||||
|                 f'|_{ctx._task}\n' | ||||
|             ) | ||||
|             # XXX NOTE XXX: the below call to | ||||
|             # `Context.result()` will ALWAYS raise | ||||
|             # a `ContextCancelled` (via an embedded call to | ||||
|             # `Context._maybe_raise_remote_err()`) IFF | ||||
|             # a `Context._remote_error` was set by the runtime | ||||
|             # via a call to | ||||
|             # `Context._maybe_cancel_and_set_remote_error()`. | ||||
|             # As per `Context._deliver_msg()`, that error IS | ||||
|             # ALWAYS SET any time "callee" side fails and causes "caller | ||||
|             # side" cancellation via a `ContextCancelled` here. | ||||
|             try: | ||||
|                 result_or_err: Exception|Any = await ctx.result() | ||||
|             except BaseException as berr: | ||||
|                 # on normal teardown, if we get some error | ||||
|                 # raised in `Context.result()` we still want to | ||||
|                 # save that error on the ctx's state to | ||||
|                 # determine things like `.cancelled_caught` for | ||||
|                 # cases where there was remote cancellation but | ||||
|                 # this task didn't know until final teardown | ||||
|                 # / value collection. | ||||
|                 scope_err = berr | ||||
|                 ctx._local_error: BaseException = scope_err | ||||
|                 raise | ||||
| 
 | ||||
|             # yes! this worx Bp | ||||
|             # from .devx import _debug | ||||
|             # await _debug.pause() | ||||
| 
 | ||||
|             # an exception type boxed in a `RemoteActorError` | ||||
|             # is returned (meaning it was obvi not raised) | ||||
|             # that we want to log-report on. | ||||
|             msgdata: str|None = getattr( | ||||
|                 result_or_err, | ||||
|                 'msgdata', | ||||
|                 None | ||||
|             ) | ||||
|             match (msgdata, result_or_err): | ||||
|                 case ( | ||||
|                     {'tb_str': tbstr}, | ||||
|                     ContextCancelled(), | ||||
|                 ): | ||||
|                     log.cancel(tbstr) | ||||
| 
 | ||||
|                 case ( | ||||
|                     {'tb_str': tbstr}, | ||||
|                     RemoteActorError(), | ||||
|                 ): | ||||
|                     log.exception( | ||||
|                         'Context remotely errored!\n' | ||||
|                         f'<= peer: {uid}\n' | ||||
|                         f'  |_ {nsf}()\n\n' | ||||
| 
 | ||||
|                         f'{tbstr}' | ||||
|                     ) | ||||
|                 case (None, _): | ||||
|                     log.runtime( | ||||
|                         'Context returned final result from callee task:\n' | ||||
|                         f'<= peer: {uid}\n' | ||||
|                         f'  |_ {nsf}()\n\n' | ||||
| 
 | ||||
|                         f'`{result_or_err}`\n' | ||||
|                     ) | ||||
| 
 | ||||
|     finally: | ||||
|         # XXX: (MEGA IMPORTANT) if this is a root opened process we | ||||
|         # wait for any immediate child in debug before popping the | ||||
|         # context from the runtime msg loop otherwise inside | ||||
|         # ``Actor._push_result()`` the msg will be discarded and in | ||||
|         # the case where that msg is global debugger unlock (via | ||||
|         # a "stop" msg for a stream), this can result in a deadlock | ||||
|         # where the root is waiting on the lock to clear but the | ||||
|         # child has already cleared it and clobbered IPC. | ||||
|         if debug_mode(): | ||||
|             from .devx import maybe_wait_for_debugger | ||||
|             await maybe_wait_for_debugger() | ||||
| 
 | ||||
|         # though it should be impossible for any tasks | ||||
|         # operating *in* this scope to have survived | ||||
|         # we tear down the runtime feeder chan last | ||||
|         # to avoid premature stream clobbers. | ||||
|         if ( | ||||
|             (rxchan := ctx._recv_chan) | ||||
| 
 | ||||
|             # maybe TODO: yes i know the below check is | ||||
|             # touching `trio` memchan internals..BUT, there are | ||||
|             # only a couple ways to avoid a `trio.Cancelled` | ||||
|             # bubbling from the `.aclose()` call below: | ||||
|             # | ||||
|             # - catch and mask it via the cancel-scope-shielded call | ||||
|             #   as we are rn (manual and frowned upon) OR, | ||||
|             # - specially handle the case where `scope_err` is | ||||
|             #   one of {`BaseExceptionGroup`, `trio.Cancelled`} | ||||
|             #   and then presume that the `.aclose()` call will | ||||
|             #   raise a `trio.Cancelled` and just don't call it | ||||
|             #   in those cases.. | ||||
|             # | ||||
|             # that latter approach is more logic, LOC, and more | ||||
|             # convoluted so for now stick with the first | ||||
|             # pseudo-hack-workaround where we just try to avoid | ||||
|             # the shielded call as much as we can detect from | ||||
|             # the memchan's `._closed` state.. | ||||
|             # | ||||
|             # XXX MOTIVATION XXX-> we generally want to raise | ||||
|             # any underlying actor-runtime/internals error that | ||||
|             # surfaces from a bug in tractor itself so it can | ||||
|             # be easily detected/fixed AND, we also want to | ||||
|             # minimize noisy runtime tracebacks (normally due | ||||
|             # to the cross-actor linked task scope machinery | ||||
|             # teardown) displayed to user-code and instead only | ||||
|             # displaying `ContextCancelled` traces where the | ||||
|             # cause of crash/exit IS due to something in | ||||
|             # user/app code on either end of the context. | ||||
|             and not rxchan._closed | ||||
|         ): | ||||
|             # XXX NOTE XXX: and again as per above, we mask any | ||||
|             # `trio.Cancelled` raised here so as to NOT mask | ||||
|             # out any exception group or legit (remote) ctx | ||||
|             # error that sourced from the remote task or its | ||||
|             # runtime. | ||||
|             # | ||||
|             # NOTE: further, this should be the only place the | ||||
|             # underlying feeder channel is | ||||
|             # once-and-only-CLOSED! | ||||
|             with trio.CancelScope(shield=True): | ||||
|                 await ctx._recv_chan.aclose() | ||||
| 
 | ||||
|         # XXX: we always raise remote errors locally and | ||||
|         # generally speaking mask runtime-machinery related | ||||
|         # multi-`trio.Cancelled`s. As such, any `scope_error` | ||||
|         # which was the underlying cause of this context's exit | ||||
|         # should be stored as the `Context._local_error` and | ||||
|         # used in determining `Context.cancelled_caught: bool`. | ||||
|         if scope_err is not None: | ||||
|             # sanity, tho can remove? | ||||
|             assert ctx._local_error is scope_err | ||||
|             # ctx._local_error: BaseException = scope_err | ||||
|             # etype: Type[BaseException] = type(scope_err) | ||||
| 
 | ||||
|             # CASE 2 | ||||
|             if ( | ||||
|                 ctx._cancel_called | ||||
|                 and ctx.cancel_acked | ||||
|             ): | ||||
|                 log.cancel( | ||||
|                     'Context cancelled by caller task\n' | ||||
|                     f'|_{ctx._task}\n\n' | ||||
| 
 | ||||
|                     f'{repr(scope_err)}\n' | ||||
|                 ) | ||||
| 
 | ||||
|             # TODO: should we add a `._cancel_req_received` | ||||
|             # flag to determine if the callee manually called | ||||
|             # `ctx.cancel()`? | ||||
|             # -[ ] going to need a cid check no? | ||||
| 
 | ||||
|             # CASE 1 | ||||
|             else: | ||||
|                 outcome_str: str = ctx.repr_outcome( | ||||
|                     show_error_fields=True, | ||||
|                     # type_only=True, | ||||
|                 ) | ||||
|                 log.cancel( | ||||
|                     f'Context terminated due to local scope error:\n\n' | ||||
|                     f'{ctx.chan.uid} => {outcome_str}\n' | ||||
|                 ) | ||||
| 
 | ||||
|         # FINALLY, remove the context from runtime tracking and | ||||
|         # exit! | ||||
|         log.runtime( | ||||
|             'Removing IPC ctx opened with peer\n' | ||||
|             f'{uid}\n' | ||||
|             f'|_{ctx}\n' | ||||
|         ) | ||||
|         portal.actor._contexts.pop( | ||||
|             (uid, cid), | ||||
|             None, | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| def mk_context( | ||||
|     chan: Channel, | ||||
|     cid: str, | ||||
|  |  | |||
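Note how the `.devx` dependency above is only pulled in inside the `if debug_mode():` branches (once in the error handler, once in the `finally:` block) rather than at module top; that is the circular-import workaround called out in the commit message. Below is a generic sketch of that deferred-import pattern, where the flag and module names are placeholders, not tractor's (tractor itself uses the `._state.debug_mode()` predicate imported in the first hunk).

```python
import trio

# stand-in for a runtime debug flag; only for this sketch.
DEBUG_MODE: bool = False


async def teardown() -> None:
    if DEBUG_MODE:
        # deferred import: the (hypothetical) `my_debug_tools` module
        # would import this module back, so pulling it in lazily on
        # the debug-only path avoids a top-level import cycle.
        from my_debug_tools import maybe_wait_for_debugger
        await maybe_wait_for_debugger()
    # ..normal teardown continues here.


if __name__ == '__main__':
    trio.run(teardown)
```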
tractor/_portal.py
|  | @ -24,6 +24,7 @@ OS processes, possibly on different (hardware) hosts. | |||
| 
 | ||||
| ''' | ||||
| from __future__ import annotations | ||||
| from contextlib import asynccontextmanager as acm | ||||
| import importlib | ||||
| import inspect | ||||
| from typing import ( | ||||
|  | @ -37,30 +38,21 @@ from dataclasses import dataclass | |||
| import warnings | ||||
| 
 | ||||
| import trio | ||||
| from async_generator import asynccontextmanager | ||||
| 
 | ||||
| from .trionics import maybe_open_nursery | ||||
| from .devx import ( | ||||
|     # _debug, | ||||
|     maybe_wait_for_debugger, | ||||
| ) | ||||
| from ._state import ( | ||||
|     current_actor, | ||||
|     debug_mode, | ||||
| ) | ||||
| from ._ipc import Channel | ||||
| from .log import get_logger | ||||
| from .msg import NamespacePath | ||||
| from ._exceptions import ( | ||||
|     InternalError, | ||||
|     _raise_from_no_key_in_msg, | ||||
|     unpack_error, | ||||
|     NoResult, | ||||
|     ContextCancelled, | ||||
|     RemoteActorError, | ||||
| ) | ||||
| from ._context import ( | ||||
|     Context, | ||||
|     open_context_from_portal, | ||||
| ) | ||||
| from ._streaming import ( | ||||
|     MsgStream, | ||||
|  | @ -392,7 +384,7 @@ class Portal: | |||
|             self.channel, | ||||
|         ) | ||||
| 
 | ||||
|     @asynccontextmanager | ||||
|     @acm | ||||
|     async def open_stream_from( | ||||
|         self, | ||||
|         async_gen_func: Callable,  # typing: ignore | ||||
|  | @ -449,550 +441,12 @@ class Portal: | |||
|             # await recv_chan.aclose() | ||||
|             self._streams.remove(rchan) | ||||
| 
 | ||||
|     # TODO: move this impl to `._context` mod and | ||||
|     # instead just bind it here as a method so that the logic | ||||
|     # for ctx stuff stays all in one place (instead of frickin | ||||
|     # having to open this file in tandem every gd time!!! XD) | ||||
|     # | ||||
|     @asynccontextmanager | ||||
|     async def open_context( | ||||
| 
 | ||||
|         self, | ||||
|         func: Callable, | ||||
| 
 | ||||
|         allow_overruns: bool = False, | ||||
| 
 | ||||
|         # TODO: if we set this the wrapping `@acm` body will | ||||
|         # still be shown (awkwardly) on pdb REPL entry. Ideally | ||||
|         # we can similarly annotate that frame to NOT show? | ||||
|         hide_tb: bool = True, | ||||
| 
 | ||||
|         # proxied to RPC | ||||
|         **kwargs, | ||||
| 
 | ||||
|     ) -> AsyncGenerator[tuple[Context, Any], None]: | ||||
|         ''' | ||||
|         Open an inter-actor "task context"; a remote task is | ||||
|         scheduled and cancel-scope-state-linked to a `trio.run()` across | ||||
|         memory boundaries in another actor's runtime. | ||||
| 
 | ||||
|         This is an `@acm` API which allows for deterministic setup | ||||
|         and teardown of a remotely scheduled task in another remote | ||||
|         actor. Once opened, the 2 now "linked" tasks run completely | ||||
|         in parallel in each actor's runtime with their enclosing | ||||
|         `trio.CancelScope`s kept in a synced state wherein if | ||||
|         either side errors or cancels an equivalent error is | ||||
|         relayed to the other side via an SC-compat IPC protocol. | ||||
| 
 | ||||
|         The yielded `tuple` is a pair delivering a `tractor.Context` | ||||
|         and any first value "sent" by the "callee" task via a call | ||||
|         to `Context.started(<value: Any>)`; this side of the | ||||
|         context does not unblock until the "callee" task calls | ||||
|         `.started()` in similar style to `trio.Nursery.start()`. | ||||
|         When the "callee" (side that is "called"/started by a call | ||||
|         to *this* method) returns, the caller side (this) unblocks | ||||
|         and any final value delivered from the other end can be | ||||
|         retrieved using the `Contex.result()` api. | ||||
| 
 | ||||
|         The yielded ``Context`` instance further allows for opening | ||||
|         bidirectional streams, explicit cancellation and | ||||
|         structurred-concurrency-synchronized final result-msg | ||||
|         collection. See ``tractor.Context`` for more details. | ||||
| 
 | ||||
|         ''' | ||||
|         __tracebackhide__: bool = hide_tb | ||||
| 
 | ||||
|         # conduct target func method structural checks | ||||
|         if not inspect.iscoroutinefunction(func) and ( | ||||
|             getattr(func, '_tractor_contex_function', False) | ||||
|         ): | ||||
|             raise TypeError( | ||||
|                 f'{func} must be an async generator function!') | ||||
| 
 | ||||
|         # TODO: i think from here onward should probably | ||||
|         # just be factored into an `@acm` inside a new | ||||
|         # a new `_context.py` mod. | ||||
|         nsf = NamespacePath.from_ref(func) | ||||
| 
 | ||||
|         # XXX NOTE XXX: currenly we do NOT allow opening a contex | ||||
|         # with "self" since the local feeder mem-chan processing | ||||
|         # is not built for it. | ||||
|         if self.channel.uid == self.actor.uid: | ||||
|             raise RuntimeError( | ||||
|                 '** !! Invalid Operation !! **\n' | ||||
|                 'Can not open an IPC ctx with the local actor!\n' | ||||
|                 f'|_{self.actor}\n' | ||||
|             ) | ||||
| 
 | ||||
|         ctx: Context = await self.actor.start_remote_task( | ||||
|             self.channel, | ||||
|             nsf=nsf, | ||||
|             kwargs=kwargs, | ||||
| 
 | ||||
|             # NOTE: it's imporant to expose this since you might | ||||
|             # get the case where the parent who opened the context does | ||||
|             # not open a stream until after some slow startup/init | ||||
|             # period, in which case when the first msg is read from | ||||
|             # the feeder mem chan, say when first calling | ||||
|             # `Context.open_stream(allow_overruns=True)`, the overrun condition will be | ||||
|             # raised before any ignoring of overflow msgs can take | ||||
|             # place.. | ||||
|             allow_overruns=allow_overruns, | ||||
|         ) | ||||
| 
 | ||||
|         assert ctx._remote_func_type == 'context' | ||||
|         msg: dict = await ctx._recv_chan.receive() | ||||
| 
 | ||||
|         try: | ||||
|             # the "first" value here is delivered by the callee's | ||||
|             # ``Context.started()`` call. | ||||
|             first: Any = msg['started'] | ||||
|             ctx._started_called: bool = True | ||||
| 
 | ||||
|         except KeyError as src_error: | ||||
|             _raise_from_no_key_in_msg( | ||||
|                 ctx=ctx, | ||||
|                 msg=msg, | ||||
|                 src_err=src_error, | ||||
|                 log=log, | ||||
|                 expect_key='started', | ||||
|             ) | ||||
| 
 | ||||
|         ctx._portal: Portal = self | ||||
|         uid: tuple = self.channel.uid | ||||
|         cid: str = ctx.cid | ||||
| 
 | ||||
|         # placeholder for any exception raised in the runtime | ||||
|         # or by user tasks which cause this context's closure. | ||||
|         scope_err: BaseException|None = None | ||||
|         ctxc_from_callee: ContextCancelled|None = None | ||||
|         try: | ||||
|             async with trio.open_nursery() as nurse: | ||||
| 
 | ||||
|                 # NOTE: used to start overrun queuing tasks | ||||
|                 ctx._scope_nursery: trio.Nursery = nurse | ||||
|                 ctx._scope: trio.CancelScope = nurse.cancel_scope | ||||
| 
 | ||||
|                 # deliver context instance and .started() msg value | ||||
|                 # in enter tuple. | ||||
|                 yield ctx, first | ||||
| 
 | ||||
|                 # ??TODO??: do we still want to consider this or is | ||||
|                 # the `else:` block handling via a `.result()` | ||||
|                 # call below enough?? | ||||
|                 # -[ ] pretty sure `.result()` internals do the | ||||
|                 # same as our ctxc handler below so it ended up | ||||
|                 # being same (repeated?) behaviour, but ideally we | ||||
|                 # wouldn't have that duplication either by somehow | ||||
|                 # factoring the `.result()` handler impl in a way | ||||
|                 # that we can re-use it around the `yield` ^ here | ||||
|                 # or vice versa? | ||||
|                 # | ||||
|                 # NOTE: between the caller exiting and arriving | ||||
|                 # here the far end may have sent a ctxc-msg or | ||||
|                 # other error, so check for it here immediately | ||||
|                 # and maybe raise so as to engage the ctxc | ||||
|                 # handling block below! | ||||
|                 # | ||||
|                 # if re := ctx._remote_error: | ||||
|                 #     maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err( | ||||
|                 #         re, | ||||
|                 #         # TODO: do we want this to always raise? | ||||
|                 #         # - means that on self-ctxc, if/when the | ||||
|                 #         #   block is exited before the msg arrives | ||||
|                 #         #   but then the msg during __exit__ | ||||
|                 #         #   calling we may not activate the | ||||
|                 #         #   ctxc-handler block below? should we | ||||
|                 #         #   be? | ||||
|                 #         # - if there's a remote error that arrives | ||||
|                 #         #   after the child has exited, we won't | ||||
|                 #         #   handle until the `finally:` block | ||||
|                 #         #   where `.result()` is always called, | ||||
|                 #         #   again in which case we handle it | ||||
|                 #         #   differently then in the handler block | ||||
|                 #         #   that would normally engage from THIS | ||||
|                 #         #   block? | ||||
|                 #         raise_ctxc_from_self_call=True, | ||||
|                 #     ) | ||||
|                 #     ctxc_from_callee = maybe_ctxc | ||||
| 
 | ||||
|                 # when in allow_overruns mode there may be | ||||
|                 # lingering overflow sender tasks remaining? | ||||
|                 if nurse.child_tasks: | ||||
|                     # XXX: ensure we are in overrun state | ||||
|                     # with ``._allow_overruns=True`` bc otherwise | ||||
|                     # there should be no tasks in this nursery! | ||||
|                     if ( | ||||
|                         not ctx._allow_overruns | ||||
|                         or len(nurse.child_tasks) > 1 | ||||
|                     ): | ||||
|                         raise InternalError( | ||||
|                             'Context has sub-tasks but is ' | ||||
|                             'not in `allow_overruns=True` mode!?' | ||||
|                         ) | ||||
| 
 | ||||
|                     # ensure we cancel all overflow sender | ||||
|                     # tasks started in the nursery when | ||||
|                     # `._allow_overruns == True`. | ||||
|                     # | ||||
|                     # NOTE: this means `._scope.cancelled_caught` | ||||
|                     # will prolly be set! not sure if that's | ||||
|                     # non-ideal or not ??? | ||||
|                     ctx._scope.cancel() | ||||
| 
 | ||||
|         # XXX NOTE XXX: maybe shield against | ||||
|         # self-context-cancellation (which raises a local | ||||
|         # `ContextCancelled`) when requested (via | ||||
|         # `Context.cancel()`) by the same task (tree) which entered | ||||
|         # THIS `.open_context()`. | ||||
|         # | ||||
|         # NOTE: There are 2 operating cases for a "graceful cancel" | ||||
|         # of a `Context`. In both cases any `ContextCancelled` | ||||
|         # raised in this scope-block came from a transport msg | ||||
|         # relayed from some remote-actor-task which our runtime set | ||||
|         # as to `Context._remote_error` | ||||
|         # | ||||
|         # the CASES: | ||||
|         # | ||||
|         # - if that context IS THE SAME ONE that called | ||||
|         #   `Context.cancel()`, we want to absorb the error | ||||
|         #   silently and let this `.open_context()` block to exit | ||||
|         #   without raising, ideally eventually receiving the ctxc | ||||
|         #   ack msg thus resulting in `ctx.cancel_acked == True`. | ||||
|         # | ||||
|         # - if it is from some OTHER context (we did NOT call | ||||
|         #   `.cancel()`), we want to re-RAISE IT whilst also | ||||
|         #   setting our own ctx's "reason for cancel" to be that | ||||
|         #   other context's cancellation condition; we set our | ||||
|         #   `.canceller: tuple[str, str]` to be same value as | ||||
|         #   caught here in a `ContextCancelled.canceller`. | ||||
|         # | ||||
|         # AGAIN to restate the above, there are 2 cases: | ||||
|         # | ||||
|         # 1-some other context opened in this `.open_context()` | ||||
|         #   block cancelled due to a self or peer cancellation | ||||
|         #   request in which case we DO let the error bubble to the | ||||
|         #   opener. | ||||
|         # | ||||
|         # 2-THIS "caller" task somewhere invoked `Context.cancel()` | ||||
|         #   and received a `ContextCanclled` from the "callee" | ||||
|         #   task, in which case we mask the `ContextCancelled` from | ||||
|         #   bubbling to this "caller" (much like how `trio.Nursery` | ||||
|         #   swallows any `trio.Cancelled` bubbled by a call to | ||||
|         #   `Nursery.cancel_scope.cancel()`) | ||||
|         except ContextCancelled as ctxc: | ||||
|             scope_err = ctxc | ||||
|             ctx._local_error: BaseException = scope_err | ||||
|             ctxc_from_callee = ctxc | ||||
| 
 | ||||
|             # XXX TODO XXX: FIX THIS debug_mode BUGGGG!!! | ||||
|             # using this code and then resuming the REPL will | ||||
|             # cause a SIGINT-ignoring HANG! | ||||
|             # -> prolly due to a stale debug lock entry.. | ||||
|             # -[ ] USE `.stackscope` to demonstrate that (possibly | ||||
|             #   documenting it as a definittive example of | ||||
|             #   debugging the tractor-runtime itself using it's | ||||
|             #   own `.devx.` tooling! | ||||
|             #  | ||||
|             # await _debug.pause() | ||||
| 
 | ||||
|             # CASE 2: context was cancelled by local task calling | ||||
|             # `.cancel()`, we don't raise and the exit block should | ||||
|             # exit silently. | ||||
|             if ( | ||||
|                 ctx._cancel_called | ||||
|                 and | ||||
|                 ctxc is ctx._remote_error | ||||
|                 and | ||||
|                 ctxc.canceller == self.actor.uid | ||||
|             ): | ||||
|                 log.cancel( | ||||
|                     f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n' | ||||
|                     f'{ctxc}' | ||||
|                 ) | ||||
|             # CASE 1: this context was never cancelled via a local | ||||
|             # task (tree) having called `Context.cancel()`, raise | ||||
|             # the error since it was caused by someone else | ||||
|             # -> probably a remote peer! | ||||
|             else: | ||||
|                 raise | ||||
| 
 | ||||
|         # the above `._scope` can be cancelled due to: | ||||
|         # 1. an explicit self cancel via `Context.cancel()` or | ||||
|         #    `Actor.cancel()`, | ||||
|         # 2. any "callee"-side remote error, possibly also a cancellation | ||||
|         #    request by some peer, | ||||
|         # 3. any "caller" (aka THIS scope's) local error raised in the above `yield` | ||||
|         except ( | ||||
|             # CASE 3: standard local error in this caller/yieldee | ||||
|             Exception, | ||||
| 
 | ||||
|             # CASES 1 & 2: can manifest as a `ctx._scope_nursery` | ||||
|             # exception-group of, | ||||
|             # | ||||
|             # 1.-`trio.Cancelled`s, since | ||||
|             #   `._scope.cancel()` will have been called | ||||
|             #   (transitively by the runtime calling | ||||
|             #   `._deliver_msg()`) and any `ContextCancelled` | ||||
|             #   eventually absorbed and thus absorbed/supressed in | ||||
|             #   any `Context._maybe_raise_remote_err()` call. | ||||
|             # | ||||
|             # 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]` | ||||
|             #    from any error delivered from the "callee" side | ||||
|             #    AND a group-exc is only raised if there was > 1 | ||||
|             #    tasks started *here* in the "caller" / opener | ||||
|             #    block. If any one of those tasks calls | ||||
|             #    `.result()` or `MsgStream.receive()` | ||||
|             #    `._maybe_raise_remote_err()` will be transitively | ||||
|             #    called and the remote error raised causing all | ||||
|             #    tasks to be cancelled. | ||||
|             #    NOTE: ^ this case always can happen if any | ||||
|             #    overrun handler tasks were spawned! | ||||
|             BaseExceptionGroup, | ||||
| 
 | ||||
|             trio.Cancelled,  # NOTE: NOT from inside the ctx._scope | ||||
|             KeyboardInterrupt, | ||||
| 
 | ||||
|         ) as caller_err: | ||||
|             scope_err = caller_err | ||||
|             ctx._local_error: BaseException = scope_err | ||||
| 
 | ||||
|             # XXX: ALWAYS request the context to CANCEL ON any ERROR. | ||||
|             # NOTE: `Context.cancel()` is conversely NEVER CALLED in | ||||
|             # the `ContextCancelled` "self cancellation absorbed" case | ||||
|             # handled in the block above ^^^ !! | ||||
|             # await _debug.pause() | ||||
|             log.cancel( | ||||
|                 'Context terminated due to\n\n' | ||||
|                 f'.outcome => {ctx.repr_outcome()}\n' | ||||
|             ) | ||||
| 
 | ||||
|             if debug_mode(): | ||||
|                 # async with _debug.acquire_debug_lock(self.actor.uid): | ||||
|                 #     pass | ||||
|                 # TODO: factor ^ into below for non-root cases? | ||||
|                 was_acquired: bool = await maybe_wait_for_debugger( | ||||
|                     header_msg=( | ||||
|                         'Delaying `ctx.cancel()` until debug lock ' | ||||
|                         'acquired..\n' | ||||
|                     ), | ||||
|                 ) | ||||
|                 if was_acquired: | ||||
|                     log.pdb( | ||||
|                         'Acquired debug lock! ' | ||||
|                         'Calling `ctx.cancel()`!\n' | ||||
|                     ) | ||||
| 
 | ||||
| 
 | ||||
|             # we don't need to cancel the callee if it already | ||||
|             # told us it's cancelled ;p | ||||
|             if ctxc_from_callee is None: | ||||
|                 try: | ||||
|                     await ctx.cancel() | ||||
|                 except ( | ||||
|                     trio.BrokenResourceError, | ||||
|                     trio.ClosedResourceError, | ||||
|                 ): | ||||
|                     log.warning( | ||||
|                         'IPC connection for context is broken?\n' | ||||
|                         f'task:{cid}\n' | ||||
|                         f'actor:{uid}' | ||||
|                     ) | ||||
| 
 | ||||
|             raise  # duh | ||||
| 
 | ||||
|         # no local scope error, the "clean exit with a result" case. | ||||
|         else: | ||||
|             if ctx.chan.connected(): | ||||
|                 log.runtime( | ||||
|                     'Waiting on final context result for\n' | ||||
|                     f'peer: {uid}\n' | ||||
|                     f'|_{ctx._task}\n' | ||||
|                 ) | ||||
|                 # XXX NOTE XXX: the below call to | ||||
|                 # `Context.result()` will ALWAYS raise | ||||
|                 # a `ContextCancelled` (via an embedded call to | ||||
|                 # `Context._maybe_raise_remote_err()`) IFF | ||||
|                 # a `Context._remote_error` was set by the runtime | ||||
|                 # via a call to | ||||
|                 # `Context._maybe_cancel_and_set_remote_error()`. | ||||
|                 # As per `Context._deliver_msg()`, that error IS | ||||
|                 # ALWAYS SET any time "callee" side fails and causes "caller | ||||
|                 # side" cancellation via a `ContextCancelled` here. | ||||
|                 try: | ||||
|                     result_or_err: Exception|Any = await ctx.result() | ||||
|                 except BaseException as berr: | ||||
|                     # on normal teardown, if we get some error | ||||
|                     # raised in `Context.result()` we still want to | ||||
|                     # save that error on the ctx's state to | ||||
|                     # determine things like `.cancelled_caught` for | ||||
|                     # cases where there was remote cancellation but | ||||
|                     # this task didn't know until final teardown | ||||
|                     # / value collection. | ||||
|                     scope_err = berr | ||||
|                     ctx._local_error: BaseException = scope_err | ||||
|                     raise | ||||
| 
 | ||||
|                 # yes! this worx Bp | ||||
|                 # from .devx import _debug | ||||
|                 # await _debug.pause() | ||||
| 
 | ||||
|                 # an exception type boxed in a `RemoteActorError` | ||||
|                 # is returned (meaning it was obvi not raised) | ||||
|                 # that we want to log-report on. | ||||
|                 msgdata: str|None = getattr( | ||||
|                     result_or_err, | ||||
|                     'msgdata', | ||||
|                     None | ||||
|                 ) | ||||
|                 match (msgdata, result_or_err): | ||||
|                     case ( | ||||
|                         {'tb_str': tbstr}, | ||||
|                         ContextCancelled(), | ||||
|                     ): | ||||
|                         log.cancel(tbstr) | ||||
| 
 | ||||
|                     case ( | ||||
|                         {'tb_str': tbstr}, | ||||
|                         RemoteActorError(), | ||||
|                     ): | ||||
|                         log.exception( | ||||
|                             'Context remotely errored!\n' | ||||
|                             f'<= peer: {uid}\n' | ||||
|                             f'  |_ {nsf}()\n\n' | ||||
| 
 | ||||
|                             f'{tbstr}' | ||||
|                         ) | ||||
|                     case (None, _): | ||||
|                         log.runtime( | ||||
|                             'Context returned final result from callee task:\n' | ||||
|                             f'<= peer: {uid}\n' | ||||
|                             f'  |_ {nsf}()\n\n' | ||||
| 
 | ||||
|                             f'`{result_or_err}`\n' | ||||
|                         ) | ||||
| 
 | ||||
|         finally: | ||||
|             # XXX: (MEGA IMPORTANT) if this is a root opened process we | ||||
|             # wait for any immediate child in debug before popping the | ||||
|             # context from the runtime msg loop otherwise inside | ||||
|             # ``Actor._push_result()`` the msg will be discarded and in | ||||
|             # the case where that msg is global debugger unlock (via | ||||
|             # a "stop" msg for a stream), this can result in a deadlock | ||||
|             # where the root is waiting on the lock to clear but the | ||||
|             # child has already cleared it and clobbered IPC. | ||||
|             await maybe_wait_for_debugger() | ||||
| 
 | ||||
|             # though it should be impossible for any tasks | ||||
|             # operating *in* this scope to have survived | ||||
|             # we tear down the runtime feeder chan last | ||||
|             # to avoid premature stream clobbers. | ||||
|             if ( | ||||
|                 (rxchan := ctx._recv_chan) | ||||
| 
 | ||||
|                 # maybe TODO: yes i know the below check is | ||||
|                 # touching `trio` memchan internals..BUT, there are | ||||
|                 # only a couple ways to avoid a `trio.Cancelled` | ||||
|                 # bubbling from the `.aclose()` call below: | ||||
|                 # | ||||
|                 # - catch and mask it via the cancel-scope-shielded call | ||||
|                 #   as we are rn (manual and frowned upon) OR, | ||||
|                 # - specially handle the case where `scope_err` is | ||||
|                 #   one of {`BaseExceptionGroup`, `trio.Cancelled`} | ||||
|                 #   and then presume that the `.aclose()` call will | ||||
|                 #   raise a `trio.Cancelled` and just don't call it | ||||
|                 #   in those cases.. | ||||
|                 # | ||||
|                 # that latter approach is more logic, LOC, and more | ||||
|                 # convoluted so for now stick with the first | ||||
|                 # psuedo-hack-workaround where we just try to avoid | ||||
|                 # the shielded call as much as we can detect from | ||||
|                 # the memchan's `._closed` state.. | ||||
|                 # | ||||
|                 # XXX MOTIVATION XXX-> we generally want to raise | ||||
|                 # any underlying actor-runtime/internals error that | ||||
|                 # surfaces from a bug in tractor itself so it can | ||||
|                 # be easily detected/fixed AND, we also want to | ||||
|                 # minimize noisy runtime tracebacks (normally due | ||||
|                 # to the cross-actor linked task scope machinery | ||||
|                 # teardown) displayed to user-code and instead only | ||||
|                 # displaying `ContextCancelled` traces where the | ||||
|                 # cause of crash/exit IS due to something in | ||||
|                 # user/app code on either end of the context. | ||||
|                 and not rxchan._closed | ||||
|             ): | ||||
|                 # XXX NOTE XXX: and again as per above, we mask any | ||||
|                 # `trio.Cancelled` raised here so as to NOT mask | ||||
|                 # out any exception group or legit (remote) ctx | ||||
|                 # error that sourced from the remote task or its | ||||
|                 # runtime. | ||||
|                 # | ||||
|                 # NOTE: further, this should be the only place the | ||||
|                 # underlying feeder channel is | ||||
|                 # once-and-only-CLOSED! | ||||
|                 with trio.CancelScope(shield=True): | ||||
|                     await ctx._recv_chan.aclose() | ||||
| 
 | ||||
|             # XXX: we always raise remote errors locally and | ||||
|             # generally speaking mask runtime-machinery related | ||||
|             # multi-`trio.Cancelled`s. As such, any `scope_error` | ||||
|             # which was the underlying cause of this context's exit | ||||
|             # should be stored as the `Context._local_error` and | ||||
|             # used in determining `Context.cancelled_caught: bool`. | ||||
|             if scope_err is not None: | ||||
|                 # sanity, tho can remove? | ||||
|                 assert ctx._local_error is scope_err | ||||
|                 # ctx._local_error: BaseException = scope_err | ||||
|                 # etype: Type[BaseException] = type(scope_err) | ||||
| 
 | ||||
|                 # CASE 2 | ||||
|                 if ( | ||||
|                     ctx._cancel_called | ||||
|                     and ctx.cancel_acked | ||||
|                 ): | ||||
|                     log.cancel( | ||||
|                         'Context cancelled by caller task\n' | ||||
|                         f'|_{ctx._task}\n\n' | ||||
| 
 | ||||
|                         f'{repr(scope_err)}\n' | ||||
|                     ) | ||||
| 
 | ||||
|                 # TODO: should we add a `._cancel_req_received` | ||||
|                 # flag to determine if the callee manually called | ||||
|                 # `ctx.cancel()`? | ||||
|                 # -[ ] going to need a cid check no? | ||||
| 
 | ||||
|                 # CASE 1 | ||||
|                 else: | ||||
|                     outcome_str: str = ctx.repr_outcome( | ||||
|                         show_error_fields=True, | ||||
|                         # type_only=True, | ||||
|                     ) | ||||
|                     log.cancel( | ||||
|                         f'Context terminated due to local scope error:\n\n' | ||||
|                         f'{ctx.chan.uid} => {outcome_str}\n' | ||||
|                     ) | ||||
| 
 | ||||
|             # XXX: (MEGA IMPORTANT) if this is a root opened process we | ||||
|             # wait for any immediate child in debug before popping the | ||||
|             # context from the runtime msg loop otherwise inside | ||||
|             # ``Actor._push_result()`` the msg will be discarded and in | ||||
|             # the case where that msg is global debugger unlock (via | ||||
|             # a "stop" msg for a stream), this can result in a deadlock | ||||
|             # where the root is waiting on the lock to clear but the | ||||
|             # child has already cleared it and clobbered IPC. | ||||
| 
 | ||||
|             # FINALLY, remove the context from runtime tracking and | ||||
|             # exit! | ||||
|             log.runtime( | ||||
|                 'Removing IPC ctx opened with peer\n' | ||||
|                 f'{uid}\n' | ||||
|                 f'|_{ctx}\n' | ||||
|             ) | ||||
|             self.actor._contexts.pop( | ||||
|                 (uid, cid), | ||||
|                 None, | ||||
|             ) | ||||
|     # NOTE: the impl is found in the `._context` mod to make | ||||
|     # reading/grokking the details simpler code-org-wise. This | ||||
|     # method does not have to be used over the `@acm` module func | ||||
|     # directly; it is bound here for convenience and to keep the | ||||
|     # original API design. | ||||
|     open_context = open_context_from_portal | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
|  | @ -1018,7 +472,7 @@ class LocalPortal: | |||
|         return await func(**kwargs) | ||||
| 
 | ||||
| 
 | ||||
| @asynccontextmanager | ||||
| @acm | ||||
| async def open_portal( | ||||
| 
 | ||||
|     channel: Channel, | ||||
|  |  | |||
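As the NOTE added near the bottom of the `Portal` class body explains, the public surface is unchanged: `open_context` is now just the module-level `@acm` assigned as a class attribute, so it binds like any plain function would. A small sketch (again, not part of the diff) of the equivalence this should imply:

```python
from tractor._context import open_context_from_portal
from tractor._portal import Portal

# plain functions assigned on a class bind as methods on instances,
# so the class attribute should simply *be* the module-level `@acm`.
assert Portal.open_context is open_context_from_portal

# hence, given some connected `portal`, these spellings should be
# interchangeable:
#
#   async with portal.open_context(some_ctx_fn) as (ctx, first):
#       ...
#
#   async with open_context_from_portal(portal, some_ctx_fn) as (ctx, first):
#       ...
```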