forked from goodboy/tractor
				
			Be mega-pedantic with `ContextCancelled` semantics
As part of extremely detailed inter-peer-actor testing, add much more granular `Context` cancellation state tracking via the following (new) fields: - `.canceller: tuple[str, str]` the uuid of the actor responsible for the cancellation condition - always set by `Context._maybe_cancel_and_set_remote_error()` and replaces `._cancelled_remote` and `.cancel_called_remote`. If set, this value should normally always match a value from some `ContextCancelled` raised or caught by one side of the context. - `._local_error` which is always set to the locally raised (and caller or callee task's scope-internal) error which caused any eventual cancellation/error condition and thus any closure of the context's per-task-side-`trio.Nursery`. - `.cancelled_caught: bool` is now always `True` whenever the local task catches (or "silently absorbs") a `ContextCancelled` (a `ctxc`) that indeed originated from one of the context's linked tasks or any other context which raised its own `ctxc` in the current `.open_context()` scope. => whenever there is a case that no `ContextCancelled` was raised **in** the `.open_context().__aexit__()` (eg. `ctx.result()` called after a call `ctx.cancel()`), we still consider the context's as having "caught a cancellation" since the `ctxc` was indeed silently handled by the cancel requester; all other error cases are already represented by mirroring the state of the `._scope: trio.CancelScope` => IOW there should be **no case** where an error is **not raised** in the context's scope and `.cancelled_caught: bool == False`, i.e. no case where `._scope.cancelled_caught == False and ._local_error is not None`! - always raise any `ctxc` from `.open_stream()` if `._cancel_called == True` - if the cancellation request has not already resulted in a `._remote_error: ContextCancelled` we raise a `RuntimeError` to indicate improper usage to the guilty side's task code. - make `._maybe_raise_remote_err()` a sync func and don't raise any `ctxc` which is matched against a `.canceller` determined to be the current actor, aka a "self cancel", and always set the `._local_error` to any such `ctxc`. - `.side: str` taken from inside `.cancel()` and unused as of now since it might be better re-written as a similar `.is_opener() -> bool`? - drop unused `._started_received: bool`.. - TONS and TONS of detailed comments/docs to attempt to explain all the possible cancellation/exit cases and how they should exhibit as either silent closes or raises from the `Context` API! Adjust the `._runtime._invoke()` code to match: - use `ctx._maybe_raise_remote_err()` in `._invoke()`. - adjust to new `.canceller` property. - more type hints. - better `log.cancel()` msging around self-cancels vs. peer-cancels. - always set the `._local_error: BaseException` for the "callee" task just like `Portal.open_context()` now will do B) Prior we were raising any `Context._remote_error` directly and doing (more or less) the same `ContextCancelled` "absorbing" logic (well kinda) in block; instead delegate to the methodremotes/1757153874605917753/main
							parent
							
								
									04217f319a
								
							
						
					
					
						commit
						f5fcd8ca2e
					
				|  | @ -56,6 +56,7 @@ from ._state import current_actor | ||||||
| 
 | 
 | ||||||
| if TYPE_CHECKING: | if TYPE_CHECKING: | ||||||
|     from ._portal import Portal |     from ._portal import Portal | ||||||
|  |     from ._runtime import Actor | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
|  | @ -64,20 +65,26 @@ log = get_logger(__name__) | ||||||
| @dataclass | @dataclass | ||||||
| class Context: | class Context: | ||||||
|     ''' |     ''' | ||||||
|     An inter-actor, ``trio``-task communication context. |     An inter-actor, SC transitive, `trio.Task` communication context. | ||||||
| 
 | 
 | ||||||
|     NB: This class should never be instatiated directly, it is delivered |     NB: This class should **never be instatiated directly**, it is allocated | ||||||
|     by either, |     by the runtime in 2 ways: | ||||||
|      - runtime machinery to a remotely started task or, |      - by entering ``Portal.open_context()`` which is the primary | ||||||
|      - by entering ``Portal.open_context()``. |        public API for any "caller" task or, | ||||||
|  |      - by the RPC machinery's `._runtime._invoke()` as a `ctx` arg | ||||||
|  |        to a remotely scheduled "callee" function. | ||||||
| 
 | 
 | ||||||
|      and is always constructed using ``mkt_context()``. |     AND is always constructed using the below ``mk_context()``. | ||||||
| 
 | 
 | ||||||
|     Allows maintaining task or protocol specific state between |     Allows maintaining task or protocol specific state between | ||||||
|     2 communicating, parallel executing actor tasks. A unique context is |     2 cancel-scope-linked, communicating and parallel executing | ||||||
|     allocated on each side of any task RPC-linked msg dialog, for |     `trio.Task`s. Contexts are allocated on each side of any task | ||||||
|     every request to a remote actor from a portal. On the "callee" |     RPC-linked msg dialog, i.e. for every request to a remote | ||||||
|     side a context is always allocated inside ``._runtime._invoke()``. |     actor from a `Portal`. On the "callee" side a context is | ||||||
|  |     always allocated inside ``._runtime._invoke()``. | ||||||
|  | 
 | ||||||
|  |     # TODO: more detailed writeup on cancellation, error and | ||||||
|  |     # streaming semantics.. | ||||||
| 
 | 
 | ||||||
|     A context can be cancelled and (possibly eventually restarted) from |     A context can be cancelled and (possibly eventually restarted) from | ||||||
|     either side of the underlying IPC channel, it can also open task |     either side of the underlying IPC channel, it can also open task | ||||||
|  | @ -108,12 +115,31 @@ class Context: | ||||||
|     # which is exactly the primitive that allows for |     # which is exactly the primitive that allows for | ||||||
|     # cross-actor-task-supervision and thus SC. |     # cross-actor-task-supervision and thus SC. | ||||||
|     _scope: trio.CancelScope | None = None |     _scope: trio.CancelScope | None = None | ||||||
|  | 
 | ||||||
|  |     # on a clean exit there should be a final value | ||||||
|  |     # delivered from the far end "callee" task, so | ||||||
|  |     # this value is only set on one side. | ||||||
|     _result: Any | int = None |     _result: Any | int = None | ||||||
|  | 
 | ||||||
|  |     # if the local "caller"  task errors this | ||||||
|  |     # value is always set to the error that was | ||||||
|  |     # captured in the `Portal.open_context().__aexit__()` | ||||||
|  |     # teardown. | ||||||
|  |     _local_error: BaseException | None = None | ||||||
|  | 
 | ||||||
|  |     # if the either side gets an error from the other | ||||||
|  |     # this value is set to that error unpacked from an | ||||||
|  |     # IPC msg. | ||||||
|     _remote_error: BaseException | None = None |     _remote_error: BaseException | None = None | ||||||
| 
 | 
 | ||||||
|     # cancellation state |     # only set if the local task called `.cancel()` | ||||||
|     _cancel_called: bool = False  # did WE cancel the far end? |     _cancel_called: bool = False  # did WE cancel the far end? | ||||||
|     _cancelled_remote: tuple[str, str] | None = None | 
 | ||||||
|  |     # TODO: do we even need this? we can assume that if we're | ||||||
|  |     # cancelled that the other side is as well, so maybe we should | ||||||
|  |     # instead just have a `.canceller` pulled from the | ||||||
|  |     # `ContextCancelled`? | ||||||
|  |     _canceller: tuple[str, str] | None = None | ||||||
| 
 | 
 | ||||||
|     # NOTE: we try to ensure assignment of a "cancel msg" since |     # NOTE: we try to ensure assignment of a "cancel msg" since | ||||||
|     # there's always going to be an "underlying reason" that any |     # there's always going to be an "underlying reason" that any | ||||||
|  | @ -145,23 +171,47 @@ class Context: | ||||||
|         return self._cancel_called |         return self._cancel_called | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|     def cancel_called_remote(self) -> tuple[str, str] | None: |     def canceller(self) -> tuple[str, str] | None: | ||||||
|         ''' |         ''' | ||||||
|         ``Actor.uid`` of the remote actor who's task was cancelled |         ``Actor.uid: tuple[str, str]`` of the (remote) | ||||||
|         causing this side of the context to also be cancelled. |         actor-process who's task was cancelled thus causing this | ||||||
|  |         (side of the) context to also be cancelled. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         remote_uid = self._cancelled_remote |         return self._canceller | ||||||
|         if remote_uid: |  | ||||||
|             return tuple(remote_uid) |  | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|     def cancelled_caught(self) -> bool: |     def cancelled_caught(self) -> bool: | ||||||
|         return self._scope.cancelled_caught |         return ( | ||||||
|  |             # the local scope was cancelled either by | ||||||
|  |             # remote error or self-request | ||||||
|  |             self._scope.cancelled_caught | ||||||
|  | 
 | ||||||
|  |             # the local scope was never cancelled | ||||||
|  |             # and instead likely we received a remote side | ||||||
|  |             # cancellation that was raised inside `.result()` | ||||||
|  |             or ( | ||||||
|  |                 (se := self._local_error) | ||||||
|  |                 and | ||||||
|  |                 isinstance(se, ContextCancelled) | ||||||
|  |                 and ( | ||||||
|  |                     se.canceller == self.canceller | ||||||
|  |                     or | ||||||
|  |                     se is self._remote_error | ||||||
|  |                 ) | ||||||
|  |             ) | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     @property | ||||||
|  |     def side(self) -> str: | ||||||
|  |         ''' | ||||||
|  |         Return string indicating which task this instance is wrapping. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         return 'caller' if self._portal else 'callee' | ||||||
| 
 | 
 | ||||||
|     # init and streaming state |     # init and streaming state | ||||||
|     _started_called: bool = False |     _started_called: bool = False | ||||||
|     _started_received: bool = False |  | ||||||
|     _stream_opened: bool = False |     _stream_opened: bool = False | ||||||
| 
 | 
 | ||||||
|     # overrun handling machinery |     # overrun handling machinery | ||||||
|  | @ -196,7 +246,7 @@ class Context: | ||||||
|     async def send_stop(self) -> None: |     async def send_stop(self) -> None: | ||||||
|         await self.chan.send({'stop': True, 'cid': self.cid}) |         await self.chan.send({'stop': True, 'cid': self.cid}) | ||||||
| 
 | 
 | ||||||
|     async def _maybe_cancel_and_set_remote_error( |     def _maybe_cancel_and_set_remote_error( | ||||||
|         self, |         self, | ||||||
|         error: BaseException, |         error: BaseException, | ||||||
| 
 | 
 | ||||||
|  | @ -269,16 +319,19 @@ class Context: | ||||||
|         # that error as the reason. |         # that error as the reason. | ||||||
|         self._remote_error: BaseException = error |         self._remote_error: BaseException = error | ||||||
| 
 | 
 | ||||||
|         # always record the remote actor's uid since its cancellation |  | ||||||
|         # state is directly linked to ours (the local one). |  | ||||||
|         self._cancelled_remote = self.chan.uid |  | ||||||
| 
 |  | ||||||
|         if ( |         if ( | ||||||
|             isinstance(error, ContextCancelled) |             isinstance(error, ContextCancelled) | ||||||
|         ): |         ): | ||||||
|  |             # always record the cancelling actor's uid since its cancellation | ||||||
|  |             # state is linked and we want to know which process was | ||||||
|  |             # the cause / requester of the cancellation. | ||||||
|  |             self._canceller = error.canceller | ||||||
|  | 
 | ||||||
|             log.cancel( |             log.cancel( | ||||||
|                 'Remote task-context sucessfully cancelled for ' |                 'Remote task-context was cancelled for ' | ||||||
|                 f'{self.chan.uid}:{self.cid}' |                 f'actor: {self.chan.uid}\n' | ||||||
|  |                 f'task: {self.cid}\n' | ||||||
|  |                 f'canceller: {error.canceller}\n' | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             if self._cancel_called: |             if self._cancel_called: | ||||||
|  | @ -289,22 +342,37 @@ class Context: | ||||||
|                 # and we **don't need to raise it** in local cancel |                 # and we **don't need to raise it** in local cancel | ||||||
|                 # scope since it will potentially override a real error. |                 # scope since it will potentially override a real error. | ||||||
|                 return |                 return | ||||||
|  | 
 | ||||||
|         else: |         else: | ||||||
|             log.error( |             log.error( | ||||||
|                 f'Remote context error for {self.chan.uid}:{self.cid}:\n' |                 f'Remote context error,\n' | ||||||
|  |                 f'remote actor: {self.chan.uid}\n' | ||||||
|  |                 f'task: {self.cid}\n' | ||||||
|                 f'{error}' |                 f'{error}' | ||||||
|             ) |             ) | ||||||
|  |             self._canceller = self.chan.uid | ||||||
|  | 
 | ||||||
|         # TODO: tempted to **not** do this by-reraising in a |         # TODO: tempted to **not** do this by-reraising in a | ||||||
|         # nursery and instead cancel a surrounding scope, detect |         # nursery and instead cancel a surrounding scope, detect | ||||||
|         # the cancellation, then lookup the error that was set? |         # the cancellation, then lookup the error that was set? | ||||||
|         # YES! this is way better and simpler! |         # YES! this is way better and simpler! | ||||||
|         if self._scope: |         cs: trio.CancelScope = self._scope | ||||||
|  |         if ( | ||||||
|  |             cs | ||||||
|  |             and not cs.cancel_called | ||||||
|  |             and not cs.cancelled_caught | ||||||
|  |         ): | ||||||
|  | 
 | ||||||
|  |             # TODO: we can for sure drop this right? | ||||||
|             # from trio.testing import wait_all_tasks_blocked |             # from trio.testing import wait_all_tasks_blocked | ||||||
|             # await wait_all_tasks_blocked() |             # await wait_all_tasks_blocked() | ||||||
|             # self._cancelled_remote = self.chan.uid | 
 | ||||||
|  |             # TODO: it'd sure be handy to inject our own | ||||||
|  |             # `trio.Cancelled` subtype here ;) | ||||||
|  |             # https://github.com/goodboy/tractor/issues/368 | ||||||
|             self._scope.cancel() |             self._scope.cancel() | ||||||
| 
 | 
 | ||||||
|             # this REPL usage actually works here BD |             # NOTE: this REPL usage actually works here dawg! Bo | ||||||
|             # from .devx._debug import pause |             # from .devx._debug import pause | ||||||
|             # await pause() |             # await pause() | ||||||
| 
 | 
 | ||||||
|  | @ -320,13 +388,19 @@ class Context: | ||||||
|         Timeout quickly in an attempt to sidestep 2-generals... |         Timeout quickly in an attempt to sidestep 2-generals... | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         side: str = 'caller' if self._portal else 'callee' |         side: str = self.side | ||||||
|         log.cancel( |         log.cancel( | ||||||
|             f'Cancelling {side} side of context to {self.chan.uid}' |             f'Cancelling {side} side of context to {self.chan.uid}' | ||||||
|         ) |         ) | ||||||
| 
 |  | ||||||
|         self._cancel_called: bool = True |         self._cancel_called: bool = True | ||||||
| 
 | 
 | ||||||
|  |         # caller side who entered `Portal.open_context()` | ||||||
|  |         # NOTE: on the call side we never manually call | ||||||
|  |         # `._scope.cancel()` since we expect the eventual | ||||||
|  |         # `ContextCancelled` from the other side to trigger this | ||||||
|  |         # when the runtime finally receives it during teardown | ||||||
|  |         # (normally in `.result()` called from | ||||||
|  |         # `Portal.open_context().__aexit__()`) | ||||||
|         if side == 'caller': |         if side == 'caller': | ||||||
|             if not self._portal: |             if not self._portal: | ||||||
|                 raise RuntimeError( |                 raise RuntimeError( | ||||||
|  | @ -349,7 +423,6 @@ class Context: | ||||||
|                     '_cancel_task', |                     '_cancel_task', | ||||||
|                     cid=cid, |                     cid=cid, | ||||||
|                 ) |                 ) | ||||||
|                 # print("EXITING CANCEL CALL") |  | ||||||
| 
 | 
 | ||||||
|             if cs.cancelled_caught: |             if cs.cancelled_caught: | ||||||
|                 # XXX: there's no way to know if the remote task was indeed |                 # XXX: there's no way to know if the remote task was indeed | ||||||
|  | @ -368,6 +441,9 @@ class Context: | ||||||
|                     ) |                     ) | ||||||
| 
 | 
 | ||||||
|         # callee side remote task |         # callee side remote task | ||||||
|  |         # NOTE: on this side we ALWAYS cancel the local scope since | ||||||
|  |         # the caller expects a `ContextCancelled` to be sent from | ||||||
|  |         # `._runtime._invoke()` back to the other side. | ||||||
|         else: |         else: | ||||||
|             # TODO: should we have an explicit cancel message |             # TODO: should we have an explicit cancel message | ||||||
|             # or is relaying the local `trio.Cancelled` as an |             # or is relaying the local `trio.Cancelled` as an | ||||||
|  | @ -403,7 +479,7 @@ class Context: | ||||||
|             ``trio``'s cancellation system. |             ``trio``'s cancellation system. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         actor = current_actor() |         actor: Actor = current_actor() | ||||||
| 
 | 
 | ||||||
|         # here we create a mem chan that corresponds to the |         # here we create a mem chan that corresponds to the | ||||||
|         # far end caller / callee. |         # far end caller / callee. | ||||||
|  | @ -413,12 +489,34 @@ class Context: | ||||||
|         # killed |         # killed | ||||||
| 
 | 
 | ||||||
|         if self._cancel_called: |         if self._cancel_called: | ||||||
|             task = trio.lowlevel.current_task().name | 
 | ||||||
|             raise ContextCancelled( |             # XXX NOTE: ALWAYS RAISE any remote error here even if | ||||||
|                 f'Context around {actor.uid[0]}:{task} was already cancelled!' |             # it's an expected `ContextCancelled` (after some local | ||||||
|  |             # task having called `.cancel()` ! | ||||||
|  |             # | ||||||
|  |             # WHY: we expect the error to always bubble up to the | ||||||
|  |             # surrounding `Portal.open_context()` call and be | ||||||
|  |             # absorbed there (silently) and we DO NOT want to | ||||||
|  |             # actually try to stream - a cancel msg was already | ||||||
|  |             # sent to the other side! | ||||||
|  |             if re := self._remote_error: | ||||||
|  |                 raise self._remote_error | ||||||
|  | 
 | ||||||
|  |             # XXX NOTE: if no `ContextCancelled` has been responded | ||||||
|  |             # back from the other side (yet), we raise a different | ||||||
|  |             # runtime error indicating that this task's usage of | ||||||
|  |             # `Context.cancel()` and then `.open_stream()` is WRONG! | ||||||
|  |             task: str = trio.lowlevel.current_task().name | ||||||
|  |             raise RuntimeError( | ||||||
|  |                 'Stream opened after `Context.cancel()` called..?\n' | ||||||
|  |                 f'task: {actor.uid[0]}:{task}\n' | ||||||
|  |                 f'{self}' | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|         if not self._portal and not self._started_called: |         if ( | ||||||
|  |             not self._portal | ||||||
|  |             and not self._started_called | ||||||
|  |         ): | ||||||
|             raise RuntimeError( |             raise RuntimeError( | ||||||
|                 'Context.started()` must be called before opening a stream' |                 'Context.started()` must be called before opening a stream' | ||||||
|             ) |             ) | ||||||
|  | @ -434,7 +532,7 @@ class Context: | ||||||
|             msg_buffer_size=msg_buffer_size, |             msg_buffer_size=msg_buffer_size, | ||||||
|             allow_overruns=allow_overruns, |             allow_overruns=allow_overruns, | ||||||
|         ) |         ) | ||||||
|         ctx._allow_overruns = allow_overruns |         ctx._allow_overruns: bool = allow_overruns | ||||||
|         assert ctx is self |         assert ctx is self | ||||||
| 
 | 
 | ||||||
|         # XXX: If the underlying channel feeder receive mem chan has |         # XXX: If the underlying channel feeder receive mem chan has | ||||||
|  | @ -444,27 +542,32 @@ class Context: | ||||||
| 
 | 
 | ||||||
|         if ctx._recv_chan._closed: |         if ctx._recv_chan._closed: | ||||||
|             raise trio.ClosedResourceError( |             raise trio.ClosedResourceError( | ||||||
|                 'The underlying channel for this stream was already closed!?') |                 'The underlying channel for this stream was already closed!?' | ||||||
|  |             ) | ||||||
| 
 | 
 | ||||||
|         async with MsgStream( |         async with MsgStream( | ||||||
|             ctx=self, |             ctx=self, | ||||||
|             rx_chan=ctx._recv_chan, |             rx_chan=ctx._recv_chan, | ||||||
|         ) as stream: |         ) as stream: | ||||||
| 
 | 
 | ||||||
|  |             # NOTE: we track all existing streams per portal for | ||||||
|  |             # the purposes of attempting graceful closes on runtime | ||||||
|  |             # cancel requests. | ||||||
|             if self._portal: |             if self._portal: | ||||||
|                 self._portal._streams.add(stream) |                 self._portal._streams.add(stream) | ||||||
| 
 | 
 | ||||||
|             try: |             try: | ||||||
|                 self._stream_opened = True |                 self._stream_opened: bool = True | ||||||
| 
 | 
 | ||||||
|                 # XXX: do we need this? |                 # XXX: do we need this? | ||||||
|                 # ensure we aren't cancelled before yielding the stream |                 # ensure we aren't cancelled before yielding the stream | ||||||
|                 # await trio.lowlevel.checkpoint() |                 # await trio.lowlevel.checkpoint() | ||||||
|                 yield stream |                 yield stream | ||||||
| 
 | 
 | ||||||
|                 # NOTE: Make the stream "one-shot use".  On exit, signal |                 # NOTE: Make the stream "one-shot use".  On exit, | ||||||
|                 # ``trio.EndOfChannel``/``StopAsyncIteration`` to the |                 # signal | ||||||
|                 # far end. |                 # ``trio.EndOfChannel``/``StopAsyncIteration`` to | ||||||
|  |                 # the far end. | ||||||
|                 await stream.aclose() |                 await stream.aclose() | ||||||
| 
 | 
 | ||||||
|             finally: |             finally: | ||||||
|  | @ -495,14 +598,22 @@ class Context: | ||||||
|         # whenever  ``CancelScope.cancel()`` was called) and |         # whenever  ``CancelScope.cancel()`` was called) and | ||||||
|         # instead silently reap the expected cancellation |         # instead silently reap the expected cancellation | ||||||
|         # "error"-msg. |         # "error"-msg. | ||||||
|  |         our_uid: tuple[str, str] = current_actor().uid | ||||||
|         if ( |         if ( | ||||||
|             isinstance(err, ContextCancelled) |             isinstance(err, ContextCancelled) | ||||||
|             and ( |             and ( | ||||||
|                 self._cancel_called |                 self._cancel_called | ||||||
|                 or self.chan._cancel_called |                 or self.chan._cancel_called | ||||||
|                 or tuple(err.canceller) == current_actor().uid |                 or self.canceller == our_uid | ||||||
|  |                 or tuple(err.canceller) == our_uid | ||||||
|             ) |             ) | ||||||
|         ): |         ): | ||||||
|  |             # NOTE: we set the local scope error to any "self | ||||||
|  |             # cancellation" error-response thus "absorbing" | ||||||
|  |             # the error silently B) | ||||||
|  |             if self._local_error is None: | ||||||
|  |                 self._local_error = err | ||||||
|  | 
 | ||||||
|             return err |             return err | ||||||
| 
 | 
 | ||||||
|         # NOTE: currently we are masking underlying runtime errors |         # NOTE: currently we are masking underlying runtime errors | ||||||
|  | @ -515,7 +626,7 @@ class Context: | ||||||
|         #       runtime frames from the tb explicitly? |         #       runtime frames from the tb explicitly? | ||||||
|         # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement |         # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement | ||||||
|         # https://stackoverflow.com/a/24752607 |         # https://stackoverflow.com/a/24752607 | ||||||
|         __tracebackhide__: bool = True |         # __tracebackhide__: bool = True | ||||||
|         raise err from None |         raise err from None | ||||||
| 
 | 
 | ||||||
|     async def result(self) -> Any | Exception: |     async def result(self) -> Any | Exception: | ||||||
|  | @ -544,7 +655,6 @@ class Context: | ||||||
|         of the remote cancellation. |         of the remote cancellation. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         __tracebackhide__: bool = True |  | ||||||
|         assert self._portal, "Context.result() can not be called from callee!" |         assert self._portal, "Context.result() can not be called from callee!" | ||||||
|         assert self._recv_chan |         assert self._recv_chan | ||||||
| 
 | 
 | ||||||
|  | @ -607,13 +717,15 @@ class Context: | ||||||
|                         "Received internal error at portal?" |                         "Received internal error at portal?" | ||||||
|                     ) |                     ) | ||||||
| 
 | 
 | ||||||
|                     err = unpack_error( |                     if err:= unpack_error( | ||||||
|                         msg, |                         msg, | ||||||
|                         self._portal.channel |                         self._portal.channel | ||||||
|                     )  # from msgerr |                     ):  # from msgerr | ||||||
|  |                         self._maybe_cancel_and_set_remote_error(err) | ||||||
|  |                         self._maybe_raise_remote_err(err) | ||||||
| 
 | 
 | ||||||
|                     err = self._maybe_raise_remote_err(err) |                     else: | ||||||
|                     self._remote_error = err |                         raise | ||||||
| 
 | 
 | ||||||
|         if re := self._remote_error: |         if re := self._remote_error: | ||||||
|             return self._maybe_raise_remote_err(re) |             return self._maybe_raise_remote_err(re) | ||||||
|  | @ -724,13 +836,17 @@ class Context: | ||||||
|             f"Delivering {msg} from {uid} to caller {cid}" |             f"Delivering {msg} from {uid} to caller {cid}" | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|         error = msg.get('error') |         if ( | ||||||
|         if error := unpack_error( |             msg.get('error')  # check for field | ||||||
|                 msg, |             and ( | ||||||
|                 self.chan, |                 error := unpack_error( | ||||||
|  |                     msg, | ||||||
|  |                     self.chan, | ||||||
|  |                 ) | ||||||
|  |             ) | ||||||
|         ): |         ): | ||||||
|             self._cancel_msg = msg |             self._cancel_msg = msg | ||||||
|             await self._maybe_cancel_and_set_remote_error(error) |             self._maybe_cancel_and_set_remote_error(error) | ||||||
| 
 | 
 | ||||||
|         if ( |         if ( | ||||||
|             self._in_overrun |             self._in_overrun | ||||||
|  | @ -765,7 +881,7 @@ class Context: | ||||||
| 
 | 
 | ||||||
|             # XXX: always push an error even if the local |             # XXX: always push an error even if the local | ||||||
|             # receiver is in overrun state. |             # receiver is in overrun state. | ||||||
|             # await self._maybe_cancel_and_set_remote_error(msg) |             # self._maybe_cancel_and_set_remote_error(msg) | ||||||
| 
 | 
 | ||||||
|             local_uid = current_actor().uid |             local_uid = current_actor().uid | ||||||
|             lines = [ |             lines = [ | ||||||
|  |  | ||||||
|  | @ -86,12 +86,14 @@ async def _invoke( | ||||||
|     ] = trio.TASK_STATUS_IGNORED, |     ] = trio.TASK_STATUS_IGNORED, | ||||||
| ): | ): | ||||||
|     ''' |     ''' | ||||||
|     Invoke local func and deliver result(s) over provided channel. |     Schedule a `trio` task-as-func and deliver result(s) over | ||||||
|  |     connected IPC channel. | ||||||
| 
 | 
 | ||||||
|     This is the core "RPC task" starting machinery. |     This is the core "RPC" `trio.Task` scheduling machinery used to start every | ||||||
|  |     remotely invoked function, normally in `Actor._service_n: trio.Nursery`. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     __tracebackhide__ = True |     __tracebackhide__: bool = True | ||||||
|     treat_as_gen: bool = False |     treat_as_gen: bool = False | ||||||
|     failed_resp: bool = False |     failed_resp: bool = False | ||||||
| 
 | 
 | ||||||
|  | @ -199,6 +201,8 @@ async def _invoke( | ||||||
|                 # far end async gen to tear down |                 # far end async gen to tear down | ||||||
|                 await chan.send({'stop': True, 'cid': cid}) |                 await chan.send({'stop': True, 'cid': cid}) | ||||||
| 
 | 
 | ||||||
|  |         # TODO: every other "func type" should be implemented from | ||||||
|  |         # a special case of a context eventually! | ||||||
|         elif context: |         elif context: | ||||||
|             # context func with support for bi-dir streaming |             # context func with support for bi-dir streaming | ||||||
|             await chan.send({'functype': 'context', 'cid': cid}) |             await chan.send({'functype': 'context', 'cid': cid}) | ||||||
|  | @ -209,21 +213,30 @@ async def _invoke( | ||||||
|                     ctx._scope = nurse.cancel_scope |                     ctx._scope = nurse.cancel_scope | ||||||
|                     task_status.started(ctx) |                     task_status.started(ctx) | ||||||
|                     res = await coro |                     res = await coro | ||||||
|                     await chan.send({'return': res, 'cid': cid}) |                     await chan.send({ | ||||||
|  |                         'return': res, | ||||||
|  |                         'cid': cid | ||||||
|  |                     }) | ||||||
| 
 | 
 | ||||||
|             # XXX: do we ever trigger this block any more? |             # XXX: do we ever trigger this block any more? | ||||||
|             except ( |             except ( | ||||||
|                 BaseExceptionGroup, |                 BaseExceptionGroup, | ||||||
|                 trio.Cancelled, |                 trio.Cancelled, | ||||||
|             ): |             ) as scope_error: | ||||||
|                 # if a context error was set then likely |  | ||||||
|                 # thei multierror was raised due to that |  | ||||||
|                 if ctx._remote_error is not None: |  | ||||||
|                     raise ctx._remote_error |  | ||||||
| 
 | 
 | ||||||
|                 # maybe TODO: pack in ``trio.Cancelled.__traceback__`` here |                 # always set this (callee) side's exception as the | ||||||
|                 # so they can be unwrapped and displayed on the caller |                 # local error on the context | ||||||
|                 # side? |                 ctx._local_error: BaseException = scope_error | ||||||
|  | 
 | ||||||
|  |                 # if a remote error was set then likely the | ||||||
|  |                 # exception group was raised due to that, so | ||||||
|  |                 # and we instead raise that error immediately! | ||||||
|  |                 if re := ctx._remote_error: | ||||||
|  |                     ctx._maybe_raise_remote_err(re) | ||||||
|  | 
 | ||||||
|  |                 # maybe TODO: pack in | ||||||
|  |                 # ``trio.Cancelled.__traceback__`` here so they can | ||||||
|  |                 # be unwrapped and displayed on the caller side? | ||||||
|                 raise |                 raise | ||||||
| 
 | 
 | ||||||
|             finally: |             finally: | ||||||
|  | @ -234,11 +247,11 @@ async def _invoke( | ||||||
|                 # don't pop the local context until we know the |                 # don't pop the local context until we know the | ||||||
|                 # associated child isn't in debug any more |                 # associated child isn't in debug any more | ||||||
|                 await _debug.maybe_wait_for_debugger() |                 await _debug.maybe_wait_for_debugger() | ||||||
|                 ctx = actor._contexts.pop((chan.uid, cid)) |                 ctx: Context = actor._contexts.pop((chan.uid, cid)) | ||||||
|                 if ctx: |                 log.runtime( | ||||||
|                     log.runtime( |                     f'Context entrypoint {func} was terminated:\n' | ||||||
|                         f'Context entrypoint {func} was terminated:\n{ctx}' |                     f'{ctx}' | ||||||
|                     ) |                 ) | ||||||
| 
 | 
 | ||||||
|             if ctx.cancelled_caught: |             if ctx.cancelled_caught: | ||||||
| 
 | 
 | ||||||
|  | @ -246,44 +259,43 @@ async def _invoke( | ||||||
|                 # before raising any context cancelled case |                 # before raising any context cancelled case | ||||||
|                 # so that real remote errors don't get masked as |                 # so that real remote errors don't get masked as | ||||||
|                 # ``ContextCancelled``s. |                 # ``ContextCancelled``s. | ||||||
|                 re = ctx._remote_error |                 if re := ctx._remote_error: | ||||||
|                 if re: |  | ||||||
|                     ctx._maybe_raise_remote_err(re) |                     ctx._maybe_raise_remote_err(re) | ||||||
| 
 | 
 | ||||||
|                 fname = func.__name__ |                 fname: str = func.__name__ | ||||||
|                 cs: trio.CancelScope = ctx._scope |                 cs: trio.CancelScope = ctx._scope | ||||||
|                 if cs.cancel_called: |                 if cs.cancel_called: | ||||||
|                     canceller = ctx._cancelled_remote |                     canceller: tuple = ctx.canceller | ||||||
|                     # await _debug.breakpoint() |                     msg: str = ( | ||||||
|  |                         f'`{fname}()`@{actor.uid} cancelled by ' | ||||||
|  |                     ) | ||||||
| 
 | 
 | ||||||
|                     # NOTE / TODO: if we end up having |                     # NOTE / TODO: if we end up having | ||||||
|                     # ``Actor._cancel_task()`` call |                     # ``Actor._cancel_task()`` call | ||||||
|                     # ``Context.cancel()`` directly, we're going to |                     # ``Context.cancel()`` directly, we're going to | ||||||
|                     # need to change this logic branch since it will |                     # need to change this logic branch since it | ||||||
|                     # always enter.. |                     # will always enter.. | ||||||
|                     if ctx._cancel_called: |                     if ctx._cancel_called: | ||||||
|                         msg = f'`{fname}()`@{actor.uid} cancelled itself' |                         msg += 'itself ' | ||||||
| 
 |  | ||||||
|                     else: |  | ||||||
|                         msg = ( |  | ||||||
|                             f'`{fname}()`@{actor.uid} ' |  | ||||||
|                             'was remotely cancelled by ' |  | ||||||
|                         ) |  | ||||||
| 
 | 
 | ||||||
|                     # if the channel which spawned the ctx is the |                     # if the channel which spawned the ctx is the | ||||||
|                     # one that cancelled it then we report that, vs. |                     # one that cancelled it then we report that, vs. | ||||||
|                     # it being some other random actor that for ex. |                     # it being some other random actor that for ex. | ||||||
|                     # some actor who calls `Portal.cancel_actor()` |                     # some actor who calls `Portal.cancel_actor()` | ||||||
|                     # and by side-effect cancels this ctx. |                     # and by side-effect cancels this ctx. | ||||||
|                     if canceller == ctx.chan.uid: |                     elif canceller == ctx.chan.uid: | ||||||
|                         msg += f'its caller {canceller}' |                         msg += f'its caller {canceller} ' | ||||||
|  | 
 | ||||||
|                     else: |                     else: | ||||||
|                         msg += f'remote actor {canceller}' |                         msg += f'remote actor {canceller}' | ||||||
| 
 | 
 | ||||||
|                     # TODO: does this ever get set any more or can |                     # TODO: does this ever get set any more or can | ||||||
|                     # we remove it? |                     # we remove it? | ||||||
|                     if ctx._cancel_msg: |                     if ctx._cancel_msg: | ||||||
|                         msg += f' with msg:\n{ctx._cancel_msg}' |                         msg += ( | ||||||
|  |                             ' with msg:\n' | ||||||
|  |                             f'{ctx._cancel_msg}' | ||||||
|  |                         ) | ||||||
| 
 | 
 | ||||||
|                     # task-contex was either cancelled by request using |                     # task-contex was either cancelled by request using | ||||||
|                     # ``Portal.cancel_actor()`` or ``Context.cancel()`` |                     # ``Portal.cancel_actor()`` or ``Context.cancel()`` | ||||||
|  | @ -296,10 +308,13 @@ async def _invoke( | ||||||
|                         canceller=canceller, |                         canceller=canceller, | ||||||
|                     ) |                     ) | ||||||
| 
 | 
 | ||||||
|  |         # regular async function | ||||||
|         else: |         else: | ||||||
|             # regular async function |  | ||||||
|             try: |             try: | ||||||
|                 await chan.send({'functype': 'asyncfunc', 'cid': cid}) |                 await chan.send({ | ||||||
|  |                     'functype': 'asyncfunc', | ||||||
|  |                     'cid': cid | ||||||
|  |                 }) | ||||||
|             except trio.BrokenResourceError: |             except trio.BrokenResourceError: | ||||||
|                 failed_resp = True |                 failed_resp = True | ||||||
|                 if is_rpc: |                 if is_rpc: | ||||||
|  | @ -313,7 +328,7 @@ async def _invoke( | ||||||
|                 ctx._scope = cs |                 ctx._scope = cs | ||||||
|                 task_status.started(ctx) |                 task_status.started(ctx) | ||||||
|                 result = await coro |                 result = await coro | ||||||
|                 fname = func.__name__ |                 fname: str = func.__name__ | ||||||
|                 log.runtime(f'{fname}() result: {result}') |                 log.runtime(f'{fname}() result: {result}') | ||||||
|                 if not failed_resp: |                 if not failed_resp: | ||||||
|                     # only send result if we know IPC isn't down |                     # only send result if we know IPC isn't down | ||||||
|  | @ -1073,7 +1088,12 @@ class Actor: | ||||||
|             - return control the parent channel message loop |             - return control the parent channel message loop | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         log.cancel(f"{self.uid} is trying to cancel") |         log.cancel( | ||||||
|  |             f'{self.uid} requested to cancel by:\n' | ||||||
|  |             f'{requesting_uid}' | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         # TODO: what happens here when we self-cancel tho? | ||||||
|         self._cancel_called_by_remote: tuple = requesting_uid |         self._cancel_called_by_remote: tuple = requesting_uid | ||||||
|         self._cancel_called = True |         self._cancel_called = True | ||||||
| 
 | 
 | ||||||
|  | @ -1088,7 +1108,9 @@ class Actor: | ||||||
|                 dbcs.cancel() |                 dbcs.cancel() | ||||||
| 
 | 
 | ||||||
|             # kill all ongoing tasks |             # kill all ongoing tasks | ||||||
|             await self.cancel_rpc_tasks(requesting_uid=requesting_uid) |             await self.cancel_rpc_tasks( | ||||||
|  |                 requesting_uid=requesting_uid, | ||||||
|  |             ) | ||||||
| 
 | 
 | ||||||
|             # stop channel server |             # stop channel server | ||||||
|             self.cancel_server() |             self.cancel_server() | ||||||
|  | @ -1118,8 +1140,8 @@ class Actor: | ||||||
|         self, |         self, | ||||||
|         cid: str, |         cid: str, | ||||||
|         chan: Channel, |         chan: Channel, | ||||||
| 
 |  | ||||||
|         requesting_uid: tuple[str, str] | None = None, |         requesting_uid: tuple[str, str] | None = None, | ||||||
|  | 
 | ||||||
|     ) -> bool: |     ) -> bool: | ||||||
|         ''' |         ''' | ||||||
|         Cancel a local task by call-id / channel. |         Cancel a local task by call-id / channel. | ||||||
|  | @ -1136,7 +1158,7 @@ class Actor: | ||||||
|             # this ctx based lookup ensures the requested task to |             # this ctx based lookup ensures the requested task to | ||||||
|             # be cancelled was indeed spawned by a request from this channel |             # be cancelled was indeed spawned by a request from this channel | ||||||
|             ctx, func, is_complete = self._rpc_tasks[(chan, cid)] |             ctx, func, is_complete = self._rpc_tasks[(chan, cid)] | ||||||
|             scope = ctx._scope |             scope: trio.CancelScope = ctx._scope | ||||||
|         except KeyError: |         except KeyError: | ||||||
|             log.cancel(f"{cid} has already completed/terminated?") |             log.cancel(f"{cid} has already completed/terminated?") | ||||||
|             return True |             return True | ||||||
|  | @ -1146,10 +1168,10 @@ class Actor: | ||||||
|             f"peer: {chan.uid}\n") |             f"peer: {chan.uid}\n") | ||||||
| 
 | 
 | ||||||
|         if ( |         if ( | ||||||
|             ctx._cancelled_remote is None |             ctx._canceller is None | ||||||
|             and requesting_uid |             and requesting_uid | ||||||
|         ): |         ): | ||||||
|             ctx._cancelled_remote: tuple = requesting_uid |             ctx._canceller: tuple = requesting_uid | ||||||
| 
 | 
 | ||||||
|         # don't allow cancelling this function mid-execution |         # don't allow cancelling this function mid-execution | ||||||
|         # (is this necessary?) |         # (is this necessary?) | ||||||
|  | @ -1159,6 +1181,7 @@ class Actor: | ||||||
|         # TODO: shouldn't we eventually be calling ``Context.cancel()`` |         # TODO: shouldn't we eventually be calling ``Context.cancel()`` | ||||||
|         # directly here instead (since that method can handle both |         # directly here instead (since that method can handle both | ||||||
|         # side's calls into it? |         # side's calls into it? | ||||||
|  |         # await ctx.cancel() | ||||||
|         scope.cancel() |         scope.cancel() | ||||||
| 
 | 
 | ||||||
|         # wait for _invoke to mark the task complete |         # wait for _invoke to mark the task complete | ||||||
|  | @ -1186,9 +1209,12 @@ class Actor: | ||||||
|         registered for each. |         registered for each. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         tasks = self._rpc_tasks |         tasks: dict = self._rpc_tasks | ||||||
|         if tasks: |         if tasks: | ||||||
|             log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") |             log.cancel( | ||||||
|  |                 f'Cancelling all {len(tasks)} rpc tasks:\n' | ||||||
|  |                 f'{tasks}' | ||||||
|  |             ) | ||||||
|             for ( |             for ( | ||||||
|                 (chan, cid), |                 (chan, cid), | ||||||
|                 (ctx, func, is_complete), |                 (ctx, func, is_complete), | ||||||
|  | @ -1206,7 +1232,9 @@ class Actor: | ||||||
|                     ) |                     ) | ||||||
| 
 | 
 | ||||||
|             log.cancel( |             log.cancel( | ||||||
|                 f"Waiting for remaining rpc tasks to complete {tasks}") |                 'Waiting for remaining rpc tasks to complete:\n' | ||||||
|  |                 f'{tasks}' | ||||||
|  |             ) | ||||||
|             await self._ongoing_rpc_tasks.wait() |             await self._ongoing_rpc_tasks.wait() | ||||||
| 
 | 
 | ||||||
|     def cancel_server(self) -> None: |     def cancel_server(self) -> None: | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue