Compare commits
	
		
			14 Commits 
		
	
	
		
			653f23a04c
			...
			23809b8468
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 23809b8468 | |
|  | 60427329ee | |
|  | f946041d44 | |
|  | a4339d6ac6 | |
|  | 9123fbdbfa | |
|  | e7b3254b7b | |
|  | e468f62c26 | |
|  | 6c65729c20 | |
|  | 94fbbe0b05 | |
|  | d5b54f3f5e | |
|  | fd314deecb | |
|  | dd011c0b2f | |
|  | 087aaa1c36 | |
|  | 66b7410eab | 
|  | @ -1,85 +0,0 @@ | ||||||
| from contextlib import ( |  | ||||||
|     asynccontextmanager as acm, |  | ||||||
| ) |  | ||||||
| from functools import partial |  | ||||||
| 
 |  | ||||||
| import tractor |  | ||||||
| import trio |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| log = tractor.log.get_logger( |  | ||||||
|     name=__name__ |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| _lock: trio.Lock|None = None |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @acm |  | ||||||
| async def acquire_singleton_lock( |  | ||||||
| ) -> None: |  | ||||||
|     global _lock |  | ||||||
|     if _lock is None: |  | ||||||
|         log.info('Allocating LOCK') |  | ||||||
|         _lock = trio.Lock() |  | ||||||
| 
 |  | ||||||
|     log.info('TRYING TO LOCK ACQUIRE') |  | ||||||
|     async with _lock: |  | ||||||
|         log.info('ACQUIRED') |  | ||||||
|         yield _lock |  | ||||||
| 
 |  | ||||||
|     log.info('RELEASED') |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| async def hold_lock_forever( |  | ||||||
|     task_status=trio.TASK_STATUS_IGNORED |  | ||||||
| ): |  | ||||||
|     async with ( |  | ||||||
|         tractor.trionics.maybe_raise_from_masking_exc(), |  | ||||||
|         acquire_singleton_lock() as lock, |  | ||||||
|     ): |  | ||||||
|         task_status.started(lock) |  | ||||||
|         await trio.sleep_forever() |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| async def main( |  | ||||||
|     ignore_special_cases: bool, |  | ||||||
|     loglevel: str = 'info', |  | ||||||
|     debug_mode: bool = True, |  | ||||||
| ): |  | ||||||
|     async with ( |  | ||||||
|         trio.open_nursery() as tn, |  | ||||||
| 
 |  | ||||||
|         # tractor.trionics.maybe_raise_from_masking_exc() |  | ||||||
|         # ^^^ XXX NOTE, interestingly putting the unmasker |  | ||||||
|         # here does not exhibit the same behaviour ?? |  | ||||||
|     ): |  | ||||||
|         if not ignore_special_cases: |  | ||||||
|             from tractor.trionics import _taskc |  | ||||||
|             _taskc._mask_cases.clear() |  | ||||||
| 
 |  | ||||||
|         _lock = await tn.start( |  | ||||||
|             hold_lock_forever, |  | ||||||
|         ) |  | ||||||
|         with trio.move_on_after(0.2): |  | ||||||
|             await tn.start( |  | ||||||
|                 hold_lock_forever, |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|         tn.cancel_scope.cancel() |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| # XXX, manual test as script |  | ||||||
| if __name__ == '__main__': |  | ||||||
|     tractor.log.get_console_log(level='info') |  | ||||||
|     for case in [True, False]: |  | ||||||
|         log.info( |  | ||||||
|             f'\n' |  | ||||||
|             f'------ RUNNING SCRIPT TRIAL ------\n' |  | ||||||
|             f'ignore_special_cases: {case!r}\n' |  | ||||||
|         ) |  | ||||||
|         trio.run(partial( |  | ||||||
|             main, |  | ||||||
|             ignore_special_cases=case, |  | ||||||
|             loglevel='info', |  | ||||||
|         )) |  | ||||||
|  | @ -1,195 +0,0 @@ | ||||||
| from contextlib import ( |  | ||||||
|     contextmanager as cm, |  | ||||||
|     # TODO, any diff in async case(s)?? |  | ||||||
|     # asynccontextmanager as acm, |  | ||||||
| ) |  | ||||||
| from functools import partial |  | ||||||
| 
 |  | ||||||
| import tractor |  | ||||||
| import trio |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| log = tractor.log.get_logger( |  | ||||||
|     name=__name__ |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @cm |  | ||||||
| def teardown_on_exc( |  | ||||||
|     raise_from_handler: bool = False, |  | ||||||
| ): |  | ||||||
|     ''' |  | ||||||
|     You could also have a teardown handler which catches any exc and |  | ||||||
|     does some required teardown. In this case the problem is |  | ||||||
|     compounded UNLESS you ensure the handler's scope is OUTSIDE the |  | ||||||
|     `ux.aclose()`.. that is in the caller's enclosing scope. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     try: |  | ||||||
|         yield |  | ||||||
|     except BaseException as _berr: |  | ||||||
|         berr = _berr |  | ||||||
|         log.exception( |  | ||||||
|             f'Handling termination teardown in child due to,\n' |  | ||||||
|             f'{berr!r}\n' |  | ||||||
|         ) |  | ||||||
|         if raise_from_handler: |  | ||||||
|             # XXX teardown ops XXX |  | ||||||
|             # on termination these steps say need to be run to |  | ||||||
|             # ensure wider system consistency (like the state of |  | ||||||
|             # remote connections/services). |  | ||||||
|             # |  | ||||||
|             # HOWEVER, any bug in this teardown code is also |  | ||||||
|             # masked by the `tx.aclose()`! |  | ||||||
|             # this is also true if `_tn.cancel_scope` is |  | ||||||
|             # `.cancel_called` by the parent in a graceful |  | ||||||
|             # request case.. |  | ||||||
| 
 |  | ||||||
|             # simulate a bug in teardown handler. |  | ||||||
|             raise RuntimeError( |  | ||||||
|                 'woopsie teardown bug!' |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|         raise  # no teardown bug. |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| async def finite_stream_to_rent( |  | ||||||
|     tx: trio.abc.SendChannel, |  | ||||||
|     child_errors_mid_stream: bool, |  | ||||||
|     raise_unmasked: bool, |  | ||||||
| 
 |  | ||||||
|     task_status: trio.TaskStatus[ |  | ||||||
|         trio.CancelScope, |  | ||||||
|     ] = trio.TASK_STATUS_IGNORED, |  | ||||||
| ): |  | ||||||
|     async with ( |  | ||||||
|         # XXX without this unmasker the mid-streaming RTE is never |  | ||||||
|         # reported since it is masked by the `tx.aclose()` |  | ||||||
|         # call which in turn raises `Cancelled`! |  | ||||||
|         # |  | ||||||
|         # NOTE, this is WITHOUT doing any exception handling |  | ||||||
|         # inside the child  task! |  | ||||||
|         # |  | ||||||
|         # TODO, uncomment next LoC to see the supprsessed beg[RTE]! |  | ||||||
|         tractor.trionics.maybe_raise_from_masking_exc( |  | ||||||
|             raise_unmasked=raise_unmasked, |  | ||||||
|         ), |  | ||||||
| 
 |  | ||||||
|         tx as tx,  # .aclose() is the guilty masker chkpt! |  | ||||||
| 
 |  | ||||||
|         # XXX, this ONLY matters in the |  | ||||||
|         # `child_errors_mid_stream=False` case oddly!? |  | ||||||
|         # THAT IS, if no tn is opened in that case then the |  | ||||||
|         # test will not fail; it raises the RTE correctly? |  | ||||||
|         # |  | ||||||
|         # -> so it seems this new scope somehow affects the form of |  | ||||||
|         #    eventual in the parent EG? |  | ||||||
|         tractor.trionics.maybe_open_nursery( |  | ||||||
|             nursery=( |  | ||||||
|                 None |  | ||||||
|                 if not child_errors_mid_stream |  | ||||||
|                 else True |  | ||||||
|             ), |  | ||||||
|         ) as _tn, |  | ||||||
|     ): |  | ||||||
|         # pass our scope back to parent for supervision\ |  | ||||||
|         # control. |  | ||||||
|         cs: trio.CancelScope|None = ( |  | ||||||
|             None |  | ||||||
|             if _tn is True |  | ||||||
|             else _tn.cancel_scope |  | ||||||
|         ) |  | ||||||
|         task_status.started(cs) |  | ||||||
| 
 |  | ||||||
|         with teardown_on_exc( |  | ||||||
|             raise_from_handler=not child_errors_mid_stream, |  | ||||||
|         ): |  | ||||||
|             for i in range(100): |  | ||||||
|                 log.debug( |  | ||||||
|                     f'Child tx {i!r}\n' |  | ||||||
|                 ) |  | ||||||
|                 if ( |  | ||||||
|                     child_errors_mid_stream |  | ||||||
|                     and |  | ||||||
|                     i == 66 |  | ||||||
|                 ): |  | ||||||
|                     # oh wait but WOOPS there's a bug |  | ||||||
|                     # in that teardown code!? |  | ||||||
|                     raise RuntimeError( |  | ||||||
|                         'woopsie, a mid-streaming bug!?' |  | ||||||
|                     ) |  | ||||||
| 
 |  | ||||||
|                 await tx.send(i) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| async def main( |  | ||||||
|     # TODO! toggle this for the 2 cases! |  | ||||||
|     # 1. child errors mid-stream while parent is also requesting |  | ||||||
|     #   (graceful) cancel of that child streamer. |  | ||||||
|     # |  | ||||||
|     # 2. child contains a teardown handler which contains a |  | ||||||
|     #   bug and raises. |  | ||||||
|     # |  | ||||||
|     child_errors_mid_stream: bool, |  | ||||||
| 
 |  | ||||||
|     raise_unmasked: bool = False, |  | ||||||
|     loglevel: str = 'info', |  | ||||||
| ): |  | ||||||
|     tractor.log.get_console_log(level=loglevel) |  | ||||||
| 
 |  | ||||||
|     # the `.aclose()` being checkpoints on these |  | ||||||
|     # is the source of the problem.. |  | ||||||
|     tx, rx = trio.open_memory_channel(1) |  | ||||||
| 
 |  | ||||||
|     async with ( |  | ||||||
|         tractor.trionics.collapse_eg(), |  | ||||||
|         trio.open_nursery() as tn, |  | ||||||
|         rx as rx, |  | ||||||
|     ): |  | ||||||
|         _child_cs = await tn.start( |  | ||||||
|             partial( |  | ||||||
|                 finite_stream_to_rent, |  | ||||||
|                 child_errors_mid_stream=child_errors_mid_stream, |  | ||||||
|                 raise_unmasked=raise_unmasked, |  | ||||||
|                 tx=tx, |  | ||||||
|             ) |  | ||||||
|         ) |  | ||||||
|         async for msg in rx: |  | ||||||
|             log.debug( |  | ||||||
|                 f'Rent rx {msg!r}\n' |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|             # simulate some external cancellation |  | ||||||
|             # request **JUST BEFORE** the child errors. |  | ||||||
|             if msg == 65: |  | ||||||
|                 log.cancel( |  | ||||||
|                     f'Cancelling parent on,\n' |  | ||||||
|                     f'msg={msg}\n' |  | ||||||
|                     f'\n' |  | ||||||
|                     f'Simulates OOB cancel request!\n' |  | ||||||
|                 ) |  | ||||||
|                 tn.cancel_scope.cancel() |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| # XXX, manual test as script |  | ||||||
| if __name__ == '__main__': |  | ||||||
|     tractor.log.get_console_log(level='info') |  | ||||||
|     for case in [True, False]: |  | ||||||
|         log.info( |  | ||||||
|             f'\n' |  | ||||||
|             f'------ RUNNING SCRIPT TRIAL ------\n' |  | ||||||
|             f'child_errors_midstream: {case!r}\n' |  | ||||||
|         ) |  | ||||||
|         try: |  | ||||||
|             trio.run(partial( |  | ||||||
|                 main, |  | ||||||
|                 child_errors_mid_stream=case, |  | ||||||
|                 # raise_unmasked=True, |  | ||||||
|                 loglevel='info', |  | ||||||
|             )) |  | ||||||
|         except Exception as _exc: |  | ||||||
|             exc = _exc |  | ||||||
|             log.exception( |  | ||||||
|                 'Should have raised an RTE or Cancelled?\n' |  | ||||||
|             ) |  | ||||||
|             breakpoint() |  | ||||||
|  | @ -95,7 +95,6 @@ def run_example_in_subproc( | ||||||
|             and 'integration' not in p[0] |             and 'integration' not in p[0] | ||||||
|             and 'advanced_faults' not in p[0] |             and 'advanced_faults' not in p[0] | ||||||
|             and 'multihost' not in p[0] |             and 'multihost' not in p[0] | ||||||
|             and 'trio' not in p[0] |  | ||||||
|         ) |         ) | ||||||
|     ], |     ], | ||||||
|     ids=lambda t: t[1], |     ids=lambda t: t[1], | ||||||
|  |  | ||||||
|  | @ -24,10 +24,14 @@ from tractor._testing import ( | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| # XXX TODO cases: | # XXX TODO cases: | ||||||
|  | # - [ ] peer cancelled itself - so other peers should | ||||||
|  | #   get errors reflecting that the peer was itself the .canceller? | ||||||
|  | 
 | ||||||
| # - [x] WE cancelled the peer and thus should not see any raised | # - [x] WE cancelled the peer and thus should not see any raised | ||||||
| #   `ContextCancelled` as it should be reaped silently? | #   `ContextCancelled` as it should be reaped silently? | ||||||
| #   => pretty sure `test_context_stream_semantics::test_caller_cancels()` | #   => pretty sure `test_context_stream_semantics::test_caller_cancels()` | ||||||
| #      already covers this case? | #      already covers this case? | ||||||
|  | 
 | ||||||
| # - [x] INTER-PEER: some arbitrary remote peer cancels via | # - [x] INTER-PEER: some arbitrary remote peer cancels via | ||||||
| #   Portal.cancel_actor(). | #   Portal.cancel_actor(). | ||||||
| #   => all other connected peers should get that cancel requesting peer's | #   => all other connected peers should get that cancel requesting peer's | ||||||
|  | @ -40,6 +44,16 @@ from tractor._testing import ( | ||||||
| #   that also spawned a remote task task in that same peer-parent. | #   that also spawned a remote task task in that same peer-parent. | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | # def test_self_cancel(): | ||||||
|  | #     ''' | ||||||
|  | #     2 cases: | ||||||
|  | #     - calls `Actor.cancel()` locally in some task | ||||||
|  | #     - calls LocalPortal.cancel_actor()` ? | ||||||
|  | 
 | ||||||
|  | #     ''' | ||||||
|  | #     ... | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| @tractor.context | @tractor.context | ||||||
| async def open_stream_then_sleep_forever( | async def open_stream_then_sleep_forever( | ||||||
|     ctx: Context, |     ctx: Context, | ||||||
|  | @ -792,7 +806,7 @@ async def basic_echo_server( | ||||||
|     ctx: Context, |     ctx: Context, | ||||||
|     peer_name: str = 'wittle_bruv', |     peer_name: str = 'wittle_bruv', | ||||||
| 
 | 
 | ||||||
|     err_after_imsg: int|None = None, |     err_after: int|None = None, | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
|     ''' |     ''' | ||||||
|  | @ -821,9 +835,8 @@ async def basic_echo_server( | ||||||
|             await ipc.send(resp) |             await ipc.send(resp) | ||||||
| 
 | 
 | ||||||
|             if ( |             if ( | ||||||
|                 err_after_imsg |                 err_after | ||||||
|                 and |                 and i > err_after | ||||||
|                 i > err_after_imsg |  | ||||||
|             ): |             ): | ||||||
|                 raise RuntimeError( |                 raise RuntimeError( | ||||||
|                     f'Simulated error in `{peer_name}`' |                     f'Simulated error in `{peer_name}`' | ||||||
|  | @ -965,8 +978,7 @@ async def tell_little_bro( | ||||||
|     actor_name: str, |     actor_name: str, | ||||||
| 
 | 
 | ||||||
|     caller: str = '', |     caller: str = '', | ||||||
|     err_after: float|None = None, |     err_after: int|None = None, | ||||||
|     rng_seed: int = 50, |  | ||||||
| ): | ): | ||||||
|     # contact target actor, do a stream dialog. |     # contact target actor, do a stream dialog. | ||||||
|     async with ( |     async with ( | ||||||
|  | @ -977,18 +989,14 @@ async def tell_little_bro( | ||||||
|             basic_echo_server, |             basic_echo_server, | ||||||
| 
 | 
 | ||||||
|             # XXX proxy any delayed err condition |             # XXX proxy any delayed err condition | ||||||
|             err_after_imsg=( |             err_after=err_after, | ||||||
|                 err_after * rng_seed |  | ||||||
|                 if err_after is not None |  | ||||||
|                 else None |  | ||||||
|             ), |  | ||||||
|         ) as (sub_ctx, first), |         ) as (sub_ctx, first), | ||||||
| 
 | 
 | ||||||
|         sub_ctx.open_stream() as echo_ipc, |         sub_ctx.open_stream() as echo_ipc, | ||||||
|     ): |     ): | ||||||
|         actor: Actor = current_actor() |         actor: Actor = current_actor() | ||||||
|         uid: tuple = actor.uid |         uid: tuple = actor.uid | ||||||
|         for i in range(rng_seed): |         for i in range(100): | ||||||
|             msg: tuple = ( |             msg: tuple = ( | ||||||
|                 uid, |                 uid, | ||||||
|                 i, |                 i, | ||||||
|  | @ -1013,13 +1021,13 @@ async def tell_little_bro( | ||||||
| ) | ) | ||||||
| @pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||||
|     'raise_sub_spawn_error_after', |     'raise_sub_spawn_error_after', | ||||||
|     [None, 0.5], |     [None, 50], | ||||||
| ) | ) | ||||||
| def test_peer_spawns_and_cancels_service_subactor( | def test_peer_spawns_and_cancels_service_subactor( | ||||||
|     debug_mode: bool, |     debug_mode: bool, | ||||||
|     raise_client_error: str, |     raise_client_error: str, | ||||||
|     reg_addr: tuple[str, int], |     reg_addr: tuple[str, int], | ||||||
|     raise_sub_spawn_error_after: float|None, |     raise_sub_spawn_error_after: int|None, | ||||||
| ): | ): | ||||||
|     # NOTE: this tests for the modden `mod wks open piker` bug |     # NOTE: this tests for the modden `mod wks open piker` bug | ||||||
|     # discovered as part of implementing workspace ctx |     # discovered as part of implementing workspace ctx | ||||||
|  | @ -1033,7 +1041,6 @@ def test_peer_spawns_and_cancels_service_subactor( | ||||||
|     #   and the server's spawned child should cancel and terminate! |     #   and the server's spawned child should cancel and terminate! | ||||||
|     peer_name: str = 'little_bro' |     peer_name: str = 'little_bro' | ||||||
| 
 | 
 | ||||||
| 
 |  | ||||||
|     def check_inner_rte(rae: RemoteActorError): |     def check_inner_rte(rae: RemoteActorError): | ||||||
|         ''' |         ''' | ||||||
|         Validate the little_bro's relayed inception! |         Validate the little_bro's relayed inception! | ||||||
|  | @ -1127,7 +1134,8 @@ def test_peer_spawns_and_cancels_service_subactor( | ||||||
|                         ) |                         ) | ||||||
| 
 | 
 | ||||||
|                     try: |                     try: | ||||||
|                         res = await client_ctx.wait_for_result(hide_tb=False) |                         res = await client_ctx.result(hide_tb=False) | ||||||
|  | 
 | ||||||
|                         # in remote (relayed inception) error |                         # in remote (relayed inception) error | ||||||
|                         # case, we should error on the line above! |                         # case, we should error on the line above! | ||||||
|                         if raise_sub_spawn_error_after: |                         if raise_sub_spawn_error_after: | ||||||
|  | @ -1138,23 +1146,6 @@ def test_peer_spawns_and_cancels_service_subactor( | ||||||
|                         assert isinstance(res, ContextCancelled) |                         assert isinstance(res, ContextCancelled) | ||||||
|                         assert client_ctx.cancel_acked |                         assert client_ctx.cancel_acked | ||||||
|                         assert res.canceller == root.uid |                         assert res.canceller == root.uid | ||||||
|                         assert not raise_sub_spawn_error_after |  | ||||||
| 
 |  | ||||||
|                         # cancelling the spawner sub should |  | ||||||
|                         # transitively cancel it's sub, the little |  | ||||||
|                         # bruv. |  | ||||||
|                         print('root cancelling server/client sub-actors') |  | ||||||
|                         await spawn_ctx.cancel() |  | ||||||
|                         async with tractor.find_actor( |  | ||||||
|                             name=peer_name, |  | ||||||
|                         ) as sub: |  | ||||||
|                             assert not sub |  | ||||||
| 
 |  | ||||||
|                     # XXX, only for tracing |  | ||||||
|                     # except BaseException as _berr: |  | ||||||
|                     #     berr = _berr |  | ||||||
|                     #     await tractor.pause(shield=True) |  | ||||||
|                     #     raise berr |  | ||||||
| 
 | 
 | ||||||
|                     except RemoteActorError as rae: |                     except RemoteActorError as rae: | ||||||
|                         _err = rae |                         _err = rae | ||||||
|  | @ -1183,8 +1174,19 @@ def test_peer_spawns_and_cancels_service_subactor( | ||||||
|                         raise |                         raise | ||||||
|                         # await tractor.pause() |                         # await tractor.pause() | ||||||
| 
 | 
 | ||||||
|  |                     else: | ||||||
|  |                         assert not raise_sub_spawn_error_after | ||||||
|  | 
 | ||||||
|  |                         # cancelling the spawner sub should | ||||||
|  |                         # transitively cancel it's sub, the little | ||||||
|  |                         # bruv. | ||||||
|  |                         print('root cancelling server/client sub-actors') | ||||||
|  |                         await spawn_ctx.cancel() | ||||||
|  |                         async with tractor.find_actor( | ||||||
|  |                             name=peer_name, | ||||||
|  |                         ) as sub: | ||||||
|  |                             assert not sub | ||||||
| 
 | 
 | ||||||
|                     # await tractor.pause() |  | ||||||
|                     # await server.cancel_actor() |                     # await server.cancel_actor() | ||||||
| 
 | 
 | ||||||
|             except RemoteActorError as rae: |             except RemoteActorError as rae: | ||||||
|  | @ -1197,7 +1199,7 @@ def test_peer_spawns_and_cancels_service_subactor( | ||||||
| 
 | 
 | ||||||
|             # since we called `.cancel_actor()`, `.cancel_ack` |             # since we called `.cancel_actor()`, `.cancel_ack` | ||||||
|             # will not be set on the ctx bc `ctx.cancel()` was not |             # will not be set on the ctx bc `ctx.cancel()` was not | ||||||
|             # called directly for this confext. |             # called directly fot this confext. | ||||||
|             except ContextCancelled as ctxc: |             except ContextCancelled as ctxc: | ||||||
|                 _ctxc = ctxc |                 _ctxc = ctxc | ||||||
|                 print( |                 print( | ||||||
|  | @ -1237,19 +1239,12 @@ def test_peer_spawns_and_cancels_service_subactor( | ||||||
| 
 | 
 | ||||||
|                 # assert spawn_ctx.cancelled_caught |                 # assert spawn_ctx.cancelled_caught | ||||||
| 
 | 
 | ||||||
|     async def _main(): |  | ||||||
|         with trio.fail_after( |  | ||||||
|             3 if not debug_mode |  | ||||||
|             else 999 |  | ||||||
|         ): |  | ||||||
|             await main() |  | ||||||
| 
 |  | ||||||
|     if raise_sub_spawn_error_after: |     if raise_sub_spawn_error_after: | ||||||
|         with pytest.raises(RemoteActorError) as excinfo: |         with pytest.raises(RemoteActorError) as excinfo: | ||||||
|             trio.run(_main) |             trio.run(main) | ||||||
| 
 | 
 | ||||||
|         rae: RemoteActorError = excinfo.value |         rae: RemoteActorError = excinfo.value | ||||||
|         check_inner_rte(rae) |         check_inner_rte(rae) | ||||||
| 
 | 
 | ||||||
|     else: |     else: | ||||||
|         trio.run(_main) |         trio.run(main) | ||||||
|  |  | ||||||
|  | @ -1,239 +0,0 @@ | ||||||
| ''' |  | ||||||
| Define the details of inter-actor "out-of-band" (OoB) cancel |  | ||||||
| semantics, that is how cancellation works when a cancel request comes |  | ||||||
| from the different concurrency (primitive's) "layer" then where the |  | ||||||
| eventual `trio.Task` actually raises a signal. |  | ||||||
| 
 |  | ||||||
| ''' |  | ||||||
| from functools import partial |  | ||||||
| # from contextlib import asynccontextmanager as acm |  | ||||||
| # import itertools |  | ||||||
| 
 |  | ||||||
| import pytest |  | ||||||
| import trio |  | ||||||
| import tractor |  | ||||||
| from tractor import (  # typing |  | ||||||
|     ActorNursery, |  | ||||||
|     Portal, |  | ||||||
|     Context, |  | ||||||
|     # ContextCancelled, |  | ||||||
|     # RemoteActorError, |  | ||||||
| ) |  | ||||||
| # from tractor._testing import ( |  | ||||||
| #     tractor_test, |  | ||||||
| #     expect_ctxc, |  | ||||||
| # ) |  | ||||||
| 
 |  | ||||||
| # XXX TODO cases: |  | ||||||
| # - [ ] peer cancelled itself - so other peers should |  | ||||||
| #   get errors reflecting that the peer was itself the .canceller? |  | ||||||
| 
 |  | ||||||
| # def test_self_cancel(): |  | ||||||
| #     ''' |  | ||||||
| #     2 cases: |  | ||||||
| #     - calls `Actor.cancel()` locally in some task |  | ||||||
| #     - calls LocalPortal.cancel_actor()` ? |  | ||||||
| # |  | ||||||
| # things to ensure! |  | ||||||
| # -[ ] the ctxc raised in a child should ideally show the tb of the |  | ||||||
| #     underlying `Cancelled` checkpoint, i.e. |  | ||||||
| #     `raise scope_error from ctxc`? |  | ||||||
| # |  | ||||||
| # -[ ] a self-cancelled context, if not allowed to block on |  | ||||||
| #     `ctx.result()` at some point will hang since the `ctx._scope` |  | ||||||
| #     is never `.cancel_called`; cases for this include, |  | ||||||
| #     - an `open_ctx()` which never starteds before being OoB actor |  | ||||||
| #       cancelled. |  | ||||||
| #       |_ parent task will be blocked in `.open_context()` for the |  | ||||||
| #         `Started` msg, and when the OoB ctxc arrives `ctx._scope` |  | ||||||
| #         will never have been signalled.. |  | ||||||
| 
 |  | ||||||
| #     ''' |  | ||||||
| #     ... |  | ||||||
| 
 |  | ||||||
| # TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py` |  | ||||||
| # but with the `Lock.acquire()` from a `@context` to ensure the |  | ||||||
| # implicit ignore-case-non-unmasking. |  | ||||||
| # |  | ||||||
| # @tractor.context |  | ||||||
| # async def acquire_actor_global_lock( |  | ||||||
| #     ctx: tractor.Context, |  | ||||||
| #     ignore_special_cases: bool, |  | ||||||
| # ): |  | ||||||
| 
 |  | ||||||
| #     async with maybe_unmask_excs( |  | ||||||
| #         ignore_special_cases=ignore_special_cases, |  | ||||||
| #     ): |  | ||||||
| #         await ctx.started('locked') |  | ||||||
| 
 |  | ||||||
| #     # block til cancelled |  | ||||||
| #     await trio.sleep_forever() |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @tractor.context |  | ||||||
| async def sleep_forever( |  | ||||||
|     ctx: tractor.Context, |  | ||||||
|     # ignore_special_cases: bool, |  | ||||||
|     do_started: bool, |  | ||||||
| ): |  | ||||||
| 
 |  | ||||||
|     # async with maybe_unmask_excs( |  | ||||||
|     #     ignore_special_cases=ignore_special_cases, |  | ||||||
|     # ): |  | ||||||
|     #     await ctx.started('locked') |  | ||||||
|     if do_started: |  | ||||||
|         await ctx.started() |  | ||||||
| 
 |  | ||||||
|     # block til cancelled |  | ||||||
|     print('sleepin on child-side..') |  | ||||||
|     await trio.sleep_forever() |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @pytest.mark.parametrize( |  | ||||||
|     'cancel_ctx', |  | ||||||
|     [True, False], |  | ||||||
| ) |  | ||||||
| def test_cancel_ctx_with_parent_side_entered_in_bg_task( |  | ||||||
|     debug_mode: bool, |  | ||||||
|     loglevel: str, |  | ||||||
|     cancel_ctx: bool, |  | ||||||
| ): |  | ||||||
|     ''' |  | ||||||
|     The most "basic" out-of-band-task self-cancellation case where |  | ||||||
|     `Portal.open_context()` is entered in a bg task and the |  | ||||||
|     parent-task (of the containing nursery) calls `Context.cancel()` |  | ||||||
|     without the child knowing; the `Context._scope` should be |  | ||||||
|     `.cancel_called` when the IPC ctx's child-side relays |  | ||||||
|     a `ContextCancelled` with a `.canceller` set to the parent |  | ||||||
|     actor('s task). |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     async def main(): |  | ||||||
|         with trio.fail_after( |  | ||||||
|             2 if not debug_mode else 999, |  | ||||||
|         ): |  | ||||||
|             an: ActorNursery |  | ||||||
|             async with ( |  | ||||||
|                 tractor.open_nursery( |  | ||||||
|                     debug_mode=debug_mode, |  | ||||||
|                     loglevel='devx', |  | ||||||
|                     enable_stack_on_sig=True, |  | ||||||
|                 ) as an, |  | ||||||
|                 trio.open_nursery() as tn, |  | ||||||
|             ): |  | ||||||
|                 ptl: Portal = await an.start_actor( |  | ||||||
|                     'sub', |  | ||||||
|                     enable_modules=[__name__], |  | ||||||
|                 ) |  | ||||||
| 
 |  | ||||||
|                 async def _open_ctx_async( |  | ||||||
|                     do_started: bool = True, |  | ||||||
|                     task_status=trio.TASK_STATUS_IGNORED, |  | ||||||
|                 ): |  | ||||||
|                     # do we expect to never enter the |  | ||||||
|                     # `.open_context()` below. |  | ||||||
|                     if not do_started: |  | ||||||
|                         task_status.started() |  | ||||||
| 
 |  | ||||||
|                     async with ptl.open_context( |  | ||||||
|                         sleep_forever, |  | ||||||
|                         do_started=do_started, |  | ||||||
|                     ) as (ctx, first): |  | ||||||
|                         task_status.started(ctx) |  | ||||||
|                         await trio.sleep_forever() |  | ||||||
| 
 |  | ||||||
|                 # XXX, this is the key OoB part! |  | ||||||
|                 # |  | ||||||
|                 # - start the `.open_context()` in a bg task which |  | ||||||
|                 #   blocks inside the embedded scope-body, |  | ||||||
|                 # |  | ||||||
|                 # -  when we call `Context.cancel()` it **is |  | ||||||
|                 #   not** from the same task which eventually runs |  | ||||||
|                 #   `.__aexit__()`, |  | ||||||
|                 # |  | ||||||
|                 # - since the bg "opener" task will be in |  | ||||||
|                 #   a `trio.sleep_forever()`, it must be interrupted |  | ||||||
|                 #   by the `ContextCancelled` delivered from the |  | ||||||
|                 #   child-side; `Context._scope: CancelScope` MUST |  | ||||||
|                 #   be `.cancel_called`! |  | ||||||
|                 # |  | ||||||
|                 print('ASYNC opening IPC context in subtask..') |  | ||||||
|                 maybe_ctx: Context|None = await tn.start(partial( |  | ||||||
|                     _open_ctx_async, |  | ||||||
|                 )) |  | ||||||
| 
 |  | ||||||
|                 if ( |  | ||||||
|                     maybe_ctx |  | ||||||
|                     and |  | ||||||
|                     cancel_ctx |  | ||||||
|                 ): |  | ||||||
|                     print('cancelling first IPC ctx!') |  | ||||||
|                     await maybe_ctx.cancel() |  | ||||||
| 
 |  | ||||||
|                 # XXX, note that despite `maybe_context.cancel()` |  | ||||||
|                 # being called above, it's the parent (bg) task |  | ||||||
|                 # which was originally never interrupted in |  | ||||||
|                 # the `ctx._scope` body due to missing case logic in |  | ||||||
|                 # `ctx._maybe_cancel_and_set_remote_error()`. |  | ||||||
|                 # |  | ||||||
|                 # It didn't matter that the subactor process was |  | ||||||
|                 # already terminated and reaped, nothing was |  | ||||||
|                 # cancelling the ctx-parent task's scope! |  | ||||||
|                 # |  | ||||||
|                 print('cancelling subactor!') |  | ||||||
|                 await ptl.cancel_actor() |  | ||||||
| 
 |  | ||||||
|                 if maybe_ctx: |  | ||||||
|                     try: |  | ||||||
|                         await maybe_ctx.wait_for_result() |  | ||||||
|                     except tractor.ContextCancelled as ctxc: |  | ||||||
|                         assert not cancel_ctx |  | ||||||
|                         assert ( |  | ||||||
|                             ctxc.canceller |  | ||||||
|                             == |  | ||||||
|                             tractor.current_actor().aid.uid |  | ||||||
|                         ) |  | ||||||
|                         # don't re-raise since it'll trigger |  | ||||||
|                         # an EG from the above tn. |  | ||||||
| 
 |  | ||||||
|     if cancel_ctx: |  | ||||||
|         # graceful self-cancel |  | ||||||
|         trio.run(main) |  | ||||||
| 
 |  | ||||||
|     else: |  | ||||||
|         # ctx parent task should see OoB ctxc due to |  | ||||||
|         # `ptl.cancel_actor()`. |  | ||||||
|         with pytest.raises(tractor.ContextCancelled) as excinfo: |  | ||||||
|             trio.run(main) |  | ||||||
| 
 |  | ||||||
|         assert 'root' in excinfo.value.canceller[0] |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| # def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it( |  | ||||||
| #     debug_mode: bool, |  | ||||||
| #     loglevel: str, |  | ||||||
| # ): |  | ||||||
| #     ''' |  | ||||||
| #     Demos OoB cancellation from the perspective of a ctx opened with |  | ||||||
| #     a child subactor where the parent cancels the child at the "actor |  | ||||||
| #     layer" using `Portal.cancel_actor()` and thus the |  | ||||||
| #     `ContextCancelled.canceller` received by the ctx's parent-side |  | ||||||
| #     task will appear to be a "self cancellation" even though that |  | ||||||
| #     specific task itself was not cancelled and thus |  | ||||||
| #     `Context.cancel_called ==False`. |  | ||||||
| #     ''' |  | ||||||
|                 # TODO, do we have an existing implied ctx |  | ||||||
|                 # cancel test like this? |  | ||||||
|                 # with trio.move_on_after(0.5):# as cs: |  | ||||||
|                 #     await _open_ctx_async( |  | ||||||
|                 #         do_started=False, |  | ||||||
|                 #     ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
|                 # in-line ctx scope should definitely raise |  | ||||||
|                 # a ctxc with `.canceller = 'root'` |  | ||||||
|                 # async with ptl.open_context( |  | ||||||
|                 #     sleep_forever, |  | ||||||
|                 #     do_started=True, |  | ||||||
|                 # ) as pair: |  | ||||||
| 
 |  | ||||||
|  | @ -6,18 +6,11 @@ want to see changed. | ||||||
| from contextlib import ( | from contextlib import ( | ||||||
|     asynccontextmanager as acm, |     asynccontextmanager as acm, | ||||||
| ) | ) | ||||||
| from types import ModuleType |  | ||||||
| 
 |  | ||||||
| from functools import partial |  | ||||||
| 
 | 
 | ||||||
| import pytest | import pytest | ||||||
| from _pytest import pathlib |  | ||||||
| from tractor.trionics import collapse_eg | from tractor.trionics import collapse_eg | ||||||
| import trio | import trio | ||||||
| from trio import TaskStatus | from trio import TaskStatus | ||||||
| from tractor._testing import ( |  | ||||||
|     examples_dir, |  | ||||||
| ) |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||||
|  | @ -113,9 +106,8 @@ def test_acm_embedded_nursery_propagates_enter_err( | ||||||
|     debug_mode: bool, |     debug_mode: bool, | ||||||
| ): | ): | ||||||
|     ''' |     ''' | ||||||
|     Demo how a masking `trio.Cancelled` could be handled by unmasking |     Demo how a masking `trio.Cancelled` could be handled by unmasking from the | ||||||
|     from the `.__context__` field when a user (by accident) re-raises |     `.__context__` field when a user (by accident) re-raises from a `finally:`. | ||||||
|     from a `finally:`. |  | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     import tractor |     import tractor | ||||||
|  | @ -125,9 +117,11 @@ def test_acm_embedded_nursery_propagates_enter_err( | ||||||
|         async with ( |         async with ( | ||||||
|             trio.open_nursery() as tn, |             trio.open_nursery() as tn, | ||||||
|             tractor.trionics.maybe_raise_from_masking_exc( |             tractor.trionics.maybe_raise_from_masking_exc( | ||||||
|  |                 tn=tn, | ||||||
|                 unmask_from=( |                 unmask_from=( | ||||||
|                     (trio.Cancelled,) if unmask_from_canc |                     trio.Cancelled | ||||||
|                     else () |                     if unmask_from_canc | ||||||
|  |                     else None | ||||||
|                 ), |                 ), | ||||||
|             ) |             ) | ||||||
|         ): |         ): | ||||||
|  | @ -142,7 +136,8 @@ def test_acm_embedded_nursery_propagates_enter_err( | ||||||
|         with tractor.devx.maybe_open_crash_handler( |         with tractor.devx.maybe_open_crash_handler( | ||||||
|             pdb=debug_mode, |             pdb=debug_mode, | ||||||
|         ) as bxerr: |         ) as bxerr: | ||||||
|             assert not bxerr.value |             if bxerr: | ||||||
|  |                 assert not bxerr.value | ||||||
| 
 | 
 | ||||||
|             async with ( |             async with ( | ||||||
|                 wraps_tn_that_always_cancels() as tn, |                 wraps_tn_that_always_cancels() as tn, | ||||||
|  | @ -150,12 +145,11 @@ def test_acm_embedded_nursery_propagates_enter_err( | ||||||
|                 assert not tn.cancel_scope.cancel_called |                 assert not tn.cancel_scope.cancel_called | ||||||
|                 assert 0 |                 assert 0 | ||||||
| 
 | 
 | ||||||
|         if debug_mode: |         assert ( | ||||||
|             assert ( |             (err := bxerr.value) | ||||||
|                 (err := bxerr.value) |             and | ||||||
|                 and |             type(err) is AssertionError | ||||||
|                 type(err) is AssertionError |         ) | ||||||
|             ) |  | ||||||
| 
 | 
 | ||||||
|     with pytest.raises(ExceptionGroup) as excinfo: |     with pytest.raises(ExceptionGroup) as excinfo: | ||||||
|         trio.run(_main) |         trio.run(_main) | ||||||
|  | @ -166,13 +160,13 @@ def test_acm_embedded_nursery_propagates_enter_err( | ||||||
|     assert len(assert_eg.exceptions) == 1 |     assert len(assert_eg.exceptions) == 1 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| def test_gatherctxs_with_memchan_breaks_multicancelled( | def test_gatherctxs_with_memchan_breaks_multicancelled( | ||||||
|     debug_mode: bool, |     debug_mode: bool, | ||||||
| ): | ): | ||||||
|     ''' |     ''' | ||||||
|     Demo how a using an `async with sndchan` inside |     Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task | ||||||
|     a `.trionics.gather_contexts()` task will break a strict-eg-tn's |     will break a strict-eg-tn's multi-cancelled absorption.. | ||||||
|     multi-cancelled absorption.. |  | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     from tractor import ( |     from tractor import ( | ||||||
|  | @ -198,6 +192,7 @@ def test_gatherctxs_with_memchan_breaks_multicancelled( | ||||||
|                 f'Closed {task!r}\n' |                 f'Closed {task!r}\n' | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|     async def main(): |     async def main(): | ||||||
|         async with ( |         async with ( | ||||||
|             # XXX should ensure ONLY the KBI |             # XXX should ensure ONLY the KBI | ||||||
|  | @ -218,85 +213,3 @@ def test_gatherctxs_with_memchan_breaks_multicancelled( | ||||||
| 
 | 
 | ||||||
|     with pytest.raises(KeyboardInterrupt): |     with pytest.raises(KeyboardInterrupt): | ||||||
|         trio.run(main) |         trio.run(main) | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @pytest.mark.parametrize( |  | ||||||
|     'raise_unmasked', [ |  | ||||||
|         True, |  | ||||||
|         pytest.param( |  | ||||||
|             False, |  | ||||||
|             marks=pytest.mark.xfail( |  | ||||||
|                 reason="see examples/trio/send_chan_aclose_masks.py" |  | ||||||
|             ) |  | ||||||
|         ), |  | ||||||
|     ] |  | ||||||
| ) |  | ||||||
| @pytest.mark.parametrize( |  | ||||||
|     'child_errors_mid_stream', |  | ||||||
|     [True, False], |  | ||||||
| ) |  | ||||||
| def test_unmask_aclose_as_checkpoint_on_aexit( |  | ||||||
|     raise_unmasked: bool, |  | ||||||
|     child_errors_mid_stream: bool, |  | ||||||
|     debug_mode: bool, |  | ||||||
| ): |  | ||||||
|     ''' |  | ||||||
|     Verify that our unmasker util works over the common case where |  | ||||||
|     a mem-chan's `.aclose()` is included in an `@acm` stack |  | ||||||
|     and it being currently a checkpoint, can `trio.Cancelled`-mask an embedded |  | ||||||
|     exception from user code resulting in a silent failure which |  | ||||||
|     appears like graceful cancellation. |  | ||||||
| 
 |  | ||||||
|     This test suite is mostly implemented as an example script so it |  | ||||||
|     could more easily be shared with `trio`-core peeps as `tractor`-less |  | ||||||
|     minimum reproducing example. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     mod: ModuleType = pathlib.import_path( |  | ||||||
|         examples_dir() |  | ||||||
|         / 'trio' |  | ||||||
|         / 'send_chan_aclose_masks_beg.py', |  | ||||||
|         root=examples_dir(), |  | ||||||
|         consider_namespace_packages=False, |  | ||||||
|     ) |  | ||||||
|     with pytest.raises(RuntimeError): |  | ||||||
|         trio.run(partial( |  | ||||||
|             mod.main, |  | ||||||
|             raise_unmasked=raise_unmasked, |  | ||||||
|             child_errors_mid_stream=child_errors_mid_stream, |  | ||||||
|         )) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @pytest.mark.parametrize( |  | ||||||
|     'ignore_special_cases', [ |  | ||||||
|         True, |  | ||||||
|         pytest.param( |  | ||||||
|             False, |  | ||||||
|             marks=pytest.mark.xfail( |  | ||||||
|                 reason="see examples/trio/lockacquire_not_umasked.py" |  | ||||||
|             ) |  | ||||||
|         ), |  | ||||||
|     ] |  | ||||||
| ) |  | ||||||
| def test_cancelled_lockacquire_in_ipctx_not_unmasked( |  | ||||||
|     ignore_special_cases: bool, |  | ||||||
|     loglevel: str, |  | ||||||
|     debug_mode: bool, |  | ||||||
| ): |  | ||||||
|     mod: ModuleType = pathlib.import_path( |  | ||||||
|         examples_dir() |  | ||||||
|         / 'trio' |  | ||||||
|         / 'lockacquire_not_unmasked.py', |  | ||||||
|         root=examples_dir(), |  | ||||||
|         consider_namespace_packages=False, |  | ||||||
|     ) |  | ||||||
|     async def _main(): |  | ||||||
|         with trio.fail_after(2): |  | ||||||
|             await mod.main( |  | ||||||
|                 ignore_special_cases=ignore_special_cases, |  | ||||||
|                 loglevel=loglevel, |  | ||||||
|                 debug_mode=debug_mode, |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|     trio.run(_main) |  | ||||||
|  |  | ||||||
|  | @ -442,25 +442,25 @@ class Context: | ||||||
|         ''' |         ''' | ||||||
|         Records whether cancellation has been requested for this context |         Records whether cancellation has been requested for this context | ||||||
|         by a call to  `.cancel()` either due to, |         by a call to  `.cancel()` either due to, | ||||||
|         - an explicit call by some local task, |         - either an explicit call by some local task, | ||||||
|         - or an implicit call due to an error caught inside |         - or an implicit call due to an error caught inside | ||||||
|           the `Portal.open_context()` block. |           the ``Portal.open_context()`` block. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         return self._cancel_called |         return self._cancel_called | ||||||
| 
 | 
 | ||||||
|     # XXX, to debug who frickin sets it.. |     @cancel_called.setter | ||||||
|     # @cancel_called.setter |     def cancel_called(self, val: bool) -> None: | ||||||
|     # def cancel_called(self, val: bool) -> None: |         ''' | ||||||
|     #     ''' |         Set the self-cancelled request `bool` value. | ||||||
|     #     Set the self-cancelled request `bool` value. |  | ||||||
| 
 | 
 | ||||||
|     #     ''' |         ''' | ||||||
|     #     if val: |         # to debug who frickin sets it.. | ||||||
|     #         from .devx import pause_from_sync |         # if val: | ||||||
|     #         pause_from_sync() |         #     from .devx import pause_from_sync | ||||||
|  |         #     pause_from_sync() | ||||||
| 
 | 
 | ||||||
|     #     self._cancel_called = val |         self._cancel_called = val | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|     def canceller(self) -> tuple[str, str]|None: |     def canceller(self) -> tuple[str, str]|None: | ||||||
|  | @ -635,71 +635,6 @@ class Context: | ||||||
|         ''' |         ''' | ||||||
|         await self.chan.send(Stop(cid=self.cid)) |         await self.chan.send(Stop(cid=self.cid)) | ||||||
| 
 | 
 | ||||||
|     @property |  | ||||||
|     def parent_task(self) -> trio.Task: |  | ||||||
|         ''' |  | ||||||
|         This IPC context's "owning task" which is a `trio.Task` |  | ||||||
|         on one of the "sides" of the IPC. |  | ||||||
| 
 |  | ||||||
|         Note that the "parent_" prefix here refers to the local |  | ||||||
|         `trio` task tree using the same interface as |  | ||||||
|         `trio.Nursery.parent_task` whereas for IPC contexts, |  | ||||||
|         a different cross-actor task hierarchy exists: |  | ||||||
| 
 |  | ||||||
|         - a "parent"-side which originally entered |  | ||||||
|           `Portal.open_context()`, |  | ||||||
| 
 |  | ||||||
|         - the "child"-side which was spawned and scheduled to invoke |  | ||||||
|           a function decorated with `@tractor.context`. |  | ||||||
| 
 |  | ||||||
|         This task is thus a handle to mem-domain-distinct/per-process |  | ||||||
|         `Nursery.parent_task` depending on in which of the above |  | ||||||
|         "sides" this context exists. |  | ||||||
| 
 |  | ||||||
|         ''' |  | ||||||
|         return self._task |  | ||||||
| 
 |  | ||||||
|     def _is_blocked_on_rx_chan(self) -> bool: |  | ||||||
|         ''' |  | ||||||
|         Predicate to indicate whether the owner `._task: trio.Task` is |  | ||||||
|         currently blocked (by `.receive()`-ing) on its underlying RPC |  | ||||||
|         feeder `._rx_chan`. |  | ||||||
| 
 |  | ||||||
|         This knowledge is highly useful when handling so called |  | ||||||
|         "out-of-band" (OoB) cancellation conditions where a peer |  | ||||||
|         actor's task transmitted some remote error/cancel-msg and we |  | ||||||
|         must know whether to signal-via-cancel currently executing |  | ||||||
|         "user-code" (user defined code embedded in `ctx._scope`) or |  | ||||||
|         simply to forward the IPC-msg-as-error **without calling** |  | ||||||
|         `._scope.cancel()`. |  | ||||||
| 
 |  | ||||||
|         In the latter case it is presumed that if the owner task is |  | ||||||
|         blocking for the next IPC msg, it will eventually receive, |  | ||||||
|         process and raise the equivalent local error **without** |  | ||||||
|         requiring `._scope.cancel()` to be explicitly called by the |  | ||||||
|         *delivering OoB RPC-task* (via `_deliver_msg()`). |  | ||||||
| 
 |  | ||||||
|         ''' |  | ||||||
|         # NOTE, see the mem-chan meth-impls for *why* this |  | ||||||
|         # logic works, |  | ||||||
|         # `trio._channel.MemoryReceiveChannel.receive[_nowait]()` |  | ||||||
|         # |  | ||||||
|         # XXX realize that this is NOT an |  | ||||||
|         # official/will-be-loudly-deprecated API: |  | ||||||
|         # - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data |  | ||||||
|         #  |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled |  | ||||||
|         # |  | ||||||
|         # orig repo intro in the mem-chan change over patch: |  | ||||||
|         # - https://github.com/python-trio/trio/pull/586#issuecomment-414039117 |  | ||||||
|         #  |_https://github.com/python-trio/trio/pull/616 |  | ||||||
|         #  |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0 |  | ||||||
|         # |  | ||||||
|         return ( |  | ||||||
|             self._task.custom_sleep_data |  | ||||||
|             is |  | ||||||
|             self._rx_chan |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
|     def _maybe_cancel_and_set_remote_error( |     def _maybe_cancel_and_set_remote_error( | ||||||
|         self, |         self, | ||||||
|         error: BaseException, |         error: BaseException, | ||||||
|  | @ -852,27 +787,13 @@ class Context: | ||||||
|         if self._canceller is None: |         if self._canceller is None: | ||||||
|             log.error('Ctx has no canceller set!?') |             log.error('Ctx has no canceller set!?') | ||||||
| 
 | 
 | ||||||
|         cs: trio.CancelScope = self._scope |  | ||||||
| 
 |  | ||||||
|         # ?TODO? see comment @ .start_remote_task()` |  | ||||||
|         # |  | ||||||
|         # if not cs: |  | ||||||
|         #     from .devx import mk_pdb |  | ||||||
|         #     mk_pdb().set_trace() |  | ||||||
|         #     raise RuntimeError( |  | ||||||
|         #         f'IPC ctx was not be opened prior to remote error delivery !?\n' |  | ||||||
|         #         f'{self}\n' |  | ||||||
|         #         f'\n' |  | ||||||
|         #         f'`Portal.open_context()` must be entered (somewhere) beforehand!\n' |  | ||||||
|         #     ) |  | ||||||
| 
 |  | ||||||
|         # Cancel the local `._scope`, catch that |         # Cancel the local `._scope`, catch that | ||||||
|         # `._scope.cancelled_caught` and re-raise any remote error |         # `._scope.cancelled_caught` and re-raise any remote error | ||||||
|         # once exiting (or manually calling `.wait_for_result()`) the |         # once exiting (or manually calling `.wait_for_result()`) the | ||||||
|         # `.open_context()`  block. |         # `.open_context()`  block. | ||||||
|  |         cs: trio.CancelScope = self._scope | ||||||
|         if ( |         if ( | ||||||
|             cs |             cs | ||||||
|             and not cs.cancel_called |  | ||||||
| 
 | 
 | ||||||
|             # XXX this is an expected cancel request response |             # XXX this is an expected cancel request response | ||||||
|             # message and we **don't need to raise it** in the |             # message and we **don't need to raise it** in the | ||||||
|  | @ -881,7 +802,8 @@ class Context: | ||||||
|             # if `._cancel_called` then `.cancel_acked and .cancel_called` |             # if `._cancel_called` then `.cancel_acked and .cancel_called` | ||||||
|             # always should be set. |             # always should be set. | ||||||
|             and not self._is_self_cancelled() |             and not self._is_self_cancelled() | ||||||
|             # and not cs.cancelled_caught |             and not cs.cancel_called | ||||||
|  |             and not cs.cancelled_caught | ||||||
|         ): |         ): | ||||||
|             if ( |             if ( | ||||||
|                 msgerr |                 msgerr | ||||||
|  | @ -892,7 +814,7 @@ class Context: | ||||||
|                 not self._cancel_on_msgerr |                 not self._cancel_on_msgerr | ||||||
|             ): |             ): | ||||||
|                 message: str = ( |                 message: str = ( | ||||||
|                     f'NOT Cancelling `Context._scope` since,\n' |                     'NOT Cancelling `Context._scope` since,\n' | ||||||
|                     f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n' |                     f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n' | ||||||
|                     f'AND we got a msg-type-error!\n' |                     f'AND we got a msg-type-error!\n' | ||||||
|                     f'{error}\n' |                     f'{error}\n' | ||||||
|  | @ -902,43 +824,13 @@ class Context: | ||||||
|                 # `trio.Cancelled` subtype here ;) |                 # `trio.Cancelled` subtype here ;) | ||||||
|                 # https://github.com/goodboy/tractor/issues/368 |                 # https://github.com/goodboy/tractor/issues/368 | ||||||
|                 message: str = 'Cancelling `Context._scope` !\n\n' |                 message: str = 'Cancelling `Context._scope` !\n\n' | ||||||
|                 cs.cancel() |                 # from .devx import pause_from_sync | ||||||
| 
 |                 # pause_from_sync() | ||||||
|         # TODO, explicit condition for OoB (self-)cancellation? |                 self._scope.cancel() | ||||||
|         # - we called `Portal.cancel_actor()` from this actor |         else: | ||||||
|         #   and the peer ctx task delivered ctxc due to it. |             message: str = 'NOT cancelling `Context._scope` !\n\n' | ||||||
|         # - currently `self._is_self_cancelled()` will be true |  | ||||||
|         #   since the ctxc.canceller check will match us even though it |  | ||||||
|         #   wasn't from this ctx specifically! |  | ||||||
|         elif ( |  | ||||||
|             cs |  | ||||||
|             and self._is_self_cancelled() |  | ||||||
|             and not cs.cancel_called |  | ||||||
|         ): |  | ||||||
|             message: str = ( |  | ||||||
|                 'Cancelling `ctx._scope` due to OoB self-cancel ?!\n' |  | ||||||
|                 '\n' |  | ||||||
|             ) |  | ||||||
|             # from .devx import mk_pdb |             # from .devx import mk_pdb | ||||||
|             # mk_pdb().set_trace() |             # mk_pdb().set_trace() | ||||||
|             # TODO XXX, required to fix timeout failure in |  | ||||||
|             # `test_cancelled_lockacquire_in_ipctx_not_unmaskeed` |  | ||||||
|             # |  | ||||||
| 
 |  | ||||||
|             # XXX NOTE XXX, this is SUPER SUBTLE! |  | ||||||
|             # we only want to cancel our embedded `._scope` |  | ||||||
|             # if the ctx's current/using task is NOT blocked |  | ||||||
|             # on `._rx_chan.receive()` and on some other |  | ||||||
|             # `trio`-checkpoint since in the former case |  | ||||||
|             # any `._remote_error` will be relayed through |  | ||||||
|             # the rx-chan and appropriately raised by the owning |  | ||||||
|             # `._task` directly. IF the owner task is however |  | ||||||
|             # blocking elsewhere we need to interrupt it **now**. |  | ||||||
|             if not self._is_blocked_on_rx_chan(): |  | ||||||
|                 cs.cancel() |  | ||||||
|         else: |  | ||||||
|             # rx_stats = self._rx_chan.statistics() |  | ||||||
|             message: str = 'NOT cancelling `Context._scope` !\n\n' |  | ||||||
| 
 | 
 | ||||||
|         fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n' |         fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n' | ||||||
|         if ( |         if ( | ||||||
|  | @ -962,7 +854,6 @@ class Context: | ||||||
|                 + |                 + | ||||||
|                 cs_fmt |                 cs_fmt | ||||||
|             ) |             ) | ||||||
| 
 |  | ||||||
|         log.cancel( |         log.cancel( | ||||||
|             message |             message | ||||||
|             + |             + | ||||||
|  | @ -1055,9 +946,8 @@ class Context: | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         side: str = self.side |         side: str = self.side | ||||||
|         self._cancel_called = True |         # XXX for debug via the `@.setter` | ||||||
|         # ^ XXX for debug via the `@.setter` |         self.cancel_called = True | ||||||
|         # self.cancel_called = True |  | ||||||
| 
 | 
 | ||||||
|         header: str = ( |         header: str = ( | ||||||
|             f'Cancelling ctx from {side!r}-side\n' |             f'Cancelling ctx from {side!r}-side\n' | ||||||
|  | @ -2121,9 +2011,6 @@ async def open_context_from_portal( | ||||||
|             f'|_{portal.actor}\n' |             f'|_{portal.actor}\n' | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     # ?TODO? could we move this to inside the `tn` block? |  | ||||||
|     # -> would allow doing `ctx.parent_task = tn.parent_task` ? |  | ||||||
|     # -> would allow a `if not ._scope: => raise RTE` ? |  | ||||||
|     ctx: Context = await portal.actor.start_remote_task( |     ctx: Context = await portal.actor.start_remote_task( | ||||||
|         portal.channel, |         portal.channel, | ||||||
|         nsf=nsf, |         nsf=nsf, | ||||||
|  | @ -2150,7 +2037,6 @@ async def open_context_from_portal( | ||||||
|     scope_err: BaseException|None = None |     scope_err: BaseException|None = None | ||||||
|     ctxc_from_child: ContextCancelled|None = None |     ctxc_from_child: ContextCancelled|None = None | ||||||
|     try: |     try: | ||||||
|         # from .devx import pause |  | ||||||
|         async with ( |         async with ( | ||||||
|             collapse_eg(), |             collapse_eg(), | ||||||
|             trio.open_nursery() as tn, |             trio.open_nursery() as tn, | ||||||
|  | @ -2173,10 +2059,6 @@ async def open_context_from_portal( | ||||||
|             # the dialog, the `Error` msg should be raised from the `msg` |             # the dialog, the `Error` msg should be raised from the `msg` | ||||||
|             # handling block below. |             # handling block below. | ||||||
|             try: |             try: | ||||||
|                 log.runtime( |  | ||||||
|                     f'IPC ctx parent waiting on Started msg..\n' |  | ||||||
|                     f'ctx.cid: {ctx.cid!r}\n' |  | ||||||
|                 ) |  | ||||||
|                 started_msg, first = await ctx._pld_rx.recv_msg( |                 started_msg, first = await ctx._pld_rx.recv_msg( | ||||||
|                     ipc=ctx, |                     ipc=ctx, | ||||||
|                     expect_msg=Started, |                     expect_msg=Started, | ||||||
|  | @ -2185,16 +2067,16 @@ async def open_context_from_portal( | ||||||
|                 ) |                 ) | ||||||
|             except trio.Cancelled as taskc: |             except trio.Cancelled as taskc: | ||||||
|                 ctx_cs: trio.CancelScope = ctx._scope |                 ctx_cs: trio.CancelScope = ctx._scope | ||||||
|                 log.cancel( |  | ||||||
|                     f'IPC ctx was cancelled during "child" task sync due to\n\n' |  | ||||||
|                     f'.cid: {ctx.cid!r}\n' |  | ||||||
|                     f'.maybe_error: {ctx.maybe_error!r}\n' |  | ||||||
|                 ) |  | ||||||
|                 # await pause(shield=True) |  | ||||||
| 
 |  | ||||||
|                 if not ctx_cs.cancel_called: |                 if not ctx_cs.cancel_called: | ||||||
|                     raise |                     raise | ||||||
| 
 | 
 | ||||||
|  |                 # from .devx import pause | ||||||
|  |                 # await pause(shield=True) | ||||||
|  | 
 | ||||||
|  |                 log.cancel( | ||||||
|  |                     'IPC ctx was cancelled during "child" task sync due to\n\n' | ||||||
|  |                     f'{ctx.maybe_error}\n' | ||||||
|  |                 ) | ||||||
|                 # OW if the ctx's scope was cancelled manually, |                 # OW if the ctx's scope was cancelled manually, | ||||||
|                 # likely the `Context` was cancelled via a call to |                 # likely the `Context` was cancelled via a call to | ||||||
|                 # `._maybe_cancel_and_set_remote_error()` so ensure |                 # `._maybe_cancel_and_set_remote_error()` so ensure | ||||||
|  | @ -2390,16 +2272,13 @@ async def open_context_from_portal( | ||||||
|         match scope_err: |         match scope_err: | ||||||
|             case trio.Cancelled(): |             case trio.Cancelled(): | ||||||
|                 logmeth = log.cancel |                 logmeth = log.cancel | ||||||
|                 cause: str = 'cancelled' |  | ||||||
| 
 | 
 | ||||||
|             # XXX explicitly report on any non-graceful-taskc cases |             # XXX explicitly report on any non-graceful-taskc cases | ||||||
|             case _: |             case _: | ||||||
|                 cause: str = 'errored' |  | ||||||
|                 logmeth = log.exception |                 logmeth = log.exception | ||||||
| 
 | 
 | ||||||
|         logmeth( |         logmeth( | ||||||
|             f'ctx {ctx.side!r}-side {cause!r} with,\n' |             f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n' | ||||||
|             f'{ctx.repr_outcome()!r}\n' |  | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|         if debug_mode(): |         if debug_mode(): | ||||||
|  | @ -2424,7 +2303,6 @@ async def open_context_from_portal( | ||||||
|         # told us it's cancelled ;p |         # told us it's cancelled ;p | ||||||
|         if ctxc_from_child is None: |         if ctxc_from_child is None: | ||||||
|             try: |             try: | ||||||
|                 # await pause(shield=True) |  | ||||||
|                 await ctx.cancel() |                 await ctx.cancel() | ||||||
|             except ( |             except ( | ||||||
|                 trio.BrokenResourceError, |                 trio.BrokenResourceError, | ||||||
|  | @ -2581,10 +2459,8 @@ async def open_context_from_portal( | ||||||
|                 log.cancel( |                 log.cancel( | ||||||
|                     f'Context cancelled by local {ctx.side!r}-side task\n' |                     f'Context cancelled by local {ctx.side!r}-side task\n' | ||||||
|                     f'c)>\n' |                     f'c)>\n' | ||||||
|                     f'  |_{ctx.parent_task}\n' |                     f' |_{ctx._task}\n\n' | ||||||
|                     f'   .cid={ctx.cid!r}\n' |                     f'{repr(scope_err)}\n' | ||||||
|                     f'\n' |  | ||||||
|                     f'{scope_err!r}\n' |  | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|             # TODO: should we add a `._cancel_req_received` |             # TODO: should we add a `._cancel_req_received` | ||||||
|  |  | ||||||
|  | @ -654,7 +654,8 @@ async def _invoke( | ||||||
|                 # scope ensures unasking of the `await coro` below |                 # scope ensures unasking of the `await coro` below | ||||||
|                 # *should* never be interfered with!! |                 # *should* never be interfered with!! | ||||||
|                 maybe_raise_from_masking_exc( |                 maybe_raise_from_masking_exc( | ||||||
|                     unmask_from=(Cancelled,), |                     tn=tn, | ||||||
|  |                     unmask_from=Cancelled, | ||||||
|                 ) as _mbme,  # maybe boxed masked exc |                 ) as _mbme,  # maybe boxed masked exc | ||||||
|             ): |             ): | ||||||
|                 ctx._scope_nursery = tn |                 ctx._scope_nursery = tn | ||||||
|  |  | ||||||
|  | @ -446,12 +446,12 @@ class ActorNursery: | ||||||
| @acm | @acm | ||||||
| async def _open_and_supervise_one_cancels_all_nursery( | async def _open_and_supervise_one_cancels_all_nursery( | ||||||
|     actor: Actor, |     actor: Actor, | ||||||
|     hide_tb: bool = True, |     tb_hide: bool = False, | ||||||
| 
 | 
 | ||||||
| ) -> typing.AsyncGenerator[ActorNursery, None]: | ) -> typing.AsyncGenerator[ActorNursery, None]: | ||||||
| 
 | 
 | ||||||
|     # normally don't need to show user by default |     # normally don't need to show user by default | ||||||
|     __tracebackhide__: bool = hide_tb |     __tracebackhide__: bool = tb_hide | ||||||
| 
 | 
 | ||||||
|     outer_err: BaseException|None = None |     outer_err: BaseException|None = None | ||||||
|     inner_err: BaseException|None = None |     inner_err: BaseException|None = None | ||||||
|  |  | ||||||
|  | @ -613,9 +613,10 @@ async def drain_to_final_msg( | ||||||
|             #       msg: dict = await ctx._rx_chan.receive() |             #       msg: dict = await ctx._rx_chan.receive() | ||||||
|             #   if res_cs.cancelled_caught: |             #   if res_cs.cancelled_caught: | ||||||
|             # |             # | ||||||
|             # -[x] make sure pause points work here for REPLing |             # -[ ] make sure pause points work here for REPLing | ||||||
|             #   the runtime itself; i.e. ensure there's no hangs! |             #   the runtime itself; i.e. ensure there's no hangs! | ||||||
|             #  |_see masked code below in .cancel_called path |             # |_from tractor.devx.debug import pause | ||||||
|  |             #   await pause() | ||||||
| 
 | 
 | ||||||
|         # NOTE: we get here if the far end was |         # NOTE: we get here if the far end was | ||||||
|         # `ContextCancelled` in 2 cases: |         # `ContextCancelled` in 2 cases: | ||||||
|  | @ -651,10 +652,6 @@ async def drain_to_final_msg( | ||||||
|                     f'IPC ctx cancelled externally during result drain ?\n' |                     f'IPC ctx cancelled externally during result drain ?\n' | ||||||
|                     f'{ctx}' |                     f'{ctx}' | ||||||
|                 ) |                 ) | ||||||
|                 # XXX, for tracing `Cancelled`.. |  | ||||||
|                 # from tractor.devx.debug import pause |  | ||||||
|                 # await pause(shield=True) |  | ||||||
| 
 |  | ||||||
|             # CASE 2: mask the local cancelled-error(s) |             # CASE 2: mask the local cancelled-error(s) | ||||||
|             # only when we are sure the remote error is |             # only when we are sure the remote error is | ||||||
|             # the source cause of this local task's |             # the source cause of this local task's | ||||||
|  |  | ||||||
|  | @ -30,6 +30,7 @@ from typing import ( | ||||||
|     AsyncIterator, |     AsyncIterator, | ||||||
|     Callable, |     Callable, | ||||||
|     Hashable, |     Hashable, | ||||||
|  |     Optional, | ||||||
|     Sequence, |     Sequence, | ||||||
|     TypeVar, |     TypeVar, | ||||||
| ) | ) | ||||||
|  | @ -175,7 +176,7 @@ class _Cache: | ||||||
|     a kept-alive-while-in-use async resource. |     a kept-alive-while-in-use async resource. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     service_tn: trio.Nursery|None = None |     service_tn: Optional[trio.Nursery] = None | ||||||
|     locks: dict[Hashable, trio.Lock] = {} |     locks: dict[Hashable, trio.Lock] = {} | ||||||
|     users: int = 0 |     users: int = 0 | ||||||
|     values: dict[Any,  Any] = {} |     values: dict[Any,  Any] = {} | ||||||
|  | @ -184,7 +185,7 @@ class _Cache: | ||||||
|         tuple[trio.Nursery, trio.Event] |         tuple[trio.Nursery, trio.Event] | ||||||
|     ] = {} |     ] = {} | ||||||
|     # nurseries: dict[int, trio.Nursery] = {} |     # nurseries: dict[int, trio.Nursery] = {} | ||||||
|     no_more_users: trio.Event|None = None |     no_more_users: Optional[trio.Event] = None | ||||||
| 
 | 
 | ||||||
|     @classmethod |     @classmethod | ||||||
|     async def run_ctx( |     async def run_ctx( | ||||||
|  | @ -194,18 +195,16 @@ class _Cache: | ||||||
|         task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED, |         task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED, | ||||||
| 
 | 
 | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         try: |         async with mng as value: | ||||||
|             async with mng as value: |             _, no_more_users = cls.resources[ctx_key] | ||||||
|                 _, no_more_users = cls.resources[ctx_key] |             cls.values[ctx_key] = value | ||||||
|                 try: |             task_status.started(value) | ||||||
|                     cls.values[ctx_key] = value |             try: | ||||||
|                     task_status.started(value) |                 await no_more_users.wait() | ||||||
|                     await no_more_users.wait() |             finally: | ||||||
|                 finally: |                 # discard nursery ref so it won't be re-used (an error)? | ||||||
|                     value = cls.values.pop(ctx_key) |                 value = cls.values.pop(ctx_key) | ||||||
|         finally: |                 cls.resources.pop(ctx_key) | ||||||
|             # discard nursery ref so it won't be re-used (an error)? |  | ||||||
|             cls.resources.pop(ctx_key) |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
|  |  | ||||||
|  | @ -22,14 +22,7 @@ from __future__ import annotations | ||||||
| from contextlib import ( | from contextlib import ( | ||||||
|     asynccontextmanager as acm, |     asynccontextmanager as acm, | ||||||
| ) | ) | ||||||
| import inspect | from typing import TYPE_CHECKING | ||||||
| from types import ( |  | ||||||
|     TracebackType, |  | ||||||
| ) |  | ||||||
| from typing import ( |  | ||||||
|     Type, |  | ||||||
|     TYPE_CHECKING, |  | ||||||
| ) |  | ||||||
| 
 | 
 | ||||||
| import trio | import trio | ||||||
| from tractor.log import get_logger | from tractor.log import get_logger | ||||||
|  | @ -67,71 +60,12 @@ def find_masked_excs( | ||||||
|     return None |     return None | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| _mask_cases: dict[ |  | ||||||
|     Type[Exception],  # masked exc type |  | ||||||
|     dict[ |  | ||||||
|         int,  # inner-frame index into `inspect.getinnerframes()` |  | ||||||
|         # `FrameInfo.function/filename: str`s to match |  | ||||||
|         dict[str, str], |  | ||||||
|     ], |  | ||||||
| ] = { |  | ||||||
|     trio.WouldBlock: { |  | ||||||
|         # `trio.Lock.acquire()` has a checkpoint inside the |  | ||||||
|         # `WouldBlock`-no_wait path's handler.. |  | ||||||
|         -5: {  # "5th frame up" from checkpoint |  | ||||||
|             'filename': 'trio/_sync.py', |  | ||||||
|             'function': 'acquire', |  | ||||||
|             # 'lineno': 605,  # matters? |  | ||||||
|         }, |  | ||||||
|     } |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def is_expected_masking_case( |  | ||||||
|     cases: dict, |  | ||||||
|     exc_ctx: Exception, |  | ||||||
|     exc_match: BaseException, |  | ||||||
| 
 |  | ||||||
| ) -> bool|inspect.FrameInfo: |  | ||||||
|     ''' |  | ||||||
|     Determine whether the provided masked exception is from a known |  | ||||||
|     bug/special/unintentional-`trio`-impl case which we do not wish |  | ||||||
|     to unmask. |  | ||||||
| 
 |  | ||||||
|     Return any guilty `inspect.FrameInfo` ow `False`. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     exc_tb: TracebackType = exc_match.__traceback__ |  | ||||||
|     if cases := _mask_cases.get(type(exc_ctx)): |  | ||||||
|         inner: list[inspect.FrameInfo] = inspect.getinnerframes(exc_tb) |  | ||||||
| 
 |  | ||||||
|         # from tractor.devx.debug import mk_pdb |  | ||||||
|         # mk_pdb().set_trace() |  | ||||||
|         for iframe, matchon in cases.items(): |  | ||||||
|             try: |  | ||||||
|                 masker_frame: inspect.FrameInfo = inner[iframe] |  | ||||||
|             except IndexError: |  | ||||||
|                 continue |  | ||||||
| 
 |  | ||||||
|             for field, in_field in matchon.items(): |  | ||||||
|                 val = getattr( |  | ||||||
|                     masker_frame, |  | ||||||
|                     field, |  | ||||||
|                 ) |  | ||||||
|                 if in_field not in val: |  | ||||||
|                     break |  | ||||||
|             else: |  | ||||||
|                 return masker_frame |  | ||||||
| 
 |  | ||||||
|     return False |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| # XXX, relevant discussion @ `trio`-core, | # XXX, relevant discussion @ `trio`-core, | ||||||
| # https://github.com/python-trio/trio/issues/455 | # https://github.com/python-trio/trio/issues/455 | ||||||
| # | # | ||||||
| @acm | @acm | ||||||
| async def maybe_raise_from_masking_exc( | async def maybe_raise_from_masking_exc( | ||||||
|  |     tn: trio.Nursery|None = None, | ||||||
|     unmask_from: ( |     unmask_from: ( | ||||||
|         BaseException| |         BaseException| | ||||||
|         tuple[BaseException] |         tuple[BaseException] | ||||||
|  | @ -140,30 +74,18 @@ async def maybe_raise_from_masking_exc( | ||||||
|     raise_unmasked: bool = True, |     raise_unmasked: bool = True, | ||||||
|     extra_note: str = ( |     extra_note: str = ( | ||||||
|         'This can occurr when,\n' |         'This can occurr when,\n' | ||||||
|         '\n' |         ' - a `trio.Nursery` scope embeds a `finally:`-block ' | ||||||
|         ' - a `trio.Nursery/CancelScope` embeds a `finally/except:`-block ' |         'which executes a checkpoint!' | ||||||
|         'which execs an un-shielded checkpoint!' |  | ||||||
|         # |         # | ||||||
|         # ^TODO? other cases? |         # ^TODO? other cases? | ||||||
|     ), |     ), | ||||||
| 
 | 
 | ||||||
|     always_warn_on: tuple[Type[BaseException]] = ( |     always_warn_on: tuple[BaseException] = ( | ||||||
|         trio.Cancelled, |         trio.Cancelled, | ||||||
|     ), |     ), | ||||||
| 
 |  | ||||||
|     # don't ever unmask or warn on any masking pair, |  | ||||||
|     # {<masked-excT-key> -> <masking-excT-value>} |  | ||||||
|     never_warn_on: dict[ |  | ||||||
|         Type[BaseException], |  | ||||||
|         Type[BaseException], |  | ||||||
|     ] = { |  | ||||||
|         KeyboardInterrupt: trio.Cancelled, |  | ||||||
|         trio.Cancelled: trio.Cancelled, |  | ||||||
|     }, |  | ||||||
|     # ^XXX, special case(s) where we warn-log bc likely |     # ^XXX, special case(s) where we warn-log bc likely | ||||||
|     # there will be no operational diff since the exc |     # there will be no operational diff since the exc | ||||||
|     # is always expected to be consumed. |     # is always expected to be consumed. | ||||||
| 
 |  | ||||||
| ) -> BoxedMaybeException: | ) -> BoxedMaybeException: | ||||||
|     ''' |     ''' | ||||||
|     Maybe un-mask and re-raise exception(s) suppressed by a known |     Maybe un-mask and re-raise exception(s) suppressed by a known | ||||||
|  | @ -182,112 +104,81 @@ async def maybe_raise_from_masking_exc( | ||||||
|         individual sub-excs but maintain the eg-parent's form right? |         individual sub-excs but maintain the eg-parent's form right? | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     if not isinstance(unmask_from, tuple): |  | ||||||
|         raise ValueError( |  | ||||||
|             f'Invalid unmask_from = {unmask_from!r}\n' |  | ||||||
|             f'Must be a `tuple[Type[BaseException]]`.\n' |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
|     from tractor.devx.debug import ( |     from tractor.devx.debug import ( | ||||||
|         BoxedMaybeException, |         BoxedMaybeException, | ||||||
|  |         pause, | ||||||
|     ) |     ) | ||||||
|     boxed_maybe_exc = BoxedMaybeException( |     boxed_maybe_exc = BoxedMaybeException( | ||||||
|         raise_on_exit=raise_unmasked, |         raise_on_exit=raise_unmasked, | ||||||
|     ) |     ) | ||||||
|     matching: list[BaseException]|None = None |     matching: list[BaseException]|None = None | ||||||
|     try: |     maybe_eg: ExceptionGroup|None | ||||||
|         yield boxed_maybe_exc |  | ||||||
|         return |  | ||||||
|     except BaseException as _bexc: |  | ||||||
|         bexc = _bexc |  | ||||||
|         if isinstance(bexc, BaseExceptionGroup): |  | ||||||
|             matches: ExceptionGroup |  | ||||||
|             matches, _ = bexc.split(unmask_from) |  | ||||||
|             if matches: |  | ||||||
|                 matching = matches.exceptions |  | ||||||
| 
 | 
 | ||||||
|         elif ( |     if tn: | ||||||
|             unmask_from |         try:  # handle egs | ||||||
|             and |             yield boxed_maybe_exc | ||||||
|             type(bexc) in unmask_from |             return | ||||||
|         ): |         except* unmask_from as _maybe_eg: | ||||||
|             matching = [bexc] |             maybe_eg = _maybe_eg | ||||||
|  |             matches: ExceptionGroup | ||||||
|  |             matches, _ = maybe_eg.split( | ||||||
|  |                 unmask_from | ||||||
|  |             ) | ||||||
|  |             if not matches: | ||||||
|  |                 raise | ||||||
|  | 
 | ||||||
|  |             matching: list[BaseException] = matches.exceptions | ||||||
|  |     else: | ||||||
|  |         try:  # handle non-egs | ||||||
|  |             yield boxed_maybe_exc | ||||||
|  |             return | ||||||
|  |         except unmask_from as _maybe_exc: | ||||||
|  |             maybe_exc = _maybe_exc | ||||||
|  |             matching: list[BaseException] = [ | ||||||
|  |                 maybe_exc | ||||||
|  |             ] | ||||||
|  | 
 | ||||||
|  |         # XXX, only unmask-ed for debuggin! | ||||||
|  |         # TODO, remove eventually.. | ||||||
|  |         except BaseException as _berr: | ||||||
|  |             berr = _berr | ||||||
|  |             await pause(shield=True) | ||||||
|  |             raise berr | ||||||
| 
 | 
 | ||||||
|     if matching is None: |     if matching is None: | ||||||
|         raise |         raise | ||||||
| 
 | 
 | ||||||
|     masked: list[tuple[BaseException, BaseException]] = [] |     masked: list[tuple[BaseException, BaseException]] = [] | ||||||
|     for exc_match in matching: |     for exc_match in matching: | ||||||
|  | 
 | ||||||
|         if exc_ctx := find_masked_excs( |         if exc_ctx := find_masked_excs( | ||||||
|             maybe_masker=exc_match, |             maybe_masker=exc_match, | ||||||
|             unmask_from=set(unmask_from), |             unmask_from={unmask_from}, | ||||||
|         ): |         ): | ||||||
|             masked.append(( |             masked.append((exc_ctx, exc_match)) | ||||||
|                 exc_ctx, |  | ||||||
|                 exc_match, |  | ||||||
|             )) |  | ||||||
|             boxed_maybe_exc.value = exc_match |             boxed_maybe_exc.value = exc_match | ||||||
|             note: str = ( |             note: str = ( | ||||||
|                 f'\n' |                 f'\n' | ||||||
|                 f'^^WARNING^^\n' |                 f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n' | ||||||
|                 f'the above {type(exc_ctx)!r} was masked by a {type(exc_match)!r}\n' |  | ||||||
|             ) |             ) | ||||||
|             if extra_note: |             if extra_note: | ||||||
|                 note += ( |                 note += ( | ||||||
|                     f'\n' |                     f'\n' | ||||||
|                     f'{extra_note}\n' |                     f'{extra_note}\n' | ||||||
|                 ) |                 ) | ||||||
|  |             exc_ctx.add_note(note) | ||||||
| 
 | 
 | ||||||
|             do_warn: bool = ( |             if type(exc_match) in always_warn_on: | ||||||
|                 never_warn_on.get( |  | ||||||
|                     type(exc_ctx)  # masking type |  | ||||||
|                 ) |  | ||||||
|                 is not |  | ||||||
|                 type(exc_match)  # masked type |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|             if do_warn: |  | ||||||
|                 exc_ctx.add_note(note) |  | ||||||
| 
 |  | ||||||
|             if ( |  | ||||||
|                 do_warn |  | ||||||
|                 and |  | ||||||
|                 type(exc_match) in always_warn_on |  | ||||||
|             ): |  | ||||||
|                 log.warning(note) |                 log.warning(note) | ||||||
| 
 | 
 | ||||||
|             if ( |             # await tractor.pause(shield=True) | ||||||
|                 do_warn |             if raise_unmasked: | ||||||
|                 and | 
 | ||||||
|                 raise_unmasked |  | ||||||
|             ): |  | ||||||
|                 if len(masked) < 2: |                 if len(masked) < 2: | ||||||
|                     # don't unmask already known "special" cases.. |  | ||||||
|                     if ( |  | ||||||
|                         _mask_cases |  | ||||||
|                         and |  | ||||||
|                         (cases := _mask_cases.get(type(exc_ctx))) |  | ||||||
|                         and |  | ||||||
|                         (masker_frame := is_expected_masking_case( |  | ||||||
|                             cases, |  | ||||||
|                             exc_ctx, |  | ||||||
|                             exc_match, |  | ||||||
|                         )) |  | ||||||
|                     ): |  | ||||||
|                         log.warning( |  | ||||||
|                             f'Ignoring already-known, non-ideal-but-valid ' |  | ||||||
|                             f'masker code @\n' |  | ||||||
|                             f'{masker_frame}\n' |  | ||||||
|                             f'\n' |  | ||||||
|                             f'NOT raising {exc_ctx} from masker {exc_match!r}\n' |  | ||||||
|                         ) |  | ||||||
|                         raise exc_match |  | ||||||
| 
 |  | ||||||
|                     raise exc_ctx from exc_match |                     raise exc_ctx from exc_match | ||||||
| 
 |                 else: | ||||||
|                 # ??TODO, see above but, possibly unmasking sub-exc |                     # ?TODO, see above but, possibly unmasking sub-exc | ||||||
|                 # entries if there are > 1 |                     # entries if there are > 1 | ||||||
|                 # else: |                     await pause(shield=True) | ||||||
|                 #     await pause(shield=True) |  | ||||||
|     else: |     else: | ||||||
|         raise |         raise | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue