forked from goodboy/tractor
				
			Merge pull request #391 from goodboy/cancelled_masking_guards
A refined `trio.Cancelled`-unmasking helperremotes/1757891924488435790/main
						commit
						34ca02ed11
					
				|  | @ -0,0 +1,85 @@ | |||
| from contextlib import ( | ||||
|     asynccontextmanager as acm, | ||||
| ) | ||||
| from functools import partial | ||||
| 
 | ||||
| import tractor | ||||
| import trio | ||||
| 
 | ||||
| 
 | ||||
| log = tractor.log.get_logger( | ||||
|     name=__name__ | ||||
| ) | ||||
| 
 | ||||
| _lock: trio.Lock|None = None | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def acquire_singleton_lock( | ||||
| ) -> None: | ||||
|     global _lock | ||||
|     if _lock is None: | ||||
|         log.info('Allocating LOCK') | ||||
|         _lock = trio.Lock() | ||||
| 
 | ||||
|     log.info('TRYING TO LOCK ACQUIRE') | ||||
|     async with _lock: | ||||
|         log.info('ACQUIRED') | ||||
|         yield _lock | ||||
| 
 | ||||
|     log.info('RELEASED') | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| async def hold_lock_forever( | ||||
|     task_status=trio.TASK_STATUS_IGNORED | ||||
| ): | ||||
|     async with ( | ||||
|         tractor.trionics.maybe_raise_from_masking_exc(), | ||||
|         acquire_singleton_lock() as lock, | ||||
|     ): | ||||
|         task_status.started(lock) | ||||
|         await trio.sleep_forever() | ||||
| 
 | ||||
| 
 | ||||
| async def main( | ||||
|     ignore_special_cases: bool, | ||||
|     loglevel: str = 'info', | ||||
|     debug_mode: bool = True, | ||||
| ): | ||||
|     async with ( | ||||
|         trio.open_nursery() as tn, | ||||
| 
 | ||||
|         # tractor.trionics.maybe_raise_from_masking_exc() | ||||
|         # ^^^ XXX NOTE, interestingly putting the unmasker | ||||
|         # here does not exhibit the same behaviour ?? | ||||
|     ): | ||||
|         if not ignore_special_cases: | ||||
|             from tractor.trionics import _taskc | ||||
|             _taskc._mask_cases.clear() | ||||
| 
 | ||||
|         _lock = await tn.start( | ||||
|             hold_lock_forever, | ||||
|         ) | ||||
|         with trio.move_on_after(0.2): | ||||
|             await tn.start( | ||||
|                 hold_lock_forever, | ||||
|             ) | ||||
| 
 | ||||
|         tn.cancel_scope.cancel() | ||||
| 
 | ||||
| 
 | ||||
| # XXX, manual test as script | ||||
| if __name__ == '__main__': | ||||
|     tractor.log.get_console_log(level='info') | ||||
|     for case in [True, False]: | ||||
|         log.info( | ||||
|             f'\n' | ||||
|             f'------ RUNNING SCRIPT TRIAL ------\n' | ||||
|             f'ignore_special_cases: {case!r}\n' | ||||
|         ) | ||||
|         trio.run(partial( | ||||
|             main, | ||||
|             ignore_special_cases=case, | ||||
|             loglevel='info', | ||||
|         )) | ||||
|  | @ -0,0 +1,195 @@ | |||
| from contextlib import ( | ||||
|     contextmanager as cm, | ||||
|     # TODO, any diff in async case(s)?? | ||||
|     # asynccontextmanager as acm, | ||||
| ) | ||||
| from functools import partial | ||||
| 
 | ||||
| import tractor | ||||
| import trio | ||||
| 
 | ||||
| 
 | ||||
| log = tractor.log.get_logger( | ||||
|     name=__name__ | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| @cm | ||||
| def teardown_on_exc( | ||||
|     raise_from_handler: bool = False, | ||||
| ): | ||||
|     ''' | ||||
|     You could also have a teardown handler which catches any exc and | ||||
|     does some required teardown. In this case the problem is | ||||
|     compounded UNLESS you ensure the handler's scope is OUTSIDE the | ||||
|     `ux.aclose()`.. that is in the caller's enclosing scope. | ||||
| 
 | ||||
|     ''' | ||||
|     try: | ||||
|         yield | ||||
|     except BaseException as _berr: | ||||
|         berr = _berr | ||||
|         log.exception( | ||||
|             f'Handling termination teardown in child due to,\n' | ||||
|             f'{berr!r}\n' | ||||
|         ) | ||||
|         if raise_from_handler: | ||||
|             # XXX teardown ops XXX | ||||
|             # on termination these steps say need to be run to | ||||
|             # ensure wider system consistency (like the state of | ||||
|             # remote connections/services). | ||||
|             # | ||||
|             # HOWEVER, any bug in this teardown code is also | ||||
|             # masked by the `tx.aclose()`! | ||||
|             # this is also true if `_tn.cancel_scope` is | ||||
|             # `.cancel_called` by the parent in a graceful | ||||
|             # request case.. | ||||
| 
 | ||||
|             # simulate a bug in teardown handler. | ||||
|             raise RuntimeError( | ||||
|                 'woopsie teardown bug!' | ||||
|             ) | ||||
| 
 | ||||
|         raise  # no teardown bug. | ||||
| 
 | ||||
| 
 | ||||
| async def finite_stream_to_rent( | ||||
|     tx: trio.abc.SendChannel, | ||||
|     child_errors_mid_stream: bool, | ||||
|     raise_unmasked: bool, | ||||
| 
 | ||||
|     task_status: trio.TaskStatus[ | ||||
|         trio.CancelScope, | ||||
|     ] = trio.TASK_STATUS_IGNORED, | ||||
| ): | ||||
|     async with ( | ||||
|         # XXX without this unmasker the mid-streaming RTE is never | ||||
|         # reported since it is masked by the `tx.aclose()` | ||||
|         # call which in turn raises `Cancelled`! | ||||
|         # | ||||
|         # NOTE, this is WITHOUT doing any exception handling | ||||
|         # inside the child  task! | ||||
|         # | ||||
|         # TODO, uncomment next LoC to see the supprsessed beg[RTE]! | ||||
|         tractor.trionics.maybe_raise_from_masking_exc( | ||||
|             raise_unmasked=raise_unmasked, | ||||
|         ), | ||||
| 
 | ||||
|         tx as tx,  # .aclose() is the guilty masker chkpt! | ||||
| 
 | ||||
|         # XXX, this ONLY matters in the | ||||
|         # `child_errors_mid_stream=False` case oddly!? | ||||
|         # THAT IS, if no tn is opened in that case then the | ||||
|         # test will not fail; it raises the RTE correctly? | ||||
|         # | ||||
|         # -> so it seems this new scope somehow affects the form of | ||||
|         #    eventual in the parent EG? | ||||
|         tractor.trionics.maybe_open_nursery( | ||||
|             nursery=( | ||||
|                 None | ||||
|                 if not child_errors_mid_stream | ||||
|                 else True | ||||
|             ), | ||||
|         ) as _tn, | ||||
|     ): | ||||
|         # pass our scope back to parent for supervision\ | ||||
|         # control. | ||||
|         cs: trio.CancelScope|None = ( | ||||
|             None | ||||
|             if _tn is True | ||||
|             else _tn.cancel_scope | ||||
|         ) | ||||
|         task_status.started(cs) | ||||
| 
 | ||||
|         with teardown_on_exc( | ||||
|             raise_from_handler=not child_errors_mid_stream, | ||||
|         ): | ||||
|             for i in range(100): | ||||
|                 log.debug( | ||||
|                     f'Child tx {i!r}\n' | ||||
|                 ) | ||||
|                 if ( | ||||
|                     child_errors_mid_stream | ||||
|                     and | ||||
|                     i == 66 | ||||
|                 ): | ||||
|                     # oh wait but WOOPS there's a bug | ||||
|                     # in that teardown code!? | ||||
|                     raise RuntimeError( | ||||
|                         'woopsie, a mid-streaming bug!?' | ||||
|                     ) | ||||
| 
 | ||||
|                 await tx.send(i) | ||||
| 
 | ||||
| 
 | ||||
| async def main( | ||||
|     # TODO! toggle this for the 2 cases! | ||||
|     # 1. child errors mid-stream while parent is also requesting | ||||
|     #   (graceful) cancel of that child streamer. | ||||
|     # | ||||
|     # 2. child contains a teardown handler which contains a | ||||
|     #   bug and raises. | ||||
|     # | ||||
|     child_errors_mid_stream: bool, | ||||
| 
 | ||||
|     raise_unmasked: bool = False, | ||||
|     loglevel: str = 'info', | ||||
| ): | ||||
|     tractor.log.get_console_log(level=loglevel) | ||||
| 
 | ||||
|     # the `.aclose()` being checkpoints on these | ||||
|     # is the source of the problem.. | ||||
|     tx, rx = trio.open_memory_channel(1) | ||||
| 
 | ||||
|     async with ( | ||||
|         tractor.trionics.collapse_eg(), | ||||
|         trio.open_nursery() as tn, | ||||
|         rx as rx, | ||||
|     ): | ||||
|         _child_cs = await tn.start( | ||||
|             partial( | ||||
|                 finite_stream_to_rent, | ||||
|                 child_errors_mid_stream=child_errors_mid_stream, | ||||
|                 raise_unmasked=raise_unmasked, | ||||
|                 tx=tx, | ||||
|             ) | ||||
|         ) | ||||
|         async for msg in rx: | ||||
|             log.debug( | ||||
|                 f'Rent rx {msg!r}\n' | ||||
|             ) | ||||
| 
 | ||||
|             # simulate some external cancellation | ||||
|             # request **JUST BEFORE** the child errors. | ||||
|             if msg == 65: | ||||
|                 log.cancel( | ||||
|                     f'Cancelling parent on,\n' | ||||
|                     f'msg={msg}\n' | ||||
|                     f'\n' | ||||
|                     f'Simulates OOB cancel request!\n' | ||||
|                 ) | ||||
|                 tn.cancel_scope.cancel() | ||||
| 
 | ||||
| 
 | ||||
| # XXX, manual test as script | ||||
| if __name__ == '__main__': | ||||
|     tractor.log.get_console_log(level='info') | ||||
|     for case in [True, False]: | ||||
|         log.info( | ||||
|             f'\n' | ||||
|             f'------ RUNNING SCRIPT TRIAL ------\n' | ||||
|             f'child_errors_midstream: {case!r}\n' | ||||
|         ) | ||||
|         try: | ||||
|             trio.run(partial( | ||||
|                 main, | ||||
|                 child_errors_mid_stream=case, | ||||
|                 # raise_unmasked=True, | ||||
|                 loglevel='info', | ||||
|             )) | ||||
|         except Exception as _exc: | ||||
|             exc = _exc | ||||
|             log.exception( | ||||
|                 'Should have raised an RTE or Cancelled?\n' | ||||
|             ) | ||||
|             breakpoint() | ||||
|  | @ -95,6 +95,7 @@ def run_example_in_subproc( | |||
|             and 'integration' not in p[0] | ||||
|             and 'advanced_faults' not in p[0] | ||||
|             and 'multihost' not in p[0] | ||||
|             and 'trio' not in p[0] | ||||
|         ) | ||||
|     ], | ||||
|     ids=lambda t: t[1], | ||||
|  |  | |||
|  | @ -6,11 +6,18 @@ want to see changed. | |||
| from contextlib import ( | ||||
|     asynccontextmanager as acm, | ||||
| ) | ||||
| from types import ModuleType | ||||
| 
 | ||||
| from functools import partial | ||||
| 
 | ||||
| import pytest | ||||
| from _pytest import pathlib | ||||
| from tractor.trionics import collapse_eg | ||||
| import trio | ||||
| from trio import TaskStatus | ||||
| from tractor._testing import ( | ||||
|     examples_dir, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|  | @ -106,8 +113,9 @@ def test_acm_embedded_nursery_propagates_enter_err( | |||
|     debug_mode: bool, | ||||
| ): | ||||
|     ''' | ||||
|     Demo how a masking `trio.Cancelled` could be handled by unmasking from the | ||||
|     `.__context__` field when a user (by accident) re-raises from a `finally:`. | ||||
|     Demo how a masking `trio.Cancelled` could be handled by unmasking | ||||
|     from the `.__context__` field when a user (by accident) re-raises | ||||
|     from a `finally:`. | ||||
| 
 | ||||
|     ''' | ||||
|     import tractor | ||||
|  | @ -117,11 +125,9 @@ def test_acm_embedded_nursery_propagates_enter_err( | |||
|         async with ( | ||||
|             trio.open_nursery() as tn, | ||||
|             tractor.trionics.maybe_raise_from_masking_exc( | ||||
|                 tn=tn, | ||||
|                 unmask_from=( | ||||
|                     trio.Cancelled | ||||
|                     if unmask_from_canc | ||||
|                     else None | ||||
|                     (trio.Cancelled,) if unmask_from_canc | ||||
|                     else () | ||||
|                 ), | ||||
|             ) | ||||
|         ): | ||||
|  | @ -136,8 +142,7 @@ def test_acm_embedded_nursery_propagates_enter_err( | |||
|         with tractor.devx.maybe_open_crash_handler( | ||||
|             pdb=debug_mode, | ||||
|         ) as bxerr: | ||||
|             if bxerr: | ||||
|                 assert not bxerr.value | ||||
|             assert not bxerr.value | ||||
| 
 | ||||
|             async with ( | ||||
|                 wraps_tn_that_always_cancels() as tn, | ||||
|  | @ -145,11 +150,12 @@ def test_acm_embedded_nursery_propagates_enter_err( | |||
|                 assert not tn.cancel_scope.cancel_called | ||||
|                 assert 0 | ||||
| 
 | ||||
|         assert ( | ||||
|             (err := bxerr.value) | ||||
|             and | ||||
|             type(err) is AssertionError | ||||
|         ) | ||||
|         if debug_mode: | ||||
|             assert ( | ||||
|                 (err := bxerr.value) | ||||
|                 and | ||||
|                 type(err) is AssertionError | ||||
|             ) | ||||
| 
 | ||||
|     with pytest.raises(ExceptionGroup) as excinfo: | ||||
|         trio.run(_main) | ||||
|  | @ -160,13 +166,13 @@ def test_acm_embedded_nursery_propagates_enter_err( | |||
|     assert len(assert_eg.exceptions) == 1 | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| def test_gatherctxs_with_memchan_breaks_multicancelled( | ||||
|     debug_mode: bool, | ||||
| ): | ||||
|     ''' | ||||
|     Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task | ||||
|     will break a strict-eg-tn's multi-cancelled absorption.. | ||||
|     Demo how a using an `async with sndchan` inside | ||||
|     a `.trionics.gather_contexts()` task will break a strict-eg-tn's | ||||
|     multi-cancelled absorption.. | ||||
| 
 | ||||
|     ''' | ||||
|     from tractor import ( | ||||
|  | @ -192,7 +198,6 @@ def test_gatherctxs_with_memchan_breaks_multicancelled( | |||
|                 f'Closed {task!r}\n' | ||||
|             ) | ||||
| 
 | ||||
| 
 | ||||
|     async def main(): | ||||
|         async with ( | ||||
|             # XXX should ensure ONLY the KBI | ||||
|  | @ -213,3 +218,85 @@ def test_gatherctxs_with_memchan_breaks_multicancelled( | |||
| 
 | ||||
|     with pytest.raises(KeyboardInterrupt): | ||||
|         trio.run(main) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'raise_unmasked', [ | ||||
|         True, | ||||
|         pytest.param( | ||||
|             False, | ||||
|             marks=pytest.mark.xfail( | ||||
|                 reason="see examples/trio/send_chan_aclose_masks.py" | ||||
|             ) | ||||
|         ), | ||||
|     ] | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'child_errors_mid_stream', | ||||
|     [True, False], | ||||
| ) | ||||
| def test_unmask_aclose_as_checkpoint_on_aexit( | ||||
|     raise_unmasked: bool, | ||||
|     child_errors_mid_stream: bool, | ||||
|     debug_mode: bool, | ||||
| ): | ||||
|     ''' | ||||
|     Verify that our unmasker util works over the common case where | ||||
|     a mem-chan's `.aclose()` is included in an `@acm` stack | ||||
|     and it being currently a checkpoint, can `trio.Cancelled`-mask an embedded | ||||
|     exception from user code resulting in a silent failure which | ||||
|     appears like graceful cancellation. | ||||
| 
 | ||||
|     This test suite is mostly implemented as an example script so it | ||||
|     could more easily be shared with `trio`-core peeps as `tractor`-less | ||||
|     minimum reproducing example. | ||||
| 
 | ||||
|     ''' | ||||
|     mod: ModuleType = pathlib.import_path( | ||||
|         examples_dir() | ||||
|         / 'trio' | ||||
|         / 'send_chan_aclose_masks_beg.py', | ||||
|         root=examples_dir(), | ||||
|         consider_namespace_packages=False, | ||||
|     ) | ||||
|     with pytest.raises(RuntimeError): | ||||
|         trio.run(partial( | ||||
|             mod.main, | ||||
|             raise_unmasked=raise_unmasked, | ||||
|             child_errors_mid_stream=child_errors_mid_stream, | ||||
|         )) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'ignore_special_cases', [ | ||||
|         True, | ||||
|         pytest.param( | ||||
|             False, | ||||
|             marks=pytest.mark.xfail( | ||||
|                 reason="see examples/trio/lockacquire_not_umasked.py" | ||||
|             ) | ||||
|         ), | ||||
|     ] | ||||
| ) | ||||
| def test_cancelled_lockacquire_in_ipctx_not_unmasked( | ||||
|     ignore_special_cases: bool, | ||||
|     loglevel: str, | ||||
|     debug_mode: bool, | ||||
| ): | ||||
|     mod: ModuleType = pathlib.import_path( | ||||
|         examples_dir() | ||||
|         / 'trio' | ||||
|         / 'lockacquire_not_unmasked.py', | ||||
|         root=examples_dir(), | ||||
|         consider_namespace_packages=False, | ||||
|     ) | ||||
|     async def _main(): | ||||
|         with trio.fail_after(2): | ||||
|             await mod.main( | ||||
|                 ignore_special_cases=ignore_special_cases, | ||||
|                 loglevel=loglevel, | ||||
|                 debug_mode=debug_mode, | ||||
|             ) | ||||
| 
 | ||||
|     trio.run(_main) | ||||
|  |  | |||
|  | @ -654,8 +654,7 @@ async def _invoke( | |||
|                 # scope ensures unasking of the `await coro` below | ||||
|                 # *should* never be interfered with!! | ||||
|                 maybe_raise_from_masking_exc( | ||||
|                     tn=tn, | ||||
|                     unmask_from=Cancelled, | ||||
|                     unmask_from=(Cancelled,), | ||||
|                 ) as _mbme,  # maybe boxed masked exc | ||||
|             ): | ||||
|                 ctx._scope_nursery = tn | ||||
|  |  | |||
|  | @ -31,7 +31,6 @@ from typing import ( | |||
|     AsyncIterator, | ||||
|     Callable, | ||||
|     Hashable, | ||||
|     Optional, | ||||
|     Sequence, | ||||
|     TypeVar, | ||||
|     TYPE_CHECKING, | ||||
|  | @ -204,7 +203,7 @@ class _Cache: | |||
|     a kept-alive-while-in-use async resource. | ||||
| 
 | ||||
|     ''' | ||||
|     service_tn: Optional[trio.Nursery] = None | ||||
|     service_tn: trio.Nursery|None = None | ||||
|     locks: dict[Hashable, trio.Lock] = {} | ||||
|     users: int = 0 | ||||
|     values: dict[Any,  Any] = {} | ||||
|  | @ -213,7 +212,7 @@ class _Cache: | |||
|         tuple[trio.Nursery, trio.Event] | ||||
|     ] = {} | ||||
|     # nurseries: dict[int, trio.Nursery] = {} | ||||
|     no_more_users: Optional[trio.Event] = None | ||||
|     no_more_users: trio.Event|None = None | ||||
| 
 | ||||
|     @classmethod | ||||
|     async def run_ctx( | ||||
|  | @ -223,16 +222,18 @@ class _Cache: | |||
|         task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         async with mng as value: | ||||
|             _, no_more_users = cls.resources[ctx_key] | ||||
|             cls.values[ctx_key] = value | ||||
|             task_status.started(value) | ||||
|             try: | ||||
|                 await no_more_users.wait() | ||||
|             finally: | ||||
|                 # discard nursery ref so it won't be re-used (an error)? | ||||
|                 value = cls.values.pop(ctx_key) | ||||
|                 cls.resources.pop(ctx_key) | ||||
|         try: | ||||
|             async with mng as value: | ||||
|                 _, no_more_users = cls.resources[ctx_key] | ||||
|                 try: | ||||
|                     cls.values[ctx_key] = value | ||||
|                     task_status.started(value) | ||||
|                     await no_more_users.wait() | ||||
|                 finally: | ||||
|                     value = cls.values.pop(ctx_key) | ||||
|         finally: | ||||
|             # discard nursery ref so it won't be re-used (an error)? | ||||
|             cls.resources.pop(ctx_key) | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
|  |  | |||
|  | @ -22,7 +22,14 @@ from __future__ import annotations | |||
| from contextlib import ( | ||||
|     asynccontextmanager as acm, | ||||
| ) | ||||
| from typing import TYPE_CHECKING | ||||
| import inspect | ||||
| from types import ( | ||||
|     TracebackType, | ||||
| ) | ||||
| from typing import ( | ||||
|     Type, | ||||
|     TYPE_CHECKING, | ||||
| ) | ||||
| 
 | ||||
| import trio | ||||
| from tractor.log import get_logger | ||||
|  | @ -60,12 +67,71 @@ def find_masked_excs( | |||
|     return None | ||||
| 
 | ||||
| 
 | ||||
| _mask_cases: dict[ | ||||
|     Type[Exception],  # masked exc type | ||||
|     dict[ | ||||
|         int,  # inner-frame index into `inspect.getinnerframes()` | ||||
|         # `FrameInfo.function/filename: str`s to match | ||||
|         dict[str, str], | ||||
|     ], | ||||
| ] = { | ||||
|     trio.WouldBlock: { | ||||
|         # `trio.Lock.acquire()` has a checkpoint inside the | ||||
|         # `WouldBlock`-no_wait path's handler.. | ||||
|         -5: {  # "5th frame up" from checkpoint | ||||
|             'filename': 'trio/_sync.py', | ||||
|             'function': 'acquire', | ||||
|             # 'lineno': 605,  # matters? | ||||
|         }, | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| def is_expected_masking_case( | ||||
|     cases: dict, | ||||
|     exc_ctx: Exception, | ||||
|     exc_match: BaseException, | ||||
| 
 | ||||
| ) -> bool|inspect.FrameInfo: | ||||
|     ''' | ||||
|     Determine whether the provided masked exception is from a known | ||||
|     bug/special/unintentional-`trio`-impl case which we do not wish | ||||
|     to unmask. | ||||
| 
 | ||||
|     Return any guilty `inspect.FrameInfo` ow `False`. | ||||
| 
 | ||||
|     ''' | ||||
|     exc_tb: TracebackType = exc_match.__traceback__ | ||||
|     if cases := _mask_cases.get(type(exc_ctx)): | ||||
|         inner: list[inspect.FrameInfo] = inspect.getinnerframes(exc_tb) | ||||
| 
 | ||||
|         # from tractor.devx.debug import mk_pdb | ||||
|         # mk_pdb().set_trace() | ||||
|         for iframe, matchon in cases.items(): | ||||
|             try: | ||||
|                 masker_frame: inspect.FrameInfo = inner[iframe] | ||||
|             except IndexError: | ||||
|                 continue | ||||
| 
 | ||||
|             for field, in_field in matchon.items(): | ||||
|                 val = getattr( | ||||
|                     masker_frame, | ||||
|                     field, | ||||
|                 ) | ||||
|                 if in_field not in val: | ||||
|                     break | ||||
|             else: | ||||
|                 return masker_frame | ||||
| 
 | ||||
|     return False | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| # XXX, relevant discussion @ `trio`-core, | ||||
| # https://github.com/python-trio/trio/issues/455 | ||||
| # | ||||
| @acm | ||||
| async def maybe_raise_from_masking_exc( | ||||
|     tn: trio.Nursery|None = None, | ||||
|     unmask_from: ( | ||||
|         BaseException| | ||||
|         tuple[BaseException] | ||||
|  | @ -74,18 +140,30 @@ async def maybe_raise_from_masking_exc( | |||
|     raise_unmasked: bool = True, | ||||
|     extra_note: str = ( | ||||
|         'This can occurr when,\n' | ||||
|         ' - a `trio.Nursery` scope embeds a `finally:`-block ' | ||||
|         'which executes a checkpoint!' | ||||
|         '\n' | ||||
|         ' - a `trio.Nursery/CancelScope` embeds a `finally/except:`-block ' | ||||
|         'which execs an un-shielded checkpoint!' | ||||
|         # | ||||
|         # ^TODO? other cases? | ||||
|     ), | ||||
| 
 | ||||
|     always_warn_on: tuple[BaseException] = ( | ||||
|     always_warn_on: tuple[Type[BaseException]] = ( | ||||
|         trio.Cancelled, | ||||
|     ), | ||||
| 
 | ||||
|     # don't ever unmask or warn on any masking pair, | ||||
|     # {<masked-excT-key> -> <masking-excT-value>} | ||||
|     never_warn_on: dict[ | ||||
|         Type[BaseException], | ||||
|         Type[BaseException], | ||||
|     ] = { | ||||
|         KeyboardInterrupt: trio.Cancelled, | ||||
|         trio.Cancelled: trio.Cancelled, | ||||
|     }, | ||||
|     # ^XXX, special case(s) where we warn-log bc likely | ||||
|     # there will be no operational diff since the exc | ||||
|     # is always expected to be consumed. | ||||
| 
 | ||||
| ) -> BoxedMaybeException: | ||||
|     ''' | ||||
|     Maybe un-mask and re-raise exception(s) suppressed by a known | ||||
|  | @ -104,81 +182,112 @@ async def maybe_raise_from_masking_exc( | |||
|         individual sub-excs but maintain the eg-parent's form right? | ||||
| 
 | ||||
|     ''' | ||||
|     if not isinstance(unmask_from, tuple): | ||||
|         raise ValueError( | ||||
|             f'Invalid unmask_from = {unmask_from!r}\n' | ||||
|             f'Must be a `tuple[Type[BaseException]]`.\n' | ||||
|         ) | ||||
| 
 | ||||
|     from tractor.devx.debug import ( | ||||
|         BoxedMaybeException, | ||||
|         pause, | ||||
|     ) | ||||
|     boxed_maybe_exc = BoxedMaybeException( | ||||
|         raise_on_exit=raise_unmasked, | ||||
|     ) | ||||
|     matching: list[BaseException]|None = None | ||||
|     maybe_eg: ExceptionGroup|None | ||||
| 
 | ||||
|     if tn: | ||||
|         try:  # handle egs | ||||
|             yield boxed_maybe_exc | ||||
|             return | ||||
|         except* unmask_from as _maybe_eg: | ||||
|             maybe_eg = _maybe_eg | ||||
|     try: | ||||
|         yield boxed_maybe_exc | ||||
|         return | ||||
|     except BaseException as _bexc: | ||||
|         bexc = _bexc | ||||
|         if isinstance(bexc, BaseExceptionGroup): | ||||
|             matches: ExceptionGroup | ||||
|             matches, _ = maybe_eg.split( | ||||
|                 unmask_from | ||||
|             ) | ||||
|             if not matches: | ||||
|                 raise | ||||
|             matches, _ = bexc.split(unmask_from) | ||||
|             if matches: | ||||
|                 matching = matches.exceptions | ||||
| 
 | ||||
|             matching: list[BaseException] = matches.exceptions | ||||
|     else: | ||||
|         try:  # handle non-egs | ||||
|             yield boxed_maybe_exc | ||||
|             return | ||||
|         except unmask_from as _maybe_exc: | ||||
|             maybe_exc = _maybe_exc | ||||
|             matching: list[BaseException] = [ | ||||
|                 maybe_exc | ||||
|             ] | ||||
| 
 | ||||
|         # XXX, only unmask-ed for debuggin! | ||||
|         # TODO, remove eventually.. | ||||
|         except BaseException as _berr: | ||||
|             berr = _berr | ||||
|             await pause(shield=True) | ||||
|             raise berr | ||||
|         elif ( | ||||
|             unmask_from | ||||
|             and | ||||
|             type(bexc) in unmask_from | ||||
|         ): | ||||
|             matching = [bexc] | ||||
| 
 | ||||
|     if matching is None: | ||||
|         raise | ||||
| 
 | ||||
|     masked: list[tuple[BaseException, BaseException]] = [] | ||||
|     for exc_match in matching: | ||||
| 
 | ||||
|         if exc_ctx := find_masked_excs( | ||||
|             maybe_masker=exc_match, | ||||
|             unmask_from={unmask_from}, | ||||
|             unmask_from=set(unmask_from), | ||||
|         ): | ||||
|             masked.append((exc_ctx, exc_match)) | ||||
|             masked.append(( | ||||
|                 exc_ctx, | ||||
|                 exc_match, | ||||
|             )) | ||||
|             boxed_maybe_exc.value = exc_match | ||||
|             note: str = ( | ||||
|                 f'\n' | ||||
|                 f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n' | ||||
|                 f'^^WARNING^^\n' | ||||
|                 f'the above {type(exc_ctx)!r} was masked by a {type(exc_match)!r}\n' | ||||
|             ) | ||||
|             if extra_note: | ||||
|                 note += ( | ||||
|                     f'\n' | ||||
|                     f'{extra_note}\n' | ||||
|                 ) | ||||
|             exc_ctx.add_note(note) | ||||
| 
 | ||||
|             if type(exc_match) in always_warn_on: | ||||
|             do_warn: bool = ( | ||||
|                 never_warn_on.get( | ||||
|                     type(exc_ctx)  # masking type | ||||
|                 ) | ||||
|                 is not | ||||
|                 type(exc_match)  # masked type | ||||
|             ) | ||||
| 
 | ||||
|             if do_warn: | ||||
|                 exc_ctx.add_note(note) | ||||
| 
 | ||||
|             if ( | ||||
|                 do_warn | ||||
|                 and | ||||
|                 type(exc_match) in always_warn_on | ||||
|             ): | ||||
|                 log.warning(note) | ||||
| 
 | ||||
|             # await tractor.pause(shield=True) | ||||
|             if raise_unmasked: | ||||
| 
 | ||||
|             if ( | ||||
|                 do_warn | ||||
|                 and | ||||
|                 raise_unmasked | ||||
|             ): | ||||
|                 if len(masked) < 2: | ||||
|                     # don't unmask already known "special" cases.. | ||||
|                     if ( | ||||
|                         _mask_cases | ||||
|                         and | ||||
|                         (cases := _mask_cases.get(type(exc_ctx))) | ||||
|                         and | ||||
|                         (masker_frame := is_expected_masking_case( | ||||
|                             cases, | ||||
|                             exc_ctx, | ||||
|                             exc_match, | ||||
|                         )) | ||||
|                     ): | ||||
|                         log.warning( | ||||
|                             f'Ignoring already-known, non-ideal-but-valid ' | ||||
|                             f'masker code @\n' | ||||
|                             f'{masker_frame}\n' | ||||
|                             f'\n' | ||||
|                             f'NOT raising {exc_ctx} from masker {exc_match!r}\n' | ||||
|                         ) | ||||
|                         raise exc_match | ||||
| 
 | ||||
|                     raise exc_ctx from exc_match | ||||
|                 else: | ||||
|                     # ?TODO, see above but, possibly unmasking sub-exc | ||||
|                     # entries if there are > 1 | ||||
|                     await pause(shield=True) | ||||
| 
 | ||||
|                 # ??TODO, see above but, possibly unmasking sub-exc | ||||
|                 # entries if there are > 1 | ||||
|                 # else: | ||||
|                 #     await pause(shield=True) | ||||
|     else: | ||||
|         raise | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue