forked from goodboy/tractor
				
Get mega-pedantic in `Portal.open_context()`

Specifically in the `.__aexit__()` phase to ensure remote, runtime-internal, and locally raised error-during-cancelled-handling exceptions are NEVER masked by a local `ContextCancelled` or any exception group of `trio.Cancelled`s.

Also adds a ton of details to doc strings including extreme detail surrounding the `ContextCancelled` raising cases and their processing inside `.open_context()`'s exception handler blocks.

Details, details:
- internal rename `err`/`_err` stuff to just be `scope_err` since it's effectively the error bubbled up from the context's surrounding (and cross-actor) "scope".
- always shield `._recv_chan.aclose()` to avoid any `Cancelled` from masking the `scope_err` with a runtime related `trio.Cancelled`.
- explicitly catch the specific set of `scope_err: BaseException` that we can reasonably expect to handle instead of the catch-all parent type including exception groups, cancels and KBIs.

remotes/1757153874605917753/main
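The masking hazard that motivates the shielded `.aclose()` call can be seen in plain Python: an exception raised inside a `finally` block replaces whichever exception was already propagating. The sketch below is a stdlib-only analogue, not tractor's code. `RemoteTaskError`, `CleanupInterrupted` and `close_channel()` are hypothetical stand-ins for a relayed remote error, a `trio.Cancelled` raised during teardown, and the `._recv_chan.aclose()` call. It suppresses the cleanup error with a `try`/`except`, whereas the commit keeps it from arising at all by wrapping the call in `trio.CancelScope(shield=True)`.

```python
class RemoteTaskError(Exception):
    """Hypothetical stand-in for an error relayed from the remote task."""


class CleanupInterrupted(BaseException):
    """Hypothetical stand-in for a cancellation raised during teardown."""


def close_channel() -> None:
    # Assumed behaviour: the cleanup call is itself interrupted.
    raise CleanupInterrupted('cleanup was interrupted')


def exit_unguarded() -> None:
    try:
        raise RemoteTaskError('callee failed')
    finally:
        # Raising here replaces the in-flight RemoteTaskError.
        close_channel()


def exit_guarded() -> None:
    try:
        raise RemoteTaskError('callee failed')
    finally:
        try:
            close_channel()
        except CleanupInterrupted:
            # Suppress the teardown noise so the original error survives.
            pass


def raised_type(fn):
    """Return the type of whatever `fn` raises, or None if it returns."""
    try:
        fn()
    except BaseException as err:
        return type(err)
    return None
```

Calling `raised_type(exit_unguarded)` reports the cleanup exception, while `raised_type(exit_guarded)` reports the original `RemoteTaskError`, which is the outcome the commit is after for `scope_err`.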
parent 89ed8b67ff
commit 22e4b324b1
				|  | @ -15,8 +15,12 @@ | |||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| ''' | ||||
| Memory boundary "Portals": an API for structured | ||||
| concurrency linked tasks running in disparate memory domains. | ||||
| Memory "portal" construct. | ||||
| 
 | ||||
| "Memory portals" are both an API and set of IPC wrapping primitives | ||||
| for managing structured concurrency "cancel-scope linked" tasks | ||||
| running in disparate virtual memory domains - at least in different | ||||
| OS processes, possibly on different (hardware) hosts. | ||||
| 
 | ||||
| ''' | ||||
| from __future__ import annotations | ||||
|  | @ -66,20 +70,21 @@ def _unwrap_msg( | |||
|         raise unpack_error(msg, channel) from None | ||||
| 
 | ||||
| 
 | ||||
| # TODO: maybe move this to ._exceptions? | ||||
| class MessagingError(Exception): | ||||
|     'Some kind of unexpected SC messaging dialog issue' | ||||
| 
 | ||||
| 
 | ||||
| class Portal: | ||||
|     ''' | ||||
|     A 'portal' to a(n) (remote) ``Actor``. | ||||
|     A 'portal' to a memory-domain-separated `Actor`. | ||||
| 
 | ||||
|     A portal is "opened" (and eventually closed) by one side of an | ||||
|     inter-actor communication context. The side which opens the portal | ||||
|     is equivalent to a "caller" in function parlance and usually is | ||||
|     either the called actor's parent (in process tree hierarchy terms) | ||||
|     or a client interested in scheduling work to be done remotely in a | ||||
|     far process. | ||||
|     process which has a separate (virtual) memory domain. | ||||
| 
 | ||||
|     The portal api allows the "caller" actor to invoke remote routines | ||||
|     and receive results through an underlying ``tractor.Channel`` as | ||||
|  | @ -89,9 +94,9 @@ class Portal: | |||
|     like having a "portal" between the separate actor memory spaces. | ||||
| 
 | ||||
|     ''' | ||||
|     # the timeout for a remote cancel request sent to | ||||
|     # a(n) (peer) actor. | ||||
|     cancel_timeout = 0.5 | ||||
|     # global timeout for remote cancel requests sent to | ||||
|     # connected (peer) actors. | ||||
|     cancel_timeout: float = 0.5 | ||||
| 
 | ||||
|     def __init__(self, channel: Channel) -> None: | ||||
|         self.channel = channel | ||||
|  | @ -393,12 +398,32 @@ class Portal: | |||
| 
 | ||||
|     ) -> AsyncGenerator[tuple[Context, Any], None]: | ||||
|         ''' | ||||
|         Open an inter-actor task context. | ||||
|         Open an inter-actor "task context"; a remote task is | ||||
|         scheduled and cancel-scope-state-linked to a `trio.run()` across | ||||
|         memory boundaries in another actor's runtime. | ||||
| 
 | ||||
|         This is a synchronous API which allows for deterministic | ||||
|         setup/teardown of a remote task. The yielded ``Context`` further | ||||
|         allows for opening bidirectional streams, explicit cancellation | ||||
|         and synchronized final result collection. See ``tractor.Context``. | ||||
|         This is an `@acm` API which allows for deterministic setup | ||||
|         and teardown of a remotely scheduled task in another remote | ||||
|         actor. Once opened, the 2 now "linked" tasks run completely | ||||
|         in parallel in each actor's runtime with their enclosing | ||||
|         `trio.CancelScope`s kept in a synced state wherein if | ||||
|         either side errors or cancels an equivalent error is | ||||
|         relayed to the other side via an SC-compat IPC protocol. | ||||
| 
 | ||||
|         The yielded `tuple` is a pair delivering a `tractor.Context` | ||||
|         and any first value "sent" by the "callee" task via a call | ||||
|         to `Context.started(<value: Any>)`; this side of the | ||||
|         context does not unblock until the "callee" task calls | ||||
|         `.started()` in similar style to `trio.Nursery.start()`. | ||||
|         When the "callee" (side that is "called"/started by a call | ||||
|         to *this* method) returns, the caller side (this) unblocks | ||||
|         and any final value delivered from the other end can be | ||||
|         retrieved using the `Context.result()` api. | ||||
| 
 | ||||
|         The yielded ``Context`` instance further allows for opening | ||||
|         bidirectional streams, explicit cancellation and | ||||
|         structured-concurrency-synchronized final result-msg | ||||
|         collection. See ``tractor.Context`` for more details. | ||||
| 
 | ||||
|         ''' | ||||
|         # conduct target func method structural checks | ||||
|  | @ -431,47 +456,52 @@ class Portal: | |||
|         ) | ||||
| 
 | ||||
|         assert ctx._remote_func_type == 'context' | ||||
|         msg = await ctx._recv_chan.receive() | ||||
|         msg: dict = await ctx._recv_chan.receive() | ||||
| 
 | ||||
|         try: | ||||
|             # the "first" value here is delivered by the callee's | ||||
|             # ``Context.started()`` call. | ||||
|             first = msg['started'] | ||||
|             ctx._started_called = True | ||||
|             ctx._started_called: bool = True | ||||
| 
 | ||||
|         except KeyError: | ||||
|             assert msg.get('cid'), ("Received internal error at context?") | ||||
|             if not (cid := msg.get('cid')): | ||||
|                 raise MessagingError( | ||||
|                     'Received internal error at context?\n' | ||||
|                     'No call-id (cid) in startup msg?' | ||||
|                 ) | ||||
| 
 | ||||
|             if msg.get('error'): | ||||
|                 # raise kerr from unpack_error(msg, self.channel) | ||||
|                 # NOTE: mask the key error with the remote one | ||||
|                 raise unpack_error(msg, self.channel) from None | ||||
|             else: | ||||
|                 raise MessagingError( | ||||
|                     f'Context for {ctx.cid} was expecting a `started` message' | ||||
|                     f' but received a non-error msg:\n{pformat(msg)}' | ||||
|                     f'Context for {cid} was expecting a `started` message' | ||||
|                     ' but received a non-error msg:\n' | ||||
|                     f'{pformat(msg)}' | ||||
|                 ) | ||||
| 
 | ||||
|         _err: BaseException | None = None | ||||
|         ctx._portal: Portal = self | ||||
| 
 | ||||
|         uid: tuple = self.channel.uid | ||||
|         cid: str = ctx.cid | ||||
|         etype: Type[BaseException] | None = None | ||||
| 
 | ||||
|         # deliver context instance and .started() msg value in enter | ||||
|         # tuple. | ||||
|         # placeholder for any exception raised in the runtime | ||||
|         # or by user tasks which cause this context's closure. | ||||
|         scope_err: BaseException | None = None | ||||
|         try: | ||||
|             async with trio.open_nursery() as nurse: | ||||
|                 ctx._scope_nursery = nurse | ||||
|                 ctx._scope = nurse.cancel_scope | ||||
|                 ctx._scope_nursery: trio.Nursery = nurse | ||||
|                 ctx._scope: trio.CancelScope = nurse.cancel_scope | ||||
| 
 | ||||
|                 # deliver context instance and .started() msg value | ||||
|                 # in enter tuple. | ||||
|                 yield ctx, first | ||||
| 
 | ||||
|                 # when in allow_ovveruns mode there may be lingering | ||||
|                 # overflow sender tasks remaining? | ||||
|                 # when in allow_overruns mode there may be | ||||
|                 # lingering overflow sender tasks remaining? | ||||
|                 if nurse.child_tasks: | ||||
|                     # ensure we are in overrun state with | ||||
|                     # ``._allow_overruns=True`` bc otherwise | ||||
|                     # XXX: ensure we are in overrun state | ||||
|                     # with ``._allow_overruns=True`` bc otherwise | ||||
|                     # there should be no tasks in this nursery! | ||||
|                     if ( | ||||
|                         not ctx._allow_overruns | ||||
|  | @ -479,47 +509,72 @@ class Portal: | |||
|                     ): | ||||
|                         raise RuntimeError( | ||||
|                             'Context has sub-tasks but is ' | ||||
|                             'not in `allow_overruns=True` Mode!?' | ||||
|                             'not in `allow_overruns=True` mode!?' | ||||
|                         ) | ||||
| 
 | ||||
|                     # ensure cancel of all overflow sender tasks | ||||
|                     # started in the ctx nursery. | ||||
|                     ctx._scope.cancel() | ||||
| 
 | ||||
|         except ContextCancelled as err: | ||||
|             _err = err | ||||
|         # XXX: (maybe) shield/mask context-cancellations that were | ||||
|         # initiated by any of the context's 2 tasks. There are | ||||
|         # subsequently 2 operating cases for a "graceful cancel" | ||||
|         # of a `Context`: | ||||
|         #  | ||||
|         # 1.*this* side's task called `Context.cancel()`, in | ||||
|         #   which case we mask the `ContextCancelled` from bubbling | ||||
|         #   to the opener (much like how `trio.Nursery` swallows | ||||
|         #   any `trio.Cancelled` bubbled by a call to | ||||
|         #   `Nursery.cancel_scope.cancel()`) | ||||
|         # | ||||
|         # 2.*the other* side's (callee/spawned) task cancelled due | ||||
|         #   to a self or peer cancellation request in which case we | ||||
|         #   DO let the error bubble to the opener. | ||||
|         except ContextCancelled as ctxc: | ||||
|             scope_err = ctxc | ||||
| 
 | ||||
|             # swallow and mask cross-actor task context cancels that | ||||
|             # were initiated by *this* side's task. | ||||
|             # CASE 1: this context was never cancelled | ||||
|             # via a local task's call to `Context.cancel()`. | ||||
|             if not ctx._cancel_called: | ||||
|                 # XXX: this should NEVER happen! | ||||
|                 # from ._debug import breakpoint | ||||
|                 # await breakpoint() | ||||
|                 raise | ||||
| 
 | ||||
|             # if the context was cancelled by client code | ||||
|             # then we don't need to raise since user code | ||||
|             # is expecting this and the block should exit. | ||||
|             # CASE 2: context was cancelled by local task calling | ||||
|             # `.cancel()`, we don't raise and the exit block should | ||||
|             # exit silently. | ||||
|             else: | ||||
|                 log.debug(f'Context {ctx} cancelled gracefully') | ||||
|                 log.debug( | ||||
|                     f'Context {ctx} cancelled gracefully with:\n' | ||||
|                     f'{ctxc}' | ||||
|                 ) | ||||
| 
 | ||||
|         except ( | ||||
|             BaseException, | ||||
|             # - a standard error in the caller/yieldee | ||||
|             Exception, | ||||
| 
 | ||||
|             # more specifically, we need to handle these but not | ||||
|             # sure it's worth being pedantic: | ||||
|             # Exception, | ||||
|             # trio.Cancelled, | ||||
|             # KeyboardInterrupt, | ||||
|             # - a runtime teardown exception-group and/or | ||||
|             #   cancellation request from a caller task. | ||||
|             BaseExceptionGroup, | ||||
|             trio.Cancelled, | ||||
|             KeyboardInterrupt, | ||||
| 
 | ||||
|         ) as err: | ||||
|             etype = type(err) | ||||
|             scope_err = err | ||||
| 
 | ||||
|             # cancel ourselves on any error. | ||||
|             # XXX: request cancel of this context on any error. | ||||
|             # NOTE: `Context.cancel()` is conversely NOT called in | ||||
|             # the `ContextCancelled` "cancellation requested" case | ||||
|             # above. | ||||
|             log.cancel( | ||||
|                 'Context cancelled for task, sending cancel request..\n' | ||||
|                 'Context cancelled for task due to\n' | ||||
|                 f'{err}\n' | ||||
|                 'Sending cancel request..\n' | ||||
|                 f'task:{cid}\n' | ||||
|                 f'actor:{uid}' | ||||
|             ) | ||||
|             try: | ||||
| 
 | ||||
|                 await ctx.cancel() | ||||
|             except trio.BrokenResourceError: | ||||
|                 log.warning( | ||||
|  | @ -528,8 +583,9 @@ class Portal: | |||
|                     f'actor:{uid}' | ||||
|                 ) | ||||
| 
 | ||||
|             raise | ||||
|             raise  # duh | ||||
| 
 | ||||
|         # no scope error case | ||||
|         else: | ||||
|             if ctx.chan.connected(): | ||||
|                 log.info( | ||||
|  | @ -537,10 +593,20 @@ class Portal: | |||
|                     f'task: {cid}\n' | ||||
|                     f'actor: {uid}' | ||||
|                 ) | ||||
|                 # XXX NOTE XXX: the below call to | ||||
|                 # `Context.result()` will ALWAYS raise | ||||
|                 # a `ContextCancelled` (via an embedded call to | ||||
|                 # `Context._maybe_raise_remote_err()`) IFF | ||||
|                 # a `Context._remote_error` was set by the runtime | ||||
|                 # via a call to | ||||
|                 # `Context._maybe_cancel_and_set_remote_error()` | ||||
|                 # which IS SET any time the far end fails and | ||||
|                 # causes "caller side" cancellation via | ||||
|                 # a `ContextCancelled` here. | ||||
|                 result = await ctx.result() | ||||
|                 log.runtime( | ||||
|                     f'Context {fn_name} returned ' | ||||
|                     f'value from callee `{result}`' | ||||
|                     f'Context {fn_name} returned value from callee:\n' | ||||
|                     f'`{result}`' | ||||
|                 ) | ||||
| 
 | ||||
|         finally: | ||||
|  | @ -548,22 +614,73 @@ class Portal: | |||
|             # operating *in* this scope to have survived | ||||
|             # we tear down the runtime feeder chan last | ||||
|             # to avoid premature stream clobbers. | ||||
|             if ctx._recv_chan is not None: | ||||
|                 # should we encapsulate this in the context api? | ||||
|                 await ctx._recv_chan.aclose() | ||||
|             rxchan: trio.ReceiveChannel = ctx._recv_chan | ||||
|             if ( | ||||
|                  rxchan | ||||
| 
 | ||||
|             if etype: | ||||
|                 # maybe TODO: yes i know the below check is | ||||
|                 # touching `trio` memchan internals..BUT, there are | ||||
|                 # only a couple ways to avoid a `trio.Cancelled` | ||||
|                 # bubbling from the `.aclose()` call below: | ||||
|                 # | ||||
|                 # - catch and mask it via the cancel-scope-shielded call | ||||
|                 #   as we are rn (manual and frowned upon) OR, | ||||
|                 # - specially handle the case where `scope_err` is | ||||
|                 #   one of {`BaseExceptionGroup`, `trio.Cancelled`} | ||||
|                 #   and then presume that the `.aclose()` call will | ||||
|                 #   raise a `trio.Cancelled` and just don't call it | ||||
|                 #   in those cases.. | ||||
|                 # | ||||
|                 # that latter approach is more logic, LOC, and more | ||||
|                 # convoluted so for now stick with the first | ||||
|                 # pseudo-hack-workaround where we just try to avoid | ||||
|                 # the shielded call as much as we can detect from | ||||
|                 # the memchan's `._closed` state.. | ||||
|                 # | ||||
|                 # XXX MOTIVATION XXX-> we generally want to raise | ||||
|                 # any underlying actor-runtime/internals error that | ||||
|                 # surfaces from a bug in tractor itself so it can | ||||
|                 # be easily detected/fixed AND, we also want to | ||||
|                 # minimize noisy runtime tracebacks (normally due | ||||
|                 # to the cross-actor linked task scope machinery | ||||
|                 # teardown) displayed to user-code and instead only | ||||
|                 # displaying `ContextCancelled` traces where the | ||||
|                 # cause of crash/exit IS due to something in | ||||
|                 # user/app code on either end of the context. | ||||
|                 and not rxchan._closed | ||||
|             ): | ||||
|                 # XXX NOTE XXX: and again as per above, we mask any | ||||
|                 # `trio.Cancelled` raised here so as to NOT mask | ||||
|                 # out any exception group or legit (remote) ctx | ||||
|                 # error that sourced from the remote task or its | ||||
|                 # runtime. | ||||
|                 with trio.CancelScope(shield=True): | ||||
|                     await ctx._recv_chan.aclose() | ||||
| 
 | ||||
|             # XXX: since we always (maybe) re-raise (and thus also | ||||
|             # mask runtime machinery related | ||||
|             # multi-`trio.Cancelled`s) any scope error which was | ||||
|             # the underlying cause of this context's exit, add | ||||
|             # different log msgs for each of the (2) cases. | ||||
|             if scope_err is not None: | ||||
|                 etype: Type[BaseException] = type(scope_err) | ||||
| 
 | ||||
|                 # CASE 2 | ||||
|                 if ctx._cancel_called: | ||||
|                     log.cancel( | ||||
|                         f'Context {fn_name} cancelled by caller with\n{etype}' | ||||
|                         f'Context {fn_name} cancelled by caller with\n' | ||||
|                         f'{etype}' | ||||
|                     ) | ||||
|                 elif _err is not None: | ||||
| 
 | ||||
|                 # CASE 1 | ||||
|                 else: | ||||
|                     log.cancel( | ||||
|                         f'Context for task cancelled by callee with {etype}\n' | ||||
|                         f'Context cancelled by callee with {etype}\n' | ||||
|                         f'target: `{fn_name}`\n' | ||||
|                         f'task:{cid}\n' | ||||
|                         f'actor:{uid}' | ||||
|                     ) | ||||
| 
 | ||||
|             # XXX: (MEGA IMPORTANT) if this is a root opened process we | ||||
|             # wait for any immediate child in debug before popping the | ||||
|             # context from the runtime msg loop otherwise inside | ||||
|  | @ -572,10 +689,9 @@ class Portal: | |||
|             # a "stop" msg for a stream), this can result in a deadlock | ||||
|             # where the root is waiting on the lock to clear but the | ||||
|             # child has already cleared it and clobbered IPC. | ||||
|             from ._debug import maybe_wait_for_debugger | ||||
|             await maybe_wait_for_debugger() | ||||
| 
 | ||||
|             # remove the context from runtime tracking | ||||
|             # FINALLY, remove the context from runtime tracking and | ||||
|             # exit Bo | ||||
|             self.actor._contexts.pop( | ||||
|                 (self.channel.uid, ctx.cid), | ||||
|                 None, | ||||
|  |  | |||
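The two `ContextCancelled` cases handled in the diff above can be mimicked with a synchronous context manager. This is a minimal sketch under loud assumptions: `ContextCancelled` here is a local stand-in class rather than tractor's, and `FakeContext`, `open_context_sketch()` and `run()` are hypothetical names that model only the "was cancel requested locally" flag, with no IPC or `trio` involved.

```python
from contextlib import contextmanager


class ContextCancelled(Exception):
    """Local stand-in for tractor's exception of the same name."""


class FakeContext:
    """Hypothetical context tracking only whether cancel was requested locally."""

    def __init__(self) -> None:
        self.cancel_called = False

    def cancel(self) -> None:
        self.cancel_called = True


@contextmanager
def open_context_sketch(ctx: FakeContext):
    try:
        yield ctx
    except ContextCancelled:
        # CASE 1: this side never requested cancellation, so the
        # error must bubble to the opener.
        if not ctx.cancel_called:
            raise
        # CASE 2: this side asked for the cancel; exit silently.


def run(local_cancel: bool) -> str:
    ctx = FakeContext()
    try:
        with open_context_sketch(ctx):
            if local_cancel:
                ctx.cancel()
            # Simulate the far end reporting that its task was cancelled.
            raise ContextCancelled('remote task was cancelled')
    except ContextCancelled:
        return 'raised'
    return 'swallowed'
```

With `run(True)` the error is absorbed at block exit, much like a nursery absorbing the `trio.Cancelled` caused by its own `cancel_scope.cancel()`. With `run(False)` it reaches the caller unchanged.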