Solve another OoB cancellation case, the bg task one
Such that we are able to (finally) detect when we should
`Context._scope.cancel()` specifically when the `.parent_task` is
**not** blocking on receiving from the underlying `._rx_chan`, since if
the task is blocking on `.receive()` it will call `.cancel()`
implicitly.
This is a lot to explain with very little code actually needed for the
implementation (are we like `trio` yet anyone?? XD) but the main gist is
that `Context._maybe_cancel_and_set_remote_error()` needed the
additional case of calling `._scope.cancel()` whenever we know that
a remote-error/ctxc won't be immediately handled, bc user code is doing
non `Context`-API things, resulting in a similar outcome as if that
task was waiting on `Context.wait_for_result()` or `.__aexit__()`.
Impl details,
- add a new `._is_blocked_on_rx_chan()` predicate method which reports whether
  the (new) `.parent_task` is blocking on `._rx_chan.receive()`.
  * see various stipulations about the current impl and how we might
    need to adjust for the future given `trio`'s commitment to the
    `Task.custom_sleep_data` attr..
- add `.parent_task`, a pub wrapper for `._task`.
- check for `not ._is_blocked_on_rx_chan()` before manually cancelling
  the local `.parent_task`
- minimize the surrounding branch case expressions.
Other,
- tweak a couple logs.
- add a new `.cancel()` pre-started msg.
- mask the `.cancel_called` setter, it's only (been) used for tracing.
- todos around maybe moving the `._nursery` allocation "around" the
  `.start_remote_task()` call and various subsequent tweaks therein.
			
			
				oob_cancel_testing
			
			
		
							parent
							
								
									36c54d1289
								
							
						
					
					
						commit
						dcd3c77461
					
				|  | @ -442,25 +442,25 @@ class Context: | ||||||
|         ''' |         ''' | ||||||
|         Records whether cancellation has been requested for this context |         Records whether cancellation has been requested for this context | ||||||
|         by a call to  `.cancel()` either due to, |         by a call to  `.cancel()` either due to, | ||||||
|         - either an explicit call by some local task, |         - an explicit call by some local task, | ||||||
|         - or an implicit call due to an error caught inside |         - or an implicit call due to an error caught inside | ||||||
|           the ``Portal.open_context()`` block. |           the `Portal.open_context()` block. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         return self._cancel_called |         return self._cancel_called | ||||||
| 
 | 
 | ||||||
|     @cancel_called.setter |     # XXX, to debug who frickin sets it.. | ||||||
|     def cancel_called(self, val: bool) -> None: |     # @cancel_called.setter | ||||||
|         ''' |     # def cancel_called(self, val: bool) -> None: | ||||||
|         Set the self-cancelled request `bool` value. |     #     ''' | ||||||
|  |     #     Set the self-cancelled request `bool` value. | ||||||
| 
 | 
 | ||||||
|         ''' |     #     ''' | ||||||
|         # to debug who frickin sets it.. |  | ||||||
|     #     if val: |     #     if val: | ||||||
|     #         from .devx import pause_from_sync |     #         from .devx import pause_from_sync | ||||||
|     #         pause_from_sync() |     #         pause_from_sync() | ||||||
| 
 | 
 | ||||||
|         self._cancel_called = val |     #     self._cancel_called = val | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|     def canceller(self) -> tuple[str, str]|None: |     def canceller(self) -> tuple[str, str]|None: | ||||||
|  | @ -635,6 +635,71 @@ class Context: | ||||||
|         ''' |         ''' | ||||||
|         await self.chan.send(Stop(cid=self.cid)) |         await self.chan.send(Stop(cid=self.cid)) | ||||||
| 
 | 
 | ||||||
|  |     @property | ||||||
|  |     def parent_task(self) -> trio.Task: | ||||||
|  |         ''' | ||||||
|  |         This IPC context's "owning task" which is a `trio.Task` | ||||||
|  |         on one of the "sides" of the IPC. | ||||||
|  | 
 | ||||||
|  |         Note that the "parent_" prefix here refers to the local | ||||||
|  |         `trio` task tree using the same interface as | ||||||
|  |         `trio.Nursery.parent_task` whereas for IPC contexts, | ||||||
|  |         a different cross-actor task hierarchy exists: | ||||||
|  | 
 | ||||||
|  |         - a "parent"-side which originally entered | ||||||
|  |           `Portal.open_context()`, | ||||||
|  | 
 | ||||||
|  |         - the "child"-side which was spawned and scheduled to invoke | ||||||
|  |           a function decorated with `@tractor.context`. | ||||||
|  | 
 | ||||||
|  |         This task is thus a handle to mem-domain-distinct/per-process | ||||||
|  |         `Nursery.parent_task` depending on in which of the above | ||||||
|  |         "sides" this context exists. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         return self._task | ||||||
|  | 
 | ||||||
|  |     def _is_blocked_on_rx_chan(self) -> bool: | ||||||
|  |         ''' | ||||||
|  |         Predicate to indcate whether the owner `._task: trio.Task` is | ||||||
|  |         currently blocked (by `.receive()`-ing) on its underlying RPC | ||||||
|  |         feeder `._rx_chan`. | ||||||
|  | 
 | ||||||
|  |         This knowledge is highly useful when handling so called | ||||||
|  |         "out-of-band" (OoB) cancellation conditions where a peer | ||||||
|  |         actor's task transmitted some remote error/cancel-msg and we | ||||||
|  |         must know whether to signal-via-cancel currently executing | ||||||
|  |         "user-code" (user defined code embedded in `ctx._scope`) or | ||||||
|  |         simply to forward the IPC-msg-as-error **without calling** | ||||||
|  |         `._scope.cancel()`. | ||||||
|  | 
 | ||||||
|  |         In the latter case it is presumed that if the owner task is | ||||||
|  |         blocking for the next IPC msg, it will eventually receive, | ||||||
|  |         process and raise the equivalent local error **without** | ||||||
|  |         requiring `._scope.cancel()` to be explicitly called by the | ||||||
|  |         *delivering OoB RPC-task* (via `_deliver_msg()`). | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         # NOTE, see the mem-chan meth-impls for *why* this | ||||||
|  |         # logic works, | ||||||
|  |         # `trio._channel.MemoryReceiveChannel.receive[_nowait]()` | ||||||
|  |         # | ||||||
|  |         # XXX realize that this is NOT an | ||||||
|  |         # official/will-be-loudly-deprecated API: | ||||||
|  |         # - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data | ||||||
|  |         #  |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled | ||||||
|  |         # | ||||||
|  |         # orig repo intro in the mem-chan change over patch: | ||||||
|  |         # - https://github.com/python-trio/trio/pull/586#issuecomment-414039117 | ||||||
|  |         #  |_https://github.com/python-trio/trio/pull/616 | ||||||
|  |         #  |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0 | ||||||
|  |         # | ||||||
|  |         return ( | ||||||
|  |             self._task.custom_sleep_data | ||||||
|  |             is | ||||||
|  |             self._rx_chan | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|     def _maybe_cancel_and_set_remote_error( |     def _maybe_cancel_and_set_remote_error( | ||||||
|         self, |         self, | ||||||
|         error: BaseException, |         error: BaseException, | ||||||
|  | @ -787,13 +852,27 @@ class Context: | ||||||
|         if self._canceller is None: |         if self._canceller is None: | ||||||
|             log.error('Ctx has no canceller set!?') |             log.error('Ctx has no canceller set!?') | ||||||
| 
 | 
 | ||||||
|  |         cs: trio.CancelScope = self._scope | ||||||
|  | 
 | ||||||
|  |         # ?TODO? see comment @ .start_remote_task()` | ||||||
|  |         # | ||||||
|  |         # if not cs: | ||||||
|  |         #     from .devx import mk_pdb | ||||||
|  |         #     mk_pdb().set_trace() | ||||||
|  |         #     raise RuntimeError( | ||||||
|  |         #         f'IPC ctx was not be opened prior to remote error delivery !?\n' | ||||||
|  |         #         f'{self}\n' | ||||||
|  |         #         f'\n' | ||||||
|  |         #         f'`Portal.open_context()` must be entered (somewhere) beforehand!\n' | ||||||
|  |         #     ) | ||||||
|  | 
 | ||||||
|         # Cancel the local `._scope`, catch that |         # Cancel the local `._scope`, catch that | ||||||
|         # `._scope.cancelled_caught` and re-raise any remote error |         # `._scope.cancelled_caught` and re-raise any remote error | ||||||
|         # once exiting (or manually calling `.wait_for_result()`) the |         # once exiting (or manually calling `.wait_for_result()`) the | ||||||
|         # `.open_context()`  block. |         # `.open_context()`  block. | ||||||
|         cs: trio.CancelScope = self._scope |  | ||||||
|         if ( |         if ( | ||||||
|             cs |             cs | ||||||
|  |             and not cs.cancel_called | ||||||
| 
 | 
 | ||||||
|             # XXX this is an expected cancel request response |             # XXX this is an expected cancel request response | ||||||
|             # message and we **don't need to raise it** in the |             # message and we **don't need to raise it** in the | ||||||
|  | @ -802,8 +881,7 @@ class Context: | ||||||
|             # if `._cancel_called` then `.cancel_acked and .cancel_called` |             # if `._cancel_called` then `.cancel_acked and .cancel_called` | ||||||
|             # always should be set. |             # always should be set. | ||||||
|             and not self._is_self_cancelled() |             and not self._is_self_cancelled() | ||||||
|             and not cs.cancel_called |             # and not cs.cancelled_caught | ||||||
|             and not cs.cancelled_caught |  | ||||||
|         ): |         ): | ||||||
|             if ( |             if ( | ||||||
|                 msgerr |                 msgerr | ||||||
|  | @ -814,7 +892,7 @@ class Context: | ||||||
|                 not self._cancel_on_msgerr |                 not self._cancel_on_msgerr | ||||||
|             ): |             ): | ||||||
|                 message: str = ( |                 message: str = ( | ||||||
|                     'NOT Cancelling `Context._scope` since,\n' |                     f'NOT Cancelling `Context._scope` since,\n' | ||||||
|                     f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n' |                     f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n' | ||||||
|                     f'AND we got a msg-type-error!\n' |                     f'AND we got a msg-type-error!\n' | ||||||
|                     f'{error}\n' |                     f'{error}\n' | ||||||
|  | @ -824,13 +902,43 @@ class Context: | ||||||
|                 # `trio.Cancelled` subtype here ;) |                 # `trio.Cancelled` subtype here ;) | ||||||
|                 # https://github.com/goodboy/tractor/issues/368 |                 # https://github.com/goodboy/tractor/issues/368 | ||||||
|                 message: str = 'Cancelling `Context._scope` !\n\n' |                 message: str = 'Cancelling `Context._scope` !\n\n' | ||||||
|                 # from .devx import pause_from_sync |                 cs.cancel() | ||||||
|                 # pause_from_sync() | 
 | ||||||
|                 self._scope.cancel() |         # TODO, explicit condition for OoB (self-)cancellation? | ||||||
|         else: |         # - we called `Portal.cancel_actor()` from this actor | ||||||
|             message: str = 'NOT cancelling `Context._scope` !\n\n' |         #   and the peer ctx task delivered ctxc due to it. | ||||||
|  |         # - currently `self._is_self_cancelled()` will be true | ||||||
|  |         #   since the ctxc.canceller check will match us even though it | ||||||
|  |         #   wasn't from this ctx specifically! | ||||||
|  |         elif ( | ||||||
|  |             cs | ||||||
|  |             and self._is_self_cancelled() | ||||||
|  |             and not cs.cancel_called | ||||||
|  |         ): | ||||||
|  |             message: str = ( | ||||||
|  |                 'Cancelling `ctx._scope` due to OoB self-cancel ?!\n' | ||||||
|  |                 '\n' | ||||||
|  |             ) | ||||||
|             # from .devx import mk_pdb |             # from .devx import mk_pdb | ||||||
|             # mk_pdb().set_trace() |             # mk_pdb().set_trace() | ||||||
|  |             # TODO XXX, required to fix timeout failure in | ||||||
|  |             # `test_cancelled_lockacquire_in_ipctx_not_unmaskeed` | ||||||
|  |             # | ||||||
|  | 
 | ||||||
|  |             # XXX NOTE XXX, this is SUPER SUBTLE! | ||||||
|  |             # we only want to cancel our embedded `._scope` | ||||||
|  |             # if the ctx's current/using task is NOT blocked | ||||||
|  |             # on `._rx_chan.receive()` and on some other | ||||||
|  |             # `trio`-checkpoint since in the former case | ||||||
|  |             # any `._remote_error` will be relayed through | ||||||
|  |             # the rx-chan and appropriately raised by the owning | ||||||
|  |             # `._task` directly. IF the owner task is however | ||||||
|  |             # blocking elsewhere we need to interrupt it **now**. | ||||||
|  |             if not self._is_blocked_on_rx_chan(): | ||||||
|  |                 cs.cancel() | ||||||
|  |         else: | ||||||
|  |             # rx_stats = self._rx_chan.statistics() | ||||||
|  |             message: str = 'NOT cancelling `Context._scope` !\n\n' | ||||||
| 
 | 
 | ||||||
|         fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n' |         fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n' | ||||||
|         if ( |         if ( | ||||||
|  | @ -854,6 +962,7 @@ class Context: | ||||||
|                 + |                 + | ||||||
|                 cs_fmt |                 cs_fmt | ||||||
|             ) |             ) | ||||||
|  | 
 | ||||||
|         log.cancel( |         log.cancel( | ||||||
|             message |             message | ||||||
|             + |             + | ||||||
|  | @ -946,8 +1055,9 @@ class Context: | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         side: str = self.side |         side: str = self.side | ||||||
|         # XXX for debug via the `@.setter` |         self._cancel_called = True | ||||||
|         self.cancel_called = True |         # ^ XXX for debug via the `@.setter` | ||||||
|  |         # self.cancel_called = True | ||||||
| 
 | 
 | ||||||
|         header: str = ( |         header: str = ( | ||||||
|             f'Cancelling ctx from {side!r}-side\n' |             f'Cancelling ctx from {side!r}-side\n' | ||||||
|  | @ -2011,6 +2121,9 @@ async def open_context_from_portal( | ||||||
|             f'|_{portal.actor}\n' |             f'|_{portal.actor}\n' | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|  |     # ?TODO? could we move this to inside the `tn` block? | ||||||
|  |     # -> would allow doing `ctx.parent_task = tn.parent_task` ? | ||||||
|  |     # -> would allow a `if not ._scope: => raise RTE` ? | ||||||
|     ctx: Context = await portal.actor.start_remote_task( |     ctx: Context = await portal.actor.start_remote_task( | ||||||
|         portal.channel, |         portal.channel, | ||||||
|         nsf=nsf, |         nsf=nsf, | ||||||
|  | @ -2037,6 +2150,7 @@ async def open_context_from_portal( | ||||||
|     scope_err: BaseException|None = None |     scope_err: BaseException|None = None | ||||||
|     ctxc_from_child: ContextCancelled|None = None |     ctxc_from_child: ContextCancelled|None = None | ||||||
|     try: |     try: | ||||||
|  |         # from .devx import pause | ||||||
|         async with ( |         async with ( | ||||||
|             collapse_eg(), |             collapse_eg(), | ||||||
|             trio.open_nursery() as tn, |             trio.open_nursery() as tn, | ||||||
|  | @ -2059,6 +2173,10 @@ async def open_context_from_portal( | ||||||
|             # the dialog, the `Error` msg should be raised from the `msg` |             # the dialog, the `Error` msg should be raised from the `msg` | ||||||
|             # handling block below. |             # handling block below. | ||||||
|             try: |             try: | ||||||
|  |                 log.runtime( | ||||||
|  |                     f'IPC ctx parent waiting on Started msg..\n' | ||||||
|  |                     f'ctx.cid: {ctx.cid!r}\n' | ||||||
|  |                 ) | ||||||
|                 started_msg, first = await ctx._pld_rx.recv_msg( |                 started_msg, first = await ctx._pld_rx.recv_msg( | ||||||
|                     ipc=ctx, |                     ipc=ctx, | ||||||
|                     expect_msg=Started, |                     expect_msg=Started, | ||||||
|  | @ -2067,16 +2185,16 @@ async def open_context_from_portal( | ||||||
|                 ) |                 ) | ||||||
|             except trio.Cancelled as taskc: |             except trio.Cancelled as taskc: | ||||||
|                 ctx_cs: trio.CancelScope = ctx._scope |                 ctx_cs: trio.CancelScope = ctx._scope | ||||||
|  |                 log.cancel( | ||||||
|  |                     f'IPC ctx was cancelled during "child" task sync due to\n\n' | ||||||
|  |                     f'.cid: {ctx.cid!r}\n' | ||||||
|  |                     f'.maybe_error: {ctx.maybe_error!r}\n' | ||||||
|  |                 ) | ||||||
|  |                 # await pause(shield=True) | ||||||
|  | 
 | ||||||
|                 if not ctx_cs.cancel_called: |                 if not ctx_cs.cancel_called: | ||||||
|                     raise |                     raise | ||||||
| 
 | 
 | ||||||
|                 # from .devx import pause |  | ||||||
|                 # await pause(shield=True) |  | ||||||
| 
 |  | ||||||
|                 log.cancel( |  | ||||||
|                     'IPC ctx was cancelled during "child" task sync due to\n\n' |  | ||||||
|                     f'{ctx.maybe_error}\n' |  | ||||||
|                 ) |  | ||||||
|                 # OW if the ctx's scope was cancelled manually, |                 # OW if the ctx's scope was cancelled manually, | ||||||
|                 # likely the `Context` was cancelled via a call to |                 # likely the `Context` was cancelled via a call to | ||||||
|                 # `._maybe_cancel_and_set_remote_error()` so ensure |                 # `._maybe_cancel_and_set_remote_error()` so ensure | ||||||
|  | @ -2272,13 +2390,16 @@ async def open_context_from_portal( | ||||||
|         match scope_err: |         match scope_err: | ||||||
|             case trio.Cancelled(): |             case trio.Cancelled(): | ||||||
|                 logmeth = log.cancel |                 logmeth = log.cancel | ||||||
|  |                 cause: str = 'cancelled' | ||||||
| 
 | 
 | ||||||
|             # XXX explicitly report on any non-graceful-taskc cases |             # XXX explicitly report on any non-graceful-taskc cases | ||||||
|             case _: |             case _: | ||||||
|  |                 cause: str = 'errored' | ||||||
|                 logmeth = log.exception |                 logmeth = log.exception | ||||||
| 
 | 
 | ||||||
|         logmeth( |         logmeth( | ||||||
|             f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n' |             f'ctx {ctx.side!r}-side {cause!r} with,\n' | ||||||
|  |             f'{ctx.repr_outcome()!r}\n' | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|         if debug_mode(): |         if debug_mode(): | ||||||
|  | @ -2303,6 +2424,7 @@ async def open_context_from_portal( | ||||||
|         # told us it's cancelled ;p |         # told us it's cancelled ;p | ||||||
|         if ctxc_from_child is None: |         if ctxc_from_child is None: | ||||||
|             try: |             try: | ||||||
|  |                 # await pause(shield=True) | ||||||
|                 await ctx.cancel() |                 await ctx.cancel() | ||||||
|             except ( |             except ( | ||||||
|                 trio.BrokenResourceError, |                 trio.BrokenResourceError, | ||||||
|  | @ -2459,8 +2581,10 @@ async def open_context_from_portal( | ||||||
|                 log.cancel( |                 log.cancel( | ||||||
|                     f'Context cancelled by local {ctx.side!r}-side task\n' |                     f'Context cancelled by local {ctx.side!r}-side task\n' | ||||||
|                     f'c)>\n' |                     f'c)>\n' | ||||||
|                     f' |_{ctx._task}\n\n' |                     f'  |_{ctx.parent_task}\n' | ||||||
|                     f'{repr(scope_err)}\n' |                     f'   .cid={ctx.cid!r}\n' | ||||||
|  |                     f'\n' | ||||||
|  |                     f'{scope_err!r}\n' | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|             # TODO: should we add a `._cancel_req_received` |             # TODO: should we add a `._cancel_req_received` | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue