forked from goodboy/tractor
				
			Merge pull request #399 from goodboy/oob_cancel_testing
OoB (out-of-band) cancellation testing, proper.remotes/1757891924488435790/main
						commit
						83ce2275b9
					
				|  | @ -24,14 +24,10 @@ from tractor._testing import ( | |||
| ) | ||||
| 
 | ||||
| # XXX TODO cases: | ||||
| # - [ ] peer cancelled itself - so other peers should | ||||
| #   get errors reflecting that the peer was itself the .canceller? | ||||
| 
 | ||||
| # - [x] WE cancelled the peer and thus should not see any raised | ||||
| #   `ContextCancelled` as it should be reaped silently? | ||||
| #   => pretty sure `test_context_stream_semantics::test_caller_cancels()` | ||||
| #      already covers this case? | ||||
| 
 | ||||
| # - [x] INTER-PEER: some arbitrary remote peer cancels via | ||||
| #   Portal.cancel_actor(). | ||||
| #   => all other connected peers should get that cancel requesting peer's | ||||
|  | @ -44,16 +40,6 @@ from tractor._testing import ( | |||
| #   that also spawned a remote task task in that same peer-parent. | ||||
| 
 | ||||
| 
 | ||||
| # def test_self_cancel(): | ||||
| #     ''' | ||||
| #     2 cases: | ||||
| #     - calls `Actor.cancel()` locally in some task | ||||
| #     - calls LocalPortal.cancel_actor()` ? | ||||
| 
 | ||||
| #     ''' | ||||
| #     ... | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def open_stream_then_sleep_forever( | ||||
|     ctx: Context, | ||||
|  | @ -806,7 +792,7 @@ async def basic_echo_server( | |||
|     ctx: Context, | ||||
|     peer_name: str = 'wittle_bruv', | ||||
| 
 | ||||
|     err_after: int|None = None, | ||||
|     err_after_imsg: int|None = None, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|  | @ -835,8 +821,9 @@ async def basic_echo_server( | |||
|             await ipc.send(resp) | ||||
| 
 | ||||
|             if ( | ||||
|                 err_after | ||||
|                 and i > err_after | ||||
|                 err_after_imsg | ||||
|                 and | ||||
|                 i > err_after_imsg | ||||
|             ): | ||||
|                 raise RuntimeError( | ||||
|                     f'Simulated error in `{peer_name}`' | ||||
|  | @ -978,7 +965,8 @@ async def tell_little_bro( | |||
|     actor_name: str, | ||||
| 
 | ||||
|     caller: str = '', | ||||
|     err_after: int|None = None, | ||||
|     err_after: float|None = None, | ||||
|     rng_seed: int = 50, | ||||
| ): | ||||
|     # contact target actor, do a stream dialog. | ||||
|     async with ( | ||||
|  | @ -989,14 +977,18 @@ async def tell_little_bro( | |||
|             basic_echo_server, | ||||
| 
 | ||||
|             # XXX proxy any delayed err condition | ||||
|             err_after=err_after, | ||||
|             err_after_imsg=( | ||||
|                 err_after * rng_seed | ||||
|                 if err_after is not None | ||||
|                 else None | ||||
|             ), | ||||
|         ) as (sub_ctx, first), | ||||
| 
 | ||||
|         sub_ctx.open_stream() as echo_ipc, | ||||
|     ): | ||||
|         actor: Actor = current_actor() | ||||
|         uid: tuple = actor.uid | ||||
|         for i in range(100): | ||||
|         for i in range(rng_seed): | ||||
|             msg: tuple = ( | ||||
|                 uid, | ||||
|                 i, | ||||
|  | @ -1021,13 +1013,13 @@ async def tell_little_bro( | |||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'raise_sub_spawn_error_after', | ||||
|     [None, 50], | ||||
|     [None, 0.5], | ||||
| ) | ||||
| def test_peer_spawns_and_cancels_service_subactor( | ||||
|     debug_mode: bool, | ||||
|     raise_client_error: str, | ||||
|     reg_addr: tuple[str, int], | ||||
|     raise_sub_spawn_error_after: int|None, | ||||
|     raise_sub_spawn_error_after: float|None, | ||||
| ): | ||||
|     # NOTE: this tests for the modden `mod wks open piker` bug | ||||
|     # discovered as part of implementing workspace ctx | ||||
|  | @ -1041,6 +1033,7 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
|     #   and the server's spawned child should cancel and terminate! | ||||
|     peer_name: str = 'little_bro' | ||||
| 
 | ||||
| 
 | ||||
|     def check_inner_rte(rae: RemoteActorError): | ||||
|         ''' | ||||
|         Validate the little_bro's relayed inception! | ||||
|  | @ -1134,8 +1127,7 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
|                         ) | ||||
| 
 | ||||
|                     try: | ||||
|                         res = await client_ctx.result(hide_tb=False) | ||||
| 
 | ||||
|                         res = await client_ctx.wait_for_result(hide_tb=False) | ||||
|                         # in remote (relayed inception) error | ||||
|                         # case, we should error on the line above! | ||||
|                         if raise_sub_spawn_error_after: | ||||
|  | @ -1146,6 +1138,23 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
|                         assert isinstance(res, ContextCancelled) | ||||
|                         assert client_ctx.cancel_acked | ||||
|                         assert res.canceller == root.uid | ||||
|                         assert not raise_sub_spawn_error_after | ||||
| 
 | ||||
|                         # cancelling the spawner sub should | ||||
|                         # transitively cancel it's sub, the little | ||||
|                         # bruv. | ||||
|                         print('root cancelling server/client sub-actors') | ||||
|                         await spawn_ctx.cancel() | ||||
|                         async with tractor.find_actor( | ||||
|                             name=peer_name, | ||||
|                         ) as sub: | ||||
|                             assert not sub | ||||
| 
 | ||||
|                     # XXX, only for tracing | ||||
|                     # except BaseException as _berr: | ||||
|                     #     berr = _berr | ||||
|                     #     await tractor.pause(shield=True) | ||||
|                     #     raise berr | ||||
| 
 | ||||
|                     except RemoteActorError as rae: | ||||
|                         _err = rae | ||||
|  | @ -1174,19 +1183,8 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
|                         raise | ||||
|                         # await tractor.pause() | ||||
| 
 | ||||
|                     else: | ||||
|                         assert not raise_sub_spawn_error_after | ||||
| 
 | ||||
|                         # cancelling the spawner sub should | ||||
|                         # transitively cancel it's sub, the little | ||||
|                         # bruv. | ||||
|                         print('root cancelling server/client sub-actors') | ||||
|                         await spawn_ctx.cancel() | ||||
|                         async with tractor.find_actor( | ||||
|                             name=peer_name, | ||||
|                         ) as sub: | ||||
|                             assert not sub | ||||
| 
 | ||||
|                     # await tractor.pause() | ||||
|                     # await server.cancel_actor() | ||||
| 
 | ||||
|             except RemoteActorError as rae: | ||||
|  | @ -1199,7 +1197,7 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
| 
 | ||||
|             # since we called `.cancel_actor()`, `.cancel_ack` | ||||
|             # will not be set on the ctx bc `ctx.cancel()` was not | ||||
|             # called directly fot this confext. | ||||
|             # called directly for this confext. | ||||
|             except ContextCancelled as ctxc: | ||||
|                 _ctxc = ctxc | ||||
|                 print( | ||||
|  | @ -1239,12 +1237,19 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
| 
 | ||||
|                 # assert spawn_ctx.cancelled_caught | ||||
| 
 | ||||
|     async def _main(): | ||||
|         with trio.fail_after( | ||||
|             3 if not debug_mode | ||||
|             else 999 | ||||
|         ): | ||||
|             await main() | ||||
| 
 | ||||
|     if raise_sub_spawn_error_after: | ||||
|         with pytest.raises(RemoteActorError) as excinfo: | ||||
|             trio.run(main) | ||||
|             trio.run(_main) | ||||
| 
 | ||||
|         rae: RemoteActorError = excinfo.value | ||||
|         check_inner_rte(rae) | ||||
| 
 | ||||
|     else: | ||||
|         trio.run(main) | ||||
|         trio.run(_main) | ||||
|  |  | |||
|  | @ -0,0 +1,239 @@ | |||
| ''' | ||||
| Define the details of inter-actor "out-of-band" (OoB) cancel | ||||
| semantics, that is how cancellation works when a cancel request comes | ||||
| from the different concurrency (primitive's) "layer" then where the | ||||
| eventual `trio.Task` actually raises a signal. | ||||
| 
 | ||||
| ''' | ||||
| from functools import partial | ||||
| # from contextlib import asynccontextmanager as acm | ||||
| # import itertools | ||||
| 
 | ||||
| import pytest | ||||
| import trio | ||||
| import tractor | ||||
| from tractor import (  # typing | ||||
|     ActorNursery, | ||||
|     Portal, | ||||
|     Context, | ||||
|     # ContextCancelled, | ||||
|     # RemoteActorError, | ||||
| ) | ||||
| # from tractor._testing import ( | ||||
| #     tractor_test, | ||||
| #     expect_ctxc, | ||||
| # ) | ||||
| 
 | ||||
| # XXX TODO cases: | ||||
| # - [ ] peer cancelled itself - so other peers should | ||||
| #   get errors reflecting that the peer was itself the .canceller? | ||||
| 
 | ||||
| # def test_self_cancel(): | ||||
| #     ''' | ||||
| #     2 cases: | ||||
| #     - calls `Actor.cancel()` locally in some task | ||||
| #     - calls LocalPortal.cancel_actor()` ? | ||||
| # | ||||
| # things to ensure! | ||||
| # -[ ] the ctxc raised in a child should ideally show the tb of the | ||||
| #     underlying `Cancelled` checkpoint, i.e. | ||||
| #     `raise scope_error from ctxc`? | ||||
| # | ||||
| # -[ ] a self-cancelled context, if not allowed to block on | ||||
| #     `ctx.result()` at some point will hang since the `ctx._scope` | ||||
| #     is never `.cancel_called`; cases for this include, | ||||
| #     - an `open_ctx()` which never starteds before being OoB actor | ||||
| #       cancelled. | ||||
| #       |_ parent task will be blocked in `.open_context()` for the | ||||
| #         `Started` msg, and when the OoB ctxc arrives `ctx._scope` | ||||
| #         will never have been signalled.. | ||||
| 
 | ||||
| #     ''' | ||||
| #     ... | ||||
| 
 | ||||
| # TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py` | ||||
| # but with the `Lock.acquire()` from a `@context` to ensure the | ||||
| # implicit ignore-case-non-unmasking. | ||||
| # | ||||
| # @tractor.context | ||||
| # async def acquire_actor_global_lock( | ||||
| #     ctx: tractor.Context, | ||||
| #     ignore_special_cases: bool, | ||||
| # ): | ||||
| 
 | ||||
| #     async with maybe_unmask_excs( | ||||
| #         ignore_special_cases=ignore_special_cases, | ||||
| #     ): | ||||
| #         await ctx.started('locked') | ||||
| 
 | ||||
| #     # block til cancelled | ||||
| #     await trio.sleep_forever() | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def sleep_forever( | ||||
|     ctx: tractor.Context, | ||||
|     # ignore_special_cases: bool, | ||||
|     do_started: bool, | ||||
| ): | ||||
| 
 | ||||
|     # async with maybe_unmask_excs( | ||||
|     #     ignore_special_cases=ignore_special_cases, | ||||
|     # ): | ||||
|     #     await ctx.started('locked') | ||||
|     if do_started: | ||||
|         await ctx.started() | ||||
| 
 | ||||
|     # block til cancelled | ||||
|     print('sleepin on child-side..') | ||||
|     await trio.sleep_forever() | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'cancel_ctx', | ||||
|     [True, False], | ||||
| ) | ||||
| def test_cancel_ctx_with_parent_side_entered_in_bg_task( | ||||
|     debug_mode: bool, | ||||
|     loglevel: str, | ||||
|     cancel_ctx: bool, | ||||
| ): | ||||
|     ''' | ||||
|     The most "basic" out-of-band-task self-cancellation case where | ||||
|     `Portal.open_context()` is entered in a bg task and the | ||||
|     parent-task (of the containing nursery) calls `Context.cancel()` | ||||
|     without the child knowing; the `Context._scope` should be | ||||
|     `.cancel_called` when the IPC ctx's child-side relays | ||||
|     a `ContextCancelled` with a `.canceller` set to the parent | ||||
|     actor('s task). | ||||
| 
 | ||||
|     ''' | ||||
|     async def main(): | ||||
|         with trio.fail_after( | ||||
|             2 if not debug_mode else 999, | ||||
|         ): | ||||
|             an: ActorNursery | ||||
|             async with ( | ||||
|                 tractor.open_nursery( | ||||
|                     debug_mode=debug_mode, | ||||
|                     loglevel='devx', | ||||
|                     enable_stack_on_sig=True, | ||||
|                 ) as an, | ||||
|                 trio.open_nursery() as tn, | ||||
|             ): | ||||
|                 ptl: Portal = await an.start_actor( | ||||
|                     'sub', | ||||
|                     enable_modules=[__name__], | ||||
|                 ) | ||||
| 
 | ||||
|                 async def _open_ctx_async( | ||||
|                     do_started: bool = True, | ||||
|                     task_status=trio.TASK_STATUS_IGNORED, | ||||
|                 ): | ||||
|                     # do we expect to never enter the | ||||
|                     # `.open_context()` below. | ||||
|                     if not do_started: | ||||
|                         task_status.started() | ||||
| 
 | ||||
|                     async with ptl.open_context( | ||||
|                         sleep_forever, | ||||
|                         do_started=do_started, | ||||
|                     ) as (ctx, first): | ||||
|                         task_status.started(ctx) | ||||
|                         await trio.sleep_forever() | ||||
| 
 | ||||
|                 # XXX, this is the key OoB part! | ||||
|                 # | ||||
|                 # - start the `.open_context()` in a bg task which | ||||
|                 #   blocks inside the embedded scope-body, | ||||
|                 # | ||||
|                 # -  when we call `Context.cancel()` it **is | ||||
|                 #   not** from the same task which eventually runs | ||||
|                 #   `.__aexit__()`, | ||||
|                 # | ||||
|                 # - since the bg "opener" task will be in | ||||
|                 #   a `trio.sleep_forever()`, it must be interrupted | ||||
|                 #   by the `ContextCancelled` delivered from the | ||||
|                 #   child-side; `Context._scope: CancelScope` MUST | ||||
|                 #   be `.cancel_called`! | ||||
|                 # | ||||
|                 print('ASYNC opening IPC context in subtask..') | ||||
|                 maybe_ctx: Context|None = await tn.start(partial( | ||||
|                     _open_ctx_async, | ||||
|                 )) | ||||
| 
 | ||||
|                 if ( | ||||
|                     maybe_ctx | ||||
|                     and | ||||
|                     cancel_ctx | ||||
|                 ): | ||||
|                     print('cancelling first IPC ctx!') | ||||
|                     await maybe_ctx.cancel() | ||||
| 
 | ||||
|                 # XXX, note that despite `maybe_context.cancel()` | ||||
|                 # being called above, it's the parent (bg) task | ||||
|                 # which was originally never interrupted in | ||||
|                 # the `ctx._scope` body due to missing case logic in | ||||
|                 # `ctx._maybe_cancel_and_set_remote_error()`. | ||||
|                 # | ||||
|                 # It didn't matter that the subactor process was | ||||
|                 # already terminated and reaped, nothing was | ||||
|                 # cancelling the ctx-parent task's scope! | ||||
|                 # | ||||
|                 print('cancelling subactor!') | ||||
|                 await ptl.cancel_actor() | ||||
| 
 | ||||
|                 if maybe_ctx: | ||||
|                     try: | ||||
|                         await maybe_ctx.wait_for_result() | ||||
|                     except tractor.ContextCancelled as ctxc: | ||||
|                         assert not cancel_ctx | ||||
|                         assert ( | ||||
|                             ctxc.canceller | ||||
|                             == | ||||
|                             tractor.current_actor().aid.uid | ||||
|                         ) | ||||
|                         # don't re-raise since it'll trigger | ||||
|                         # an EG from the above tn. | ||||
| 
 | ||||
|     if cancel_ctx: | ||||
|         # graceful self-cancel | ||||
|         trio.run(main) | ||||
| 
 | ||||
|     else: | ||||
|         # ctx parent task should see OoB ctxc due to | ||||
|         # `ptl.cancel_actor()`. | ||||
|         with pytest.raises(tractor.ContextCancelled) as excinfo: | ||||
|             trio.run(main) | ||||
| 
 | ||||
|         assert 'root' in excinfo.value.canceller[0] | ||||
| 
 | ||||
| 
 | ||||
| # def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it( | ||||
| #     debug_mode: bool, | ||||
| #     loglevel: str, | ||||
| # ): | ||||
| #     ''' | ||||
| #     Demos OoB cancellation from the perspective of a ctx opened with | ||||
| #     a child subactor where the parent cancels the child at the "actor | ||||
| #     layer" using `Portal.cancel_actor()` and thus the | ||||
| #     `ContextCancelled.canceller` received by the ctx's parent-side | ||||
| #     task will appear to be a "self cancellation" even though that | ||||
| #     specific task itself was not cancelled and thus | ||||
| #     `Context.cancel_called ==False`. | ||||
| #     ''' | ||||
|                 # TODO, do we have an existing implied ctx | ||||
|                 # cancel test like this? | ||||
|                 # with trio.move_on_after(0.5):# as cs: | ||||
|                 #     await _open_ctx_async( | ||||
|                 #         do_started=False, | ||||
|                 #     ) | ||||
| 
 | ||||
| 
 | ||||
|                 # in-line ctx scope should definitely raise | ||||
|                 # a ctxc with `.canceller = 'root'` | ||||
|                 # async with ptl.open_context( | ||||
|                 #     sleep_forever, | ||||
|                 #     do_started=True, | ||||
|                 # ) as pair: | ||||
| 
 | ||||
|  | @ -442,25 +442,25 @@ class Context: | |||
|         ''' | ||||
|         Records whether cancellation has been requested for this context | ||||
|         by a call to  `.cancel()` either due to, | ||||
|         - either an explicit call by some local task, | ||||
|         - an explicit call by some local task, | ||||
|         - or an implicit call due to an error caught inside | ||||
|           the ``Portal.open_context()`` block. | ||||
|           the `Portal.open_context()` block. | ||||
| 
 | ||||
|         ''' | ||||
|         return self._cancel_called | ||||
| 
 | ||||
|     @cancel_called.setter | ||||
|     def cancel_called(self, val: bool) -> None: | ||||
|         ''' | ||||
|         Set the self-cancelled request `bool` value. | ||||
|     # XXX, to debug who frickin sets it.. | ||||
|     # @cancel_called.setter | ||||
|     # def cancel_called(self, val: bool) -> None: | ||||
|     #     ''' | ||||
|     #     Set the self-cancelled request `bool` value. | ||||
| 
 | ||||
|         ''' | ||||
|         # to debug who frickin sets it.. | ||||
|         # if val: | ||||
|         #     from .devx import pause_from_sync | ||||
|         #     pause_from_sync() | ||||
|     #     ''' | ||||
|     #     if val: | ||||
|     #         from .devx import pause_from_sync | ||||
|     #         pause_from_sync() | ||||
| 
 | ||||
|         self._cancel_called = val | ||||
|     #     self._cancel_called = val | ||||
| 
 | ||||
|     @property | ||||
|     def canceller(self) -> tuple[str, str]|None: | ||||
|  | @ -635,6 +635,71 @@ class Context: | |||
|         ''' | ||||
|         await self.chan.send(Stop(cid=self.cid)) | ||||
| 
 | ||||
|     @property | ||||
|     def parent_task(self) -> trio.Task: | ||||
|         ''' | ||||
|         This IPC context's "owning task" which is a `trio.Task` | ||||
|         on one of the "sides" of the IPC. | ||||
| 
 | ||||
|         Note that the "parent_" prefix here refers to the local | ||||
|         `trio` task tree using the same interface as | ||||
|         `trio.Nursery.parent_task` whereas for IPC contexts, | ||||
|         a different cross-actor task hierarchy exists: | ||||
| 
 | ||||
|         - a "parent"-side which originally entered | ||||
|           `Portal.open_context()`, | ||||
| 
 | ||||
|         - the "child"-side which was spawned and scheduled to invoke | ||||
|           a function decorated with `@tractor.context`. | ||||
| 
 | ||||
|         This task is thus a handle to mem-domain-distinct/per-process | ||||
|         `Nursery.parent_task` depending on in which of the above | ||||
|         "sides" this context exists. | ||||
| 
 | ||||
|         ''' | ||||
|         return self._task | ||||
| 
 | ||||
|     def _is_blocked_on_rx_chan(self) -> bool: | ||||
|         ''' | ||||
|         Predicate to indicate whether the owner `._task: trio.Task` is | ||||
|         currently blocked (by `.receive()`-ing) on its underlying RPC | ||||
|         feeder `._rx_chan`. | ||||
| 
 | ||||
|         This knowledge is highly useful when handling so called | ||||
|         "out-of-band" (OoB) cancellation conditions where a peer | ||||
|         actor's task transmitted some remote error/cancel-msg and we | ||||
|         must know whether to signal-via-cancel currently executing | ||||
|         "user-code" (user defined code embedded in `ctx._scope`) or | ||||
|         simply to forward the IPC-msg-as-error **without calling** | ||||
|         `._scope.cancel()`. | ||||
| 
 | ||||
|         In the latter case it is presumed that if the owner task is | ||||
|         blocking for the next IPC msg, it will eventually receive, | ||||
|         process and raise the equivalent local error **without** | ||||
|         requiring `._scope.cancel()` to be explicitly called by the | ||||
|         *delivering OoB RPC-task* (via `_deliver_msg()`). | ||||
| 
 | ||||
|         ''' | ||||
|         # NOTE, see the mem-chan meth-impls for *why* this | ||||
|         # logic works, | ||||
|         # `trio._channel.MemoryReceiveChannel.receive[_nowait]()` | ||||
|         # | ||||
|         # XXX realize that this is NOT an | ||||
|         # official/will-be-loudly-deprecated API: | ||||
|         # - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data | ||||
|         #  |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled | ||||
|         # | ||||
|         # orig repo intro in the mem-chan change over patch: | ||||
|         # - https://github.com/python-trio/trio/pull/586#issuecomment-414039117 | ||||
|         #  |_https://github.com/python-trio/trio/pull/616 | ||||
|         #  |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0 | ||||
|         # | ||||
|         return ( | ||||
|             self._task.custom_sleep_data | ||||
|             is | ||||
|             self._rx_chan | ||||
|         ) | ||||
| 
 | ||||
|     def _maybe_cancel_and_set_remote_error( | ||||
|         self, | ||||
|         error: BaseException, | ||||
|  | @ -787,13 +852,27 @@ class Context: | |||
|         if self._canceller is None: | ||||
|             log.error('Ctx has no canceller set!?') | ||||
| 
 | ||||
|         cs: trio.CancelScope = self._scope | ||||
| 
 | ||||
|         # ?TODO? see comment @ .start_remote_task()` | ||||
|         # | ||||
|         # if not cs: | ||||
|         #     from .devx import mk_pdb | ||||
|         #     mk_pdb().set_trace() | ||||
|         #     raise RuntimeError( | ||||
|         #         f'IPC ctx was not be opened prior to remote error delivery !?\n' | ||||
|         #         f'{self}\n' | ||||
|         #         f'\n' | ||||
|         #         f'`Portal.open_context()` must be entered (somewhere) beforehand!\n' | ||||
|         #     ) | ||||
| 
 | ||||
|         # Cancel the local `._scope`, catch that | ||||
|         # `._scope.cancelled_caught` and re-raise any remote error | ||||
|         # once exiting (or manually calling `.wait_for_result()`) the | ||||
|         # `.open_context()`  block. | ||||
|         cs: trio.CancelScope = self._scope | ||||
|         if ( | ||||
|             cs | ||||
|             and not cs.cancel_called | ||||
| 
 | ||||
|             # XXX this is an expected cancel request response | ||||
|             # message and we **don't need to raise it** in the | ||||
|  | @ -802,8 +881,7 @@ class Context: | |||
|             # if `._cancel_called` then `.cancel_acked and .cancel_called` | ||||
|             # always should be set. | ||||
|             and not self._is_self_cancelled() | ||||
|             and not cs.cancel_called | ||||
|             and not cs.cancelled_caught | ||||
|             # and not cs.cancelled_caught | ||||
|         ): | ||||
|             if ( | ||||
|                 msgerr | ||||
|  | @ -814,7 +892,7 @@ class Context: | |||
|                 not self._cancel_on_msgerr | ||||
|             ): | ||||
|                 message: str = ( | ||||
|                     'NOT Cancelling `Context._scope` since,\n' | ||||
|                     f'NOT Cancelling `Context._scope` since,\n' | ||||
|                     f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n' | ||||
|                     f'AND we got a msg-type-error!\n' | ||||
|                     f'{error}\n' | ||||
|  | @ -824,13 +902,43 @@ class Context: | |||
|                 # `trio.Cancelled` subtype here ;) | ||||
|                 # https://github.com/goodboy/tractor/issues/368 | ||||
|                 message: str = 'Cancelling `Context._scope` !\n\n' | ||||
|                 # from .devx import pause_from_sync | ||||
|                 # pause_from_sync() | ||||
|                 self._scope.cancel() | ||||
|         else: | ||||
|             message: str = 'NOT cancelling `Context._scope` !\n\n' | ||||
|                 cs.cancel() | ||||
| 
 | ||||
|         # TODO, explicit condition for OoB (self-)cancellation? | ||||
|         # - we called `Portal.cancel_actor()` from this actor | ||||
|         #   and the peer ctx task delivered ctxc due to it. | ||||
|         # - currently `self._is_self_cancelled()` will be true | ||||
|         #   since the ctxc.canceller check will match us even though it | ||||
|         #   wasn't from this ctx specifically! | ||||
|         elif ( | ||||
|             cs | ||||
|             and self._is_self_cancelled() | ||||
|             and not cs.cancel_called | ||||
|         ): | ||||
|             message: str = ( | ||||
|                 'Cancelling `ctx._scope` due to OoB self-cancel ?!\n' | ||||
|                 '\n' | ||||
|             ) | ||||
|             # from .devx import mk_pdb | ||||
|             # mk_pdb().set_trace() | ||||
|             # TODO XXX, required to fix timeout failure in | ||||
|             # `test_cancelled_lockacquire_in_ipctx_not_unmaskeed` | ||||
|             # | ||||
| 
 | ||||
|             # XXX NOTE XXX, this is SUPER SUBTLE! | ||||
|             # we only want to cancel our embedded `._scope` | ||||
|             # if the ctx's current/using task is NOT blocked | ||||
|             # on `._rx_chan.receive()` and on some other | ||||
|             # `trio`-checkpoint since in the former case | ||||
|             # any `._remote_error` will be relayed through | ||||
|             # the rx-chan and appropriately raised by the owning | ||||
|             # `._task` directly. IF the owner task is however | ||||
|             # blocking elsewhere we need to interrupt it **now**. | ||||
|             if not self._is_blocked_on_rx_chan(): | ||||
|                 cs.cancel() | ||||
|         else: | ||||
|             # rx_stats = self._rx_chan.statistics() | ||||
|             message: str = 'NOT cancelling `Context._scope` !\n\n' | ||||
| 
 | ||||
|         fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n' | ||||
|         if ( | ||||
|  | @ -854,6 +962,7 @@ class Context: | |||
|                 + | ||||
|                 cs_fmt | ||||
|             ) | ||||
| 
 | ||||
|         log.cancel( | ||||
|             message | ||||
|             + | ||||
|  | @ -946,8 +1055,9 @@ class Context: | |||
| 
 | ||||
|         ''' | ||||
|         side: str = self.side | ||||
|         # XXX for debug via the `@.setter` | ||||
|         self.cancel_called = True | ||||
|         self._cancel_called = True | ||||
|         # ^ XXX for debug via the `@.setter` | ||||
|         # self.cancel_called = True | ||||
| 
 | ||||
|         header: str = ( | ||||
|             f'Cancelling ctx from {side!r}-side\n' | ||||
|  | @ -2011,6 +2121,9 @@ async def open_context_from_portal( | |||
|             f'|_{portal.actor}\n' | ||||
|         ) | ||||
| 
 | ||||
|     # ?TODO? could we move this to inside the `tn` block? | ||||
|     # -> would allow doing `ctx.parent_task = tn.parent_task` ? | ||||
|     # -> would allow a `if not ._scope: => raise RTE` ? | ||||
|     ctx: Context = await portal.actor.start_remote_task( | ||||
|         portal.channel, | ||||
|         nsf=nsf, | ||||
|  | @ -2037,6 +2150,7 @@ async def open_context_from_portal( | |||
|     scope_err: BaseException|None = None | ||||
|     ctxc_from_child: ContextCancelled|None = None | ||||
|     try: | ||||
|         # from .devx import pause | ||||
|         async with ( | ||||
|             collapse_eg(), | ||||
|             trio.open_nursery() as tn, | ||||
|  | @ -2059,6 +2173,10 @@ async def open_context_from_portal( | |||
|             # the dialog, the `Error` msg should be raised from the `msg` | ||||
|             # handling block below. | ||||
|             try: | ||||
|                 log.runtime( | ||||
|                     f'IPC ctx parent waiting on Started msg..\n' | ||||
|                     f'ctx.cid: {ctx.cid!r}\n' | ||||
|                 ) | ||||
|                 started_msg, first = await ctx._pld_rx.recv_msg( | ||||
|                     ipc=ctx, | ||||
|                     expect_msg=Started, | ||||
|  | @ -2067,16 +2185,16 @@ async def open_context_from_portal( | |||
|                 ) | ||||
|             except trio.Cancelled as taskc: | ||||
|                 ctx_cs: trio.CancelScope = ctx._scope | ||||
|                 log.cancel( | ||||
|                     f'IPC ctx was cancelled during "child" task sync due to\n\n' | ||||
|                     f'.cid: {ctx.cid!r}\n' | ||||
|                     f'.maybe_error: {ctx.maybe_error!r}\n' | ||||
|                 ) | ||||
|                 # await pause(shield=True) | ||||
| 
 | ||||
|                 if not ctx_cs.cancel_called: | ||||
|                     raise | ||||
| 
 | ||||
|                 # from .devx import pause | ||||
|                 # await pause(shield=True) | ||||
| 
 | ||||
|                 log.cancel( | ||||
|                     'IPC ctx was cancelled during "child" task sync due to\n\n' | ||||
|                     f'{ctx.maybe_error}\n' | ||||
|                 ) | ||||
|                 # OW if the ctx's scope was cancelled manually, | ||||
|                 # likely the `Context` was cancelled via a call to | ||||
|                 # `._maybe_cancel_and_set_remote_error()` so ensure | ||||
|  | @ -2199,7 +2317,7 @@ async def open_context_from_portal( | |||
|         #   documenting it as a definittive example of | ||||
|         #   debugging the tractor-runtime itself using it's | ||||
|         #   own `.devx.` tooling! | ||||
|         #  | ||||
|         # | ||||
|         # await debug.pause() | ||||
| 
 | ||||
|         # CASE 2: context was cancelled by local task calling | ||||
|  | @ -2272,13 +2390,16 @@ async def open_context_from_portal( | |||
|         match scope_err: | ||||
|             case trio.Cancelled(): | ||||
|                 logmeth = log.cancel | ||||
|                 cause: str = 'cancelled' | ||||
| 
 | ||||
|             # XXX explicitly report on any non-graceful-taskc cases | ||||
|             case _: | ||||
|                 cause: str = 'errored' | ||||
|                 logmeth = log.exception | ||||
| 
 | ||||
|         logmeth( | ||||
|             f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n' | ||||
|             f'ctx {ctx.side!r}-side {cause!r} with,\n' | ||||
|             f'{ctx.repr_outcome()!r}\n' | ||||
|         ) | ||||
| 
 | ||||
|         if debug_mode(): | ||||
|  | @ -2303,6 +2424,7 @@ async def open_context_from_portal( | |||
|         # told us it's cancelled ;p | ||||
|         if ctxc_from_child is None: | ||||
|             try: | ||||
|                 # await pause(shield=True) | ||||
|                 await ctx.cancel() | ||||
|             except ( | ||||
|                 trio.BrokenResourceError, | ||||
|  | @ -2459,8 +2581,10 @@ async def open_context_from_portal( | |||
|                 log.cancel( | ||||
|                     f'Context cancelled by local {ctx.side!r}-side task\n' | ||||
|                     f'c)>\n' | ||||
|                     f' |_{ctx._task}\n\n' | ||||
|                     f'{repr(scope_err)}\n' | ||||
|                     f'  |_{ctx.parent_task}\n' | ||||
|                     f'   .cid={ctx.cid!r}\n' | ||||
|                     f'\n' | ||||
|                     f'{scope_err!r}\n' | ||||
|                 ) | ||||
| 
 | ||||
|             # TODO: should we add a `._cancel_req_received` | ||||
|  |  | |||
|  | @ -446,12 +446,12 @@ class ActorNursery: | |||
| @acm | ||||
| async def _open_and_supervise_one_cancels_all_nursery( | ||||
|     actor: Actor, | ||||
|     tb_hide: bool = False, | ||||
|     hide_tb: bool = True, | ||||
| 
 | ||||
| ) -> typing.AsyncGenerator[ActorNursery, None]: | ||||
| 
 | ||||
|     # normally don't need to show user by default | ||||
|     __tracebackhide__: bool = tb_hide | ||||
|     __tracebackhide__: bool = hide_tb | ||||
| 
 | ||||
|     outer_err: BaseException|None = None | ||||
|     inner_err: BaseException|None = None | ||||
|  |  | |||
|  | @ -613,10 +613,9 @@ async def drain_to_final_msg( | |||
|             #       msg: dict = await ctx._rx_chan.receive() | ||||
|             #   if res_cs.cancelled_caught: | ||||
|             # | ||||
|             # -[ ] make sure pause points work here for REPLing | ||||
|             # -[x] make sure pause points work here for REPLing | ||||
|             #   the runtime itself; i.e. ensure there's no hangs! | ||||
|             # |_from tractor.devx.debug import pause | ||||
|             #   await pause() | ||||
|             #  |_see masked code below in .cancel_called path | ||||
| 
 | ||||
|         # NOTE: we get here if the far end was | ||||
|         # `ContextCancelled` in 2 cases: | ||||
|  | @ -652,6 +651,10 @@ async def drain_to_final_msg( | |||
|                     f'IPC ctx cancelled externally during result drain ?\n' | ||||
|                     f'{ctx}' | ||||
|                 ) | ||||
|                 # XXX, for tracing `Cancelled`.. | ||||
|                 # from tractor.devx.debug import pause | ||||
|                 # await pause(shield=True) | ||||
| 
 | ||||
|             # CASE 2: mask the local cancelled-error(s) | ||||
|             # only when we are sure the remote error is | ||||
|             # the source cause of this local task's | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue