forked from goodboy/tractor
				
			Be ultra-correct in `Portal.open_context()`
This took way too long to get right but hopefully will give us grok-able and correct context exit semantics going forward B) The main fixes were: - always shielding the `MsgStream.aclose()` call on teardown to avoid bubbling a `Cancelled`. - properly absorbing any `ContextCancelled` in cases due to "self cancellation" using the new `Context.canceller` in the logic. - capturing any error raised by the `Context.result()` call in the "normal exit, result received" case and setting it as the `Context._local_error` so that self-cancels can be easily measured via `Context.cancelled_caught` in same way as remote-error caused cancellations. - extremely detailed comments around all of the cancellation-error cases to avoid ever getting confused about the control flow in the future XD remotes/1757153874605917753/main
							parent
							
								
									f5fcd8ca2e
								
							
						
					
					
						commit
						7d5fda4485
					
				|  | @ -48,6 +48,7 @@ from ._exceptions import ( | |||
|     unpack_error, | ||||
|     NoResult, | ||||
|     ContextCancelled, | ||||
|     MessagingError, | ||||
| ) | ||||
| from ._context import Context | ||||
| from ._streaming import MsgStream | ||||
|  | @ -70,11 +71,6 @@ def _unwrap_msg( | |||
|         raise unpack_error(msg, channel) from None | ||||
| 
 | ||||
| 
 | ||||
| # TODO: maybe move this to ._exceptions? | ||||
| class MessagingError(Exception): | ||||
|     'Some kind of unexpected SC messaging dialog issue' | ||||
| 
 | ||||
| 
 | ||||
| class Portal: | ||||
|     ''' | ||||
|     A 'portal' to a memory-domain-separated `Actor`. | ||||
|  | @ -219,14 +215,18 @@ class Portal: | |||
| 
 | ||||
|         try: | ||||
|             # send cancel cmd - might not get response | ||||
|             # XXX: sure would be nice to make this work with a proper shield | ||||
|             # XXX: sure would be nice to make this work with | ||||
|             # a proper shield | ||||
|             with trio.move_on_after( | ||||
|                 timeout | ||||
|                 or self.cancel_timeout | ||||
|             ) as cs: | ||||
|                 cs.shield = True | ||||
| 
 | ||||
|                 await self.run_from_ns('self', 'cancel') | ||||
|                 await self.run_from_ns( | ||||
|                     'self', | ||||
|                     'cancel', | ||||
|                 ) | ||||
|                 return True | ||||
| 
 | ||||
|             if cs.cancelled_caught: | ||||
|  | @ -461,10 +461,14 @@ class Portal: | |||
|         try: | ||||
|             # the "first" value here is delivered by the callee's | ||||
|             # ``Context.started()`` call. | ||||
|             first = msg['started'] | ||||
|             first: Any = msg['started'] | ||||
|             ctx._started_called: bool = True | ||||
| 
 | ||||
|         except KeyError: | ||||
| 
 | ||||
|             # TODO: can we maybe factor this into the new raiser | ||||
|             # `_streaming._raise_from_no_yield_msg()` and make that | ||||
|             # helper more generic, say with a `_no_<blah>_msg()`? | ||||
|             if not (cid := msg.get('cid')): | ||||
|                 raise MessagingError( | ||||
|                     'Received internal error at context?\n' | ||||
|  | @ -516,57 +520,102 @@ class Portal: | |||
|                     # started in the ctx nursery. | ||||
|                     ctx._scope.cancel() | ||||
| 
 | ||||
|         # XXX: (maybe) shield/mask context-cancellations that were | ||||
|         # initiated by any of the context's 2 tasks. There are | ||||
|         # subsequently 2 operating cases for a "graceful cancel" | ||||
|         # of a `Context`: | ||||
|         #  | ||||
|         # 1.*this* side's task called `Context.cancel()`, in | ||||
|         #   which case we mask the `ContextCancelled` from bubbling | ||||
|         #   to the opener (much like how `trio.Nursery` swallows | ||||
|         #   any `trio.Cancelled` bubbled by a call to | ||||
|         #   `Nursery.cancel_scope.cancel()`) | ||||
|         # XXX NOTE XXX: maybe shield against | ||||
|         # self-context-cancellation (which raises a local | ||||
|         # `ContextCancelled`) when requested (via | ||||
|         # `Context.cancel()`) by the same task (tree) which entered | ||||
|         # THIS `.open_context()`. | ||||
|         # | ||||
|         # 2.*the other* side's (callee/spawned) task cancelled due | ||||
|         #   to a self or peer cancellation request in which case we | ||||
|         #   DO let the error bubble to the opener. | ||||
|         # NOTE: There are 2 operating cases for a "graceful cancel" | ||||
|         # of a `Context`. In both cases any `ContextCancelled` | ||||
|         # raised in this scope-block came from a transport msg | ||||
|         # relayed from some remote-actor-task which our runtime set | ||||
|         # as a `Context._remote_error` | ||||
|         # | ||||
|         # the CASES: | ||||
|         # | ||||
|         # - if that context IS THE SAME ONE that called | ||||
|         #   `Context.cancel()`, we want to absorb the error | ||||
|         #   silently and let this `.open_context()` block exit | ||||
|         #   without raising. | ||||
|         # | ||||
|         # - if it is from some OTHER context (we did NOT call | ||||
|         #   `.cancel()`), we want to re-RAISE IT whilst also | ||||
|         #   setting our own ctx's "reason for cancel" to be that | ||||
|         #   other context's cancellation condition; we set our | ||||
|         #   `.canceller: tuple[str, str]` to be same value as | ||||
|         #   caught here in a `ContextCancelled.canceller`. | ||||
|         # | ||||
|         # Again, there are 2 cases: | ||||
|         # | ||||
|         # 1-some other context opened in this `.open_context()` | ||||
|         #   block cancelled due to a self or peer cancellation | ||||
|         #   request in which case we DO let the error bubble to the | ||||
|         #   opener. | ||||
|         # | ||||
|         # 2-THIS "caller" task somewhere invoked `Context.cancel()` | ||||
|         #   and received a `ContextCancelled` from the "callee" | ||||
|         #   task, in which case we mask the `ContextCancelled` from | ||||
|         #   bubbling to this "caller" (much like how `trio.Nursery` | ||||
|         #   swallows any `trio.Cancelled` bubbled by a call to | ||||
|         #   `Nursery.cancel_scope.cancel()`) | ||||
|         except ContextCancelled as ctxc: | ||||
|             scope_err = ctxc | ||||
| 
 | ||||
|             # CASE 1: this context was never cancelled | ||||
|             # via a local task's call to `Context.cancel()`. | ||||
|             if not ctx._cancel_called: | ||||
|                 # XXX: this should NEVER happen! | ||||
|                 # from ._debug import breakpoint | ||||
|                 # await breakpoint() | ||||
|                 raise | ||||
| 
 | ||||
|             # CASE 2: context was cancelled by local task calling | ||||
|             # `.cancel()`, we don't raise and the exit block should | ||||
|             # exit silently. | ||||
|             else: | ||||
|             if ( | ||||
|                 ctx._cancel_called | ||||
|                 and ( | ||||
|                     ctxc is ctx._remote_error | ||||
|                     or | ||||
|                     ctxc.canceller is self.canceller | ||||
|                 ) | ||||
|             ): | ||||
|                 log.debug( | ||||
|                     f'Context {ctx} cancelled gracefully with:\n' | ||||
|                     f'{ctxc}' | ||||
|                 ) | ||||
|             # CASE 1: this context was never cancelled via a local | ||||
|             # task (tree) having called `Context.cancel()`, raise | ||||
|             # the error since it was caused by someone else! | ||||
|             else: | ||||
|                 raise | ||||
| 
 | ||||
|         # the above `._scope` can be cancelled due to: | ||||
|         # 1. an explicit self cancel via `Context.cancel()` or | ||||
|         #    `Actor.cancel()`, | ||||
|         # 2. any "callee"-side remote error, possibly also a cancellation | ||||
|         #    request by some peer, | ||||
|         # 3. any "caller" (aka THIS scope's) local error raised in the above `yield` | ||||
|         except ( | ||||
|             # - a standard error in the caller/yieldee | ||||
|             # CASE 3: standard local error in this caller/yieldee | ||||
|             Exception, | ||||
| 
 | ||||
|             # - a runtime teardown exception-group and/or | ||||
|             #   cancellation request from a caller task. | ||||
|             BaseExceptionGroup, | ||||
|             trio.Cancelled, | ||||
|             # CASES 1 & 2: normally manifested as | ||||
|             # a `Context._scope_nursery` raised | ||||
|             # exception-group of, | ||||
|             # 1.-`trio.Cancelled`s, since | ||||
|             #   `._scope.cancel()` will have been called and any | ||||
|             #   `ContextCancelled` absorbed and thus NOT RAISED in | ||||
|             #   any `Context._maybe_raise_remote_err()`, | ||||
|             # 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]` | ||||
|             #    from any error raised in the "callee" side with | ||||
|             #    a group only raised if there was any more than one | ||||
|             #    task started here in the "caller" in the | ||||
|             #    `yield`-ed to task. | ||||
|             BaseExceptionGroup,  # since overrun handler tasks may have been spawned | ||||
|             trio.Cancelled,  # NOTE: NOT from inside the ctx._scope | ||||
|             KeyboardInterrupt, | ||||
| 
 | ||||
|         ) as err: | ||||
|             scope_err = err | ||||
| 
 | ||||
|             # XXX: request cancel of this context on any error. | ||||
|             # NOTE: `Context.cancel()` is conversely NOT called in | ||||
|             # the `ContextCancelled` "cancellation requested" case | ||||
|             # above. | ||||
|             # XXX: ALWAYS request the context to CANCEL ON any ERROR. | ||||
|             # NOTE: `Context.cancel()` is conversely NEVER CALLED in | ||||
|             # the `ContextCancelled` "self cancellation absorbed" case | ||||
|             # handled in the block above! | ||||
|             log.cancel( | ||||
|                 'Context cancelled for task due to\n' | ||||
|                 f'{err}\n' | ||||
|  | @ -585,7 +634,7 @@ class Portal: | |||
| 
 | ||||
|             raise  # duh | ||||
| 
 | ||||
|         # no scope error case | ||||
|         # no local scope error, the "clean exit with a result" case. | ||||
|         else: | ||||
|             if ctx.chan.connected(): | ||||
|                 log.info( | ||||
|  | @ -599,15 +648,27 @@ class Portal: | |||
|                 # `Context._maybe_raise_remote_err()`) IFF | ||||
|                 # a `Context._remote_error` was set by the runtime | ||||
|                 # via a call to | ||||
|                 # `Context._maybe_cancel_and_set_remote_error()` | ||||
|                 # which IS SET any time the far end fails and | ||||
|                 # causes "caller side" cancellation via | ||||
|                 # a `ContextCancelled` here. | ||||
|                 result = await ctx.result() | ||||
|                 log.runtime( | ||||
|                     f'Context {fn_name} returned value from callee:\n' | ||||
|                     f'`{result}`' | ||||
|                 ) | ||||
|                 # `Context._maybe_cancel_and_set_remote_error()`. | ||||
|                 # As per `Context._deliver_msg()`, that error IS | ||||
|                 # ALWAYS SET any time "callee" side fails and causes "caller | ||||
|                 # side" cancellation via a `ContextCancelled` here. | ||||
|                 # result = await ctx.result() | ||||
|                 try: | ||||
|                     result = await ctx.result() | ||||
|                     log.runtime( | ||||
|                         f'Context {fn_name} returned value from callee:\n' | ||||
|                         f'`{result}`' | ||||
|                     ) | ||||
|                 except BaseException as berr: | ||||
|                     # on normal teardown, if we get some error | ||||
|                     # raised in `Context.result()` we still want to | ||||
|                     # save that error on the ctx's state to | ||||
|                     # determine things like `.cancelled_caught` for | ||||
|                     # cases where there was remote cancellation but | ||||
|                     # this task didn't know until final teardown | ||||
|                     # / value collection. | ||||
|                     scope_err = berr | ||||
|                     raise | ||||
| 
 | ||||
|         finally: | ||||
|             # though it should be impossible for any tasks | ||||
|  | @ -657,12 +718,14 @@ class Portal: | |||
|                 with trio.CancelScope(shield=True): | ||||
|                     await ctx._recv_chan.aclose() | ||||
| 
 | ||||
|             # XXX: since we always (maybe) re-raise (and thus also | ||||
|             # mask runtime machinery related | ||||
|             # multi-`trio.Cancelled`s) any scope error which was | ||||
|             # the underlying cause of this context's exit, add | ||||
|             # different log msgs for each of the (2) cases. | ||||
|             # XXX: we always raise remote errors locally and | ||||
|             # generally speaking mask runtime-machinery related | ||||
|             # multi-`trio.Cancelled`s. As such, any `scope_error` | ||||
|             # which was the underlying cause of this context's exit | ||||
|             # should be stored as the `Context._local_error` and | ||||
|             # used in determining `Context.cancelled_caught: bool`. | ||||
|             if scope_err is not None: | ||||
|                 ctx._local_error: BaseException = scope_err | ||||
|                 etype: Type[BaseException] = type(scope_err) | ||||
| 
 | ||||
|                 # CASE 2 | ||||
|  | @ -691,7 +754,7 @@ class Portal: | |||
|             # child has already cleared it and clobbered IPC. | ||||
| 
 | ||||
|             # FINALLY, remove the context from runtime tracking and | ||||
|             # exit Bo | ||||
|             # exit! | ||||
|             self.actor._contexts.pop( | ||||
|                 (self.channel.uid, ctx.cid), | ||||
|                 None, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue