Compare commits
	
		
			41 Commits 
		
	
	
		
			23809b8468
			...
			653f23a04c
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 653f23a04c | |
|  | 90db6f2299 | |
|  | b2d63bc102 | |
|  | d433606a6b | |
|  | cb9569eace | |
|  | 9798fcd3bb | |
|  | 03549c51ab | |
|  | 256016c515 | |
|  | 17b2a2cab4 | |
|  | 4fbafe7ca4 | |
|  | 7ffdf3483a | |
|  | 76ed0f2ef6 | |
|  | 2afb624c48 | |
|  | 885137ac19 | |
|  | 83ce2275b9 | |
|  | 9f757ffa63 | |
|  | 0c6d512ba4 | |
|  | fc130d06b8 | |
|  | 73423ef2b7 | |
|  | b1f2a6b394 | |
|  | 9489a2f84d | |
|  | 92eaed6fec | |
|  | 217d54b9d1 | |
|  | 34ca02ed11 | |
|  | 62a364a1d3 | |
|  | 07781e38cd | |
|  | 9c6b90ef04 | |
|  | 542d4c7840 | |
|  | 9aebe7d8f9 | |
|  | 04c3d5e239 | |
|  | 759174729c | |
|  | e9f3689191 | |
|  | 93aa39db07 | |
|  | 5ab642bdf0 | |
|  | ed18ecd064 | |
|  | cec0282953 | |
|  | 25c5847f2e | |
|  | ba793fadd9 | |
|  | d17864a432 | |
|  | 6c361a9564 | |
|  | 34ca7429c7 | 
|  | @ -0,0 +1,85 @@ | |||
| from contextlib import ( | ||||
|     asynccontextmanager as acm, | ||||
| ) | ||||
| from functools import partial | ||||
| 
 | ||||
| import tractor | ||||
| import trio | ||||
| 
 | ||||
| 
 | ||||
| log = tractor.log.get_logger( | ||||
|     name=__name__ | ||||
| ) | ||||
| 
 | ||||
| _lock: trio.Lock|None = None | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def acquire_singleton_lock( | ||||
| ) -> None: | ||||
|     global _lock | ||||
|     if _lock is None: | ||||
|         log.info('Allocating LOCK') | ||||
|         _lock = trio.Lock() | ||||
| 
 | ||||
|     log.info('TRYING TO LOCK ACQUIRE') | ||||
|     async with _lock: | ||||
|         log.info('ACQUIRED') | ||||
|         yield _lock | ||||
| 
 | ||||
|     log.info('RELEASED') | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| async def hold_lock_forever( | ||||
|     task_status=trio.TASK_STATUS_IGNORED | ||||
| ): | ||||
|     async with ( | ||||
|         tractor.trionics.maybe_raise_from_masking_exc(), | ||||
|         acquire_singleton_lock() as lock, | ||||
|     ): | ||||
|         task_status.started(lock) | ||||
|         await trio.sleep_forever() | ||||
| 
 | ||||
| 
 | ||||
| async def main( | ||||
|     ignore_special_cases: bool, | ||||
|     loglevel: str = 'info', | ||||
|     debug_mode: bool = True, | ||||
| ): | ||||
|     async with ( | ||||
|         trio.open_nursery() as tn, | ||||
| 
 | ||||
|         # tractor.trionics.maybe_raise_from_masking_exc() | ||||
|         # ^^^ XXX NOTE, interestingly putting the unmasker | ||||
|         # here does not exhibit the same behaviour ?? | ||||
|     ): | ||||
|         if not ignore_special_cases: | ||||
|             from tractor.trionics import _taskc | ||||
|             _taskc._mask_cases.clear() | ||||
| 
 | ||||
|         _lock = await tn.start( | ||||
|             hold_lock_forever, | ||||
|         ) | ||||
|         with trio.move_on_after(0.2): | ||||
|             await tn.start( | ||||
|                 hold_lock_forever, | ||||
|             ) | ||||
| 
 | ||||
|         tn.cancel_scope.cancel() | ||||
| 
 | ||||
| 
 | ||||
| # XXX, manual test as script | ||||
| if __name__ == '__main__': | ||||
|     tractor.log.get_console_log(level='info') | ||||
|     for case in [True, False]: | ||||
|         log.info( | ||||
|             f'\n' | ||||
|             f'------ RUNNING SCRIPT TRIAL ------\n' | ||||
|             f'ignore_special_cases: {case!r}\n' | ||||
|         ) | ||||
|         trio.run(partial( | ||||
|             main, | ||||
|             ignore_special_cases=case, | ||||
|             loglevel='info', | ||||
|         )) | ||||
|  | @ -0,0 +1,195 @@ | |||
| from contextlib import ( | ||||
|     contextmanager as cm, | ||||
|     # TODO, any diff in async case(s)?? | ||||
|     # asynccontextmanager as acm, | ||||
| ) | ||||
| from functools import partial | ||||
| 
 | ||||
| import tractor | ||||
| import trio | ||||
| 
 | ||||
| 
 | ||||
| log = tractor.log.get_logger( | ||||
|     name=__name__ | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| @cm | ||||
| def teardown_on_exc( | ||||
|     raise_from_handler: bool = False, | ||||
| ): | ||||
|     ''' | ||||
|     You could also have a teardown handler which catches any exc and | ||||
|     does some required teardown. In this case the problem is | ||||
|     compounded UNLESS you ensure the handler's scope is OUTSIDE the | ||||
|     `ux.aclose()`.. that is in the caller's enclosing scope. | ||||
| 
 | ||||
|     ''' | ||||
|     try: | ||||
|         yield | ||||
|     except BaseException as _berr: | ||||
|         berr = _berr | ||||
|         log.exception( | ||||
|             f'Handling termination teardown in child due to,\n' | ||||
|             f'{berr!r}\n' | ||||
|         ) | ||||
|         if raise_from_handler: | ||||
|             # XXX teardown ops XXX | ||||
|             # on termination these steps say need to be run to | ||||
|             # ensure wider system consistency (like the state of | ||||
|             # remote connections/services). | ||||
|             # | ||||
|             # HOWEVER, any bug in this teardown code is also | ||||
|             # masked by the `tx.aclose()`! | ||||
|             # this is also true if `_tn.cancel_scope` is | ||||
|             # `.cancel_called` by the parent in a graceful | ||||
|             # request case.. | ||||
| 
 | ||||
|             # simulate a bug in teardown handler. | ||||
|             raise RuntimeError( | ||||
|                 'woopsie teardown bug!' | ||||
|             ) | ||||
| 
 | ||||
|         raise  # no teardown bug. | ||||
| 
 | ||||
| 
 | ||||
| async def finite_stream_to_rent( | ||||
|     tx: trio.abc.SendChannel, | ||||
|     child_errors_mid_stream: bool, | ||||
|     raise_unmasked: bool, | ||||
| 
 | ||||
|     task_status: trio.TaskStatus[ | ||||
|         trio.CancelScope, | ||||
|     ] = trio.TASK_STATUS_IGNORED, | ||||
| ): | ||||
|     async with ( | ||||
|         # XXX without this unmasker the mid-streaming RTE is never | ||||
|         # reported since it is masked by the `tx.aclose()` | ||||
|         # call which in turn raises `Cancelled`! | ||||
|         # | ||||
|         # NOTE, this is WITHOUT doing any exception handling | ||||
|         # inside the child  task! | ||||
|         # | ||||
|         # TODO, uncomment next LoC to see the supprsessed beg[RTE]! | ||||
|         tractor.trionics.maybe_raise_from_masking_exc( | ||||
|             raise_unmasked=raise_unmasked, | ||||
|         ), | ||||
| 
 | ||||
|         tx as tx,  # .aclose() is the guilty masker chkpt! | ||||
| 
 | ||||
|         # XXX, this ONLY matters in the | ||||
|         # `child_errors_mid_stream=False` case oddly!? | ||||
|         # THAT IS, if no tn is opened in that case then the | ||||
|         # test will not fail; it raises the RTE correctly? | ||||
|         # | ||||
|         # -> so it seems this new scope somehow affects the form of | ||||
|         #    eventual in the parent EG? | ||||
|         tractor.trionics.maybe_open_nursery( | ||||
|             nursery=( | ||||
|                 None | ||||
|                 if not child_errors_mid_stream | ||||
|                 else True | ||||
|             ), | ||||
|         ) as _tn, | ||||
|     ): | ||||
|         # pass our scope back to parent for supervision\ | ||||
|         # control. | ||||
|         cs: trio.CancelScope|None = ( | ||||
|             None | ||||
|             if _tn is True | ||||
|             else _tn.cancel_scope | ||||
|         ) | ||||
|         task_status.started(cs) | ||||
| 
 | ||||
|         with teardown_on_exc( | ||||
|             raise_from_handler=not child_errors_mid_stream, | ||||
|         ): | ||||
|             for i in range(100): | ||||
|                 log.debug( | ||||
|                     f'Child tx {i!r}\n' | ||||
|                 ) | ||||
|                 if ( | ||||
|                     child_errors_mid_stream | ||||
|                     and | ||||
|                     i == 66 | ||||
|                 ): | ||||
|                     # oh wait but WOOPS there's a bug | ||||
|                     # in that teardown code!? | ||||
|                     raise RuntimeError( | ||||
|                         'woopsie, a mid-streaming bug!?' | ||||
|                     ) | ||||
| 
 | ||||
|                 await tx.send(i) | ||||
| 
 | ||||
| 
 | ||||
| async def main( | ||||
|     # TODO! toggle this for the 2 cases! | ||||
|     # 1. child errors mid-stream while parent is also requesting | ||||
|     #   (graceful) cancel of that child streamer. | ||||
|     # | ||||
|     # 2. child contains a teardown handler which contains a | ||||
|     #   bug and raises. | ||||
|     # | ||||
|     child_errors_mid_stream: bool, | ||||
| 
 | ||||
|     raise_unmasked: bool = False, | ||||
|     loglevel: str = 'info', | ||||
| ): | ||||
|     tractor.log.get_console_log(level=loglevel) | ||||
| 
 | ||||
|     # the `.aclose()` being checkpoints on these | ||||
|     # is the source of the problem.. | ||||
|     tx, rx = trio.open_memory_channel(1) | ||||
| 
 | ||||
|     async with ( | ||||
|         tractor.trionics.collapse_eg(), | ||||
|         trio.open_nursery() as tn, | ||||
|         rx as rx, | ||||
|     ): | ||||
|         _child_cs = await tn.start( | ||||
|             partial( | ||||
|                 finite_stream_to_rent, | ||||
|                 child_errors_mid_stream=child_errors_mid_stream, | ||||
|                 raise_unmasked=raise_unmasked, | ||||
|                 tx=tx, | ||||
|             ) | ||||
|         ) | ||||
|         async for msg in rx: | ||||
|             log.debug( | ||||
|                 f'Rent rx {msg!r}\n' | ||||
|             ) | ||||
| 
 | ||||
|             # simulate some external cancellation | ||||
|             # request **JUST BEFORE** the child errors. | ||||
|             if msg == 65: | ||||
|                 log.cancel( | ||||
|                     f'Cancelling parent on,\n' | ||||
|                     f'msg={msg}\n' | ||||
|                     f'\n' | ||||
|                     f'Simulates OOB cancel request!\n' | ||||
|                 ) | ||||
|                 tn.cancel_scope.cancel() | ||||
| 
 | ||||
| 
 | ||||
| # XXX, manual test as script | ||||
| if __name__ == '__main__': | ||||
|     tractor.log.get_console_log(level='info') | ||||
|     for case in [True, False]: | ||||
|         log.info( | ||||
|             f'\n' | ||||
|             f'------ RUNNING SCRIPT TRIAL ------\n' | ||||
|             f'child_errors_midstream: {case!r}\n' | ||||
|         ) | ||||
|         try: | ||||
|             trio.run(partial( | ||||
|                 main, | ||||
|                 child_errors_mid_stream=case, | ||||
|                 # raise_unmasked=True, | ||||
|                 loglevel='info', | ||||
|             )) | ||||
|         except Exception as _exc: | ||||
|             exc = _exc | ||||
|             log.exception( | ||||
|                 'Should have raised an RTE or Cancelled?\n' | ||||
|             ) | ||||
|             breakpoint() | ||||
|  | @ -95,6 +95,7 @@ def run_example_in_subproc( | |||
|             and 'integration' not in p[0] | ||||
|             and 'advanced_faults' not in p[0] | ||||
|             and 'multihost' not in p[0] | ||||
|             and 'trio' not in p[0] | ||||
|         ) | ||||
|     ], | ||||
|     ids=lambda t: t[1], | ||||
|  |  | |||
|  | @ -24,14 +24,10 @@ from tractor._testing import ( | |||
| ) | ||||
| 
 | ||||
| # XXX TODO cases: | ||||
| # - [ ] peer cancelled itself - so other peers should | ||||
| #   get errors reflecting that the peer was itself the .canceller? | ||||
| 
 | ||||
| # - [x] WE cancelled the peer and thus should not see any raised | ||||
| #   `ContextCancelled` as it should be reaped silently? | ||||
| #   => pretty sure `test_context_stream_semantics::test_caller_cancels()` | ||||
| #      already covers this case? | ||||
| 
 | ||||
| # - [x] INTER-PEER: some arbitrary remote peer cancels via | ||||
| #   Portal.cancel_actor(). | ||||
| #   => all other connected peers should get that cancel requesting peer's | ||||
|  | @ -44,16 +40,6 @@ from tractor._testing import ( | |||
| #   that also spawned a remote task task in that same peer-parent. | ||||
| 
 | ||||
| 
 | ||||
| # def test_self_cancel(): | ||||
| #     ''' | ||||
| #     2 cases: | ||||
| #     - calls `Actor.cancel()` locally in some task | ||||
| #     - calls LocalPortal.cancel_actor()` ? | ||||
| 
 | ||||
| #     ''' | ||||
| #     ... | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def open_stream_then_sleep_forever( | ||||
|     ctx: Context, | ||||
|  | @ -806,7 +792,7 @@ async def basic_echo_server( | |||
|     ctx: Context, | ||||
|     peer_name: str = 'wittle_bruv', | ||||
| 
 | ||||
|     err_after: int|None = None, | ||||
|     err_after_imsg: int|None = None, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|  | @ -835,8 +821,9 @@ async def basic_echo_server( | |||
|             await ipc.send(resp) | ||||
| 
 | ||||
|             if ( | ||||
|                 err_after | ||||
|                 and i > err_after | ||||
|                 err_after_imsg | ||||
|                 and | ||||
|                 i > err_after_imsg | ||||
|             ): | ||||
|                 raise RuntimeError( | ||||
|                     f'Simulated error in `{peer_name}`' | ||||
|  | @ -978,7 +965,8 @@ async def tell_little_bro( | |||
|     actor_name: str, | ||||
| 
 | ||||
|     caller: str = '', | ||||
|     err_after: int|None = None, | ||||
|     err_after: float|None = None, | ||||
|     rng_seed: int = 50, | ||||
| ): | ||||
|     # contact target actor, do a stream dialog. | ||||
|     async with ( | ||||
|  | @ -989,14 +977,18 @@ async def tell_little_bro( | |||
|             basic_echo_server, | ||||
| 
 | ||||
|             # XXX proxy any delayed err condition | ||||
|             err_after=err_after, | ||||
|             err_after_imsg=( | ||||
|                 err_after * rng_seed | ||||
|                 if err_after is not None | ||||
|                 else None | ||||
|             ), | ||||
|         ) as (sub_ctx, first), | ||||
| 
 | ||||
|         sub_ctx.open_stream() as echo_ipc, | ||||
|     ): | ||||
|         actor: Actor = current_actor() | ||||
|         uid: tuple = actor.uid | ||||
|         for i in range(100): | ||||
|         for i in range(rng_seed): | ||||
|             msg: tuple = ( | ||||
|                 uid, | ||||
|                 i, | ||||
|  | @ -1021,13 +1013,13 @@ async def tell_little_bro( | |||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'raise_sub_spawn_error_after', | ||||
|     [None, 50], | ||||
|     [None, 0.5], | ||||
| ) | ||||
| def test_peer_spawns_and_cancels_service_subactor( | ||||
|     debug_mode: bool, | ||||
|     raise_client_error: str, | ||||
|     reg_addr: tuple[str, int], | ||||
|     raise_sub_spawn_error_after: int|None, | ||||
|     raise_sub_spawn_error_after: float|None, | ||||
| ): | ||||
|     # NOTE: this tests for the modden `mod wks open piker` bug | ||||
|     # discovered as part of implementing workspace ctx | ||||
|  | @ -1041,6 +1033,7 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
|     #   and the server's spawned child should cancel and terminate! | ||||
|     peer_name: str = 'little_bro' | ||||
| 
 | ||||
| 
 | ||||
|     def check_inner_rte(rae: RemoteActorError): | ||||
|         ''' | ||||
|         Validate the little_bro's relayed inception! | ||||
|  | @ -1134,8 +1127,7 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
|                         ) | ||||
| 
 | ||||
|                     try: | ||||
|                         res = await client_ctx.result(hide_tb=False) | ||||
| 
 | ||||
|                         res = await client_ctx.wait_for_result(hide_tb=False) | ||||
|                         # in remote (relayed inception) error | ||||
|                         # case, we should error on the line above! | ||||
|                         if raise_sub_spawn_error_after: | ||||
|  | @ -1146,6 +1138,23 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
|                         assert isinstance(res, ContextCancelled) | ||||
|                         assert client_ctx.cancel_acked | ||||
|                         assert res.canceller == root.uid | ||||
|                         assert not raise_sub_spawn_error_after | ||||
| 
 | ||||
|                         # cancelling the spawner sub should | ||||
|                         # transitively cancel it's sub, the little | ||||
|                         # bruv. | ||||
|                         print('root cancelling server/client sub-actors') | ||||
|                         await spawn_ctx.cancel() | ||||
|                         async with tractor.find_actor( | ||||
|                             name=peer_name, | ||||
|                         ) as sub: | ||||
|                             assert not sub | ||||
| 
 | ||||
|                     # XXX, only for tracing | ||||
|                     # except BaseException as _berr: | ||||
|                     #     berr = _berr | ||||
|                     #     await tractor.pause(shield=True) | ||||
|                     #     raise berr | ||||
| 
 | ||||
|                     except RemoteActorError as rae: | ||||
|                         _err = rae | ||||
|  | @ -1174,19 +1183,8 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
|                         raise | ||||
|                         # await tractor.pause() | ||||
| 
 | ||||
|                     else: | ||||
|                         assert not raise_sub_spawn_error_after | ||||
| 
 | ||||
|                         # cancelling the spawner sub should | ||||
|                         # transitively cancel it's sub, the little | ||||
|                         # bruv. | ||||
|                         print('root cancelling server/client sub-actors') | ||||
|                         await spawn_ctx.cancel() | ||||
|                         async with tractor.find_actor( | ||||
|                             name=peer_name, | ||||
|                         ) as sub: | ||||
|                             assert not sub | ||||
| 
 | ||||
|                     # await tractor.pause() | ||||
|                     # await server.cancel_actor() | ||||
| 
 | ||||
|             except RemoteActorError as rae: | ||||
|  | @ -1199,7 +1197,7 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
| 
 | ||||
|             # since we called `.cancel_actor()`, `.cancel_ack` | ||||
|             # will not be set on the ctx bc `ctx.cancel()` was not | ||||
|             # called directly fot this confext. | ||||
|             # called directly for this confext. | ||||
|             except ContextCancelled as ctxc: | ||||
|                 _ctxc = ctxc | ||||
|                 print( | ||||
|  | @ -1239,12 +1237,19 @@ def test_peer_spawns_and_cancels_service_subactor( | |||
| 
 | ||||
|                 # assert spawn_ctx.cancelled_caught | ||||
| 
 | ||||
|     async def _main(): | ||||
|         with trio.fail_after( | ||||
|             3 if not debug_mode | ||||
|             else 999 | ||||
|         ): | ||||
|             await main() | ||||
| 
 | ||||
|     if raise_sub_spawn_error_after: | ||||
|         with pytest.raises(RemoteActorError) as excinfo: | ||||
|             trio.run(main) | ||||
|             trio.run(_main) | ||||
| 
 | ||||
|         rae: RemoteActorError = excinfo.value | ||||
|         check_inner_rte(rae) | ||||
| 
 | ||||
|     else: | ||||
|         trio.run(main) | ||||
|         trio.run(_main) | ||||
|  |  | |||
|  | @ -0,0 +1,239 @@ | |||
| ''' | ||||
| Define the details of inter-actor "out-of-band" (OoB) cancel | ||||
| semantics, that is how cancellation works when a cancel request comes | ||||
| from the different concurrency (primitive's) "layer" then where the | ||||
| eventual `trio.Task` actually raises a signal. | ||||
| 
 | ||||
| ''' | ||||
| from functools import partial | ||||
| # from contextlib import asynccontextmanager as acm | ||||
| # import itertools | ||||
| 
 | ||||
| import pytest | ||||
| import trio | ||||
| import tractor | ||||
| from tractor import (  # typing | ||||
|     ActorNursery, | ||||
|     Portal, | ||||
|     Context, | ||||
|     # ContextCancelled, | ||||
|     # RemoteActorError, | ||||
| ) | ||||
| # from tractor._testing import ( | ||||
| #     tractor_test, | ||||
| #     expect_ctxc, | ||||
| # ) | ||||
| 
 | ||||
| # XXX TODO cases: | ||||
| # - [ ] peer cancelled itself - so other peers should | ||||
| #   get errors reflecting that the peer was itself the .canceller? | ||||
| 
 | ||||
| # def test_self_cancel(): | ||||
| #     ''' | ||||
| #     2 cases: | ||||
| #     - calls `Actor.cancel()` locally in some task | ||||
| #     - calls LocalPortal.cancel_actor()` ? | ||||
| # | ||||
| # things to ensure! | ||||
| # -[ ] the ctxc raised in a child should ideally show the tb of the | ||||
| #     underlying `Cancelled` checkpoint, i.e. | ||||
| #     `raise scope_error from ctxc`? | ||||
| # | ||||
| # -[ ] a self-cancelled context, if not allowed to block on | ||||
| #     `ctx.result()` at some point will hang since the `ctx._scope` | ||||
| #     is never `.cancel_called`; cases for this include, | ||||
| #     - an `open_ctx()` which never starteds before being OoB actor | ||||
| #       cancelled. | ||||
| #       |_ parent task will be blocked in `.open_context()` for the | ||||
| #         `Started` msg, and when the OoB ctxc arrives `ctx._scope` | ||||
| #         will never have been signalled.. | ||||
| 
 | ||||
| #     ''' | ||||
| #     ... | ||||
| 
 | ||||
| # TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py` | ||||
| # but with the `Lock.acquire()` from a `@context` to ensure the | ||||
| # implicit ignore-case-non-unmasking. | ||||
| # | ||||
| # @tractor.context | ||||
| # async def acquire_actor_global_lock( | ||||
| #     ctx: tractor.Context, | ||||
| #     ignore_special_cases: bool, | ||||
| # ): | ||||
| 
 | ||||
| #     async with maybe_unmask_excs( | ||||
| #         ignore_special_cases=ignore_special_cases, | ||||
| #     ): | ||||
| #         await ctx.started('locked') | ||||
| 
 | ||||
| #     # block til cancelled | ||||
| #     await trio.sleep_forever() | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def sleep_forever( | ||||
|     ctx: tractor.Context, | ||||
|     # ignore_special_cases: bool, | ||||
|     do_started: bool, | ||||
| ): | ||||
| 
 | ||||
|     # async with maybe_unmask_excs( | ||||
|     #     ignore_special_cases=ignore_special_cases, | ||||
|     # ): | ||||
|     #     await ctx.started('locked') | ||||
|     if do_started: | ||||
|         await ctx.started() | ||||
| 
 | ||||
|     # block til cancelled | ||||
|     print('sleepin on child-side..') | ||||
|     await trio.sleep_forever() | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'cancel_ctx', | ||||
|     [True, False], | ||||
| ) | ||||
| def test_cancel_ctx_with_parent_side_entered_in_bg_task( | ||||
|     debug_mode: bool, | ||||
|     loglevel: str, | ||||
|     cancel_ctx: bool, | ||||
| ): | ||||
|     ''' | ||||
|     The most "basic" out-of-band-task self-cancellation case where | ||||
|     `Portal.open_context()` is entered in a bg task and the | ||||
|     parent-task (of the containing nursery) calls `Context.cancel()` | ||||
|     without the child knowing; the `Context._scope` should be | ||||
|     `.cancel_called` when the IPC ctx's child-side relays | ||||
|     a `ContextCancelled` with a `.canceller` set to the parent | ||||
|     actor('s task). | ||||
| 
 | ||||
|     ''' | ||||
|     async def main(): | ||||
|         with trio.fail_after( | ||||
|             2 if not debug_mode else 999, | ||||
|         ): | ||||
|             an: ActorNursery | ||||
|             async with ( | ||||
|                 tractor.open_nursery( | ||||
|                     debug_mode=debug_mode, | ||||
|                     loglevel='devx', | ||||
|                     enable_stack_on_sig=True, | ||||
|                 ) as an, | ||||
|                 trio.open_nursery() as tn, | ||||
|             ): | ||||
|                 ptl: Portal = await an.start_actor( | ||||
|                     'sub', | ||||
|                     enable_modules=[__name__], | ||||
|                 ) | ||||
| 
 | ||||
|                 async def _open_ctx_async( | ||||
|                     do_started: bool = True, | ||||
|                     task_status=trio.TASK_STATUS_IGNORED, | ||||
|                 ): | ||||
|                     # do we expect to never enter the | ||||
|                     # `.open_context()` below. | ||||
|                     if not do_started: | ||||
|                         task_status.started() | ||||
| 
 | ||||
|                     async with ptl.open_context( | ||||
|                         sleep_forever, | ||||
|                         do_started=do_started, | ||||
|                     ) as (ctx, first): | ||||
|                         task_status.started(ctx) | ||||
|                         await trio.sleep_forever() | ||||
| 
 | ||||
|                 # XXX, this is the key OoB part! | ||||
|                 # | ||||
|                 # - start the `.open_context()` in a bg task which | ||||
|                 #   blocks inside the embedded scope-body, | ||||
|                 # | ||||
|                 # -  when we call `Context.cancel()` it **is | ||||
|                 #   not** from the same task which eventually runs | ||||
|                 #   `.__aexit__()`, | ||||
|                 # | ||||
|                 # - since the bg "opener" task will be in | ||||
|                 #   a `trio.sleep_forever()`, it must be interrupted | ||||
|                 #   by the `ContextCancelled` delivered from the | ||||
|                 #   child-side; `Context._scope: CancelScope` MUST | ||||
|                 #   be `.cancel_called`! | ||||
|                 # | ||||
|                 print('ASYNC opening IPC context in subtask..') | ||||
|                 maybe_ctx: Context|None = await tn.start(partial( | ||||
|                     _open_ctx_async, | ||||
|                 )) | ||||
| 
 | ||||
|                 if ( | ||||
|                     maybe_ctx | ||||
|                     and | ||||
|                     cancel_ctx | ||||
|                 ): | ||||
|                     print('cancelling first IPC ctx!') | ||||
|                     await maybe_ctx.cancel() | ||||
| 
 | ||||
|                 # XXX, note that despite `maybe_context.cancel()` | ||||
|                 # being called above, it's the parent (bg) task | ||||
|                 # which was originally never interrupted in | ||||
|                 # the `ctx._scope` body due to missing case logic in | ||||
|                 # `ctx._maybe_cancel_and_set_remote_error()`. | ||||
|                 # | ||||
|                 # It didn't matter that the subactor process was | ||||
|                 # already terminated and reaped, nothing was | ||||
|                 # cancelling the ctx-parent task's scope! | ||||
|                 # | ||||
|                 print('cancelling subactor!') | ||||
|                 await ptl.cancel_actor() | ||||
| 
 | ||||
|                 if maybe_ctx: | ||||
|                     try: | ||||
|                         await maybe_ctx.wait_for_result() | ||||
|                     except tractor.ContextCancelled as ctxc: | ||||
|                         assert not cancel_ctx | ||||
|                         assert ( | ||||
|                             ctxc.canceller | ||||
|                             == | ||||
|                             tractor.current_actor().aid.uid | ||||
|                         ) | ||||
|                         # don't re-raise since it'll trigger | ||||
|                         # an EG from the above tn. | ||||
| 
 | ||||
|     if cancel_ctx: | ||||
|         # graceful self-cancel | ||||
|         trio.run(main) | ||||
| 
 | ||||
|     else: | ||||
|         # ctx parent task should see OoB ctxc due to | ||||
|         # `ptl.cancel_actor()`. | ||||
|         with pytest.raises(tractor.ContextCancelled) as excinfo: | ||||
|             trio.run(main) | ||||
| 
 | ||||
|         assert 'root' in excinfo.value.canceller[0] | ||||
| 
 | ||||
| 
 | ||||
| # def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it( | ||||
| #     debug_mode: bool, | ||||
| #     loglevel: str, | ||||
| # ): | ||||
| #     ''' | ||||
| #     Demos OoB cancellation from the perspective of a ctx opened with | ||||
| #     a child subactor where the parent cancels the child at the "actor | ||||
| #     layer" using `Portal.cancel_actor()` and thus the | ||||
| #     `ContextCancelled.canceller` received by the ctx's parent-side | ||||
| #     task will appear to be a "self cancellation" even though that | ||||
| #     specific task itself was not cancelled and thus | ||||
| #     `Context.cancel_called ==False`. | ||||
| #     ''' | ||||
|                 # TODO, do we have an existing implied ctx | ||||
|                 # cancel test like this? | ||||
|                 # with trio.move_on_after(0.5):# as cs: | ||||
|                 #     await _open_ctx_async( | ||||
|                 #         do_started=False, | ||||
|                 #     ) | ||||
| 
 | ||||
| 
 | ||||
|                 # in-line ctx scope should definitely raise | ||||
|                 # a ctxc with `.canceller = 'root'` | ||||
|                 # async with ptl.open_context( | ||||
|                 #     sleep_forever, | ||||
|                 #     do_started=True, | ||||
|                 # ) as pair: | ||||
| 
 | ||||
|  | @ -6,11 +6,18 @@ want to see changed. | |||
| from contextlib import ( | ||||
|     asynccontextmanager as acm, | ||||
| ) | ||||
| from types import ModuleType | ||||
| 
 | ||||
| from functools import partial | ||||
| 
 | ||||
| import pytest | ||||
| from _pytest import pathlib | ||||
| from tractor.trionics import collapse_eg | ||||
| import trio | ||||
| from trio import TaskStatus | ||||
| from tractor._testing import ( | ||||
|     examples_dir, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|  | @ -106,8 +113,9 @@ def test_acm_embedded_nursery_propagates_enter_err( | |||
|     debug_mode: bool, | ||||
| ): | ||||
|     ''' | ||||
|     Demo how a masking `trio.Cancelled` could be handled by unmasking from the | ||||
|     `.__context__` field when a user (by accident) re-raises from a `finally:`. | ||||
|     Demo how a masking `trio.Cancelled` could be handled by unmasking | ||||
|     from the `.__context__` field when a user (by accident) re-raises | ||||
|     from a `finally:`. | ||||
| 
 | ||||
|     ''' | ||||
|     import tractor | ||||
|  | @ -117,11 +125,9 @@ def test_acm_embedded_nursery_propagates_enter_err( | |||
|         async with ( | ||||
|             trio.open_nursery() as tn, | ||||
|             tractor.trionics.maybe_raise_from_masking_exc( | ||||
|                 tn=tn, | ||||
|                 unmask_from=( | ||||
|                     trio.Cancelled | ||||
|                     if unmask_from_canc | ||||
|                     else None | ||||
|                     (trio.Cancelled,) if unmask_from_canc | ||||
|                     else () | ||||
|                 ), | ||||
|             ) | ||||
|         ): | ||||
|  | @ -136,8 +142,7 @@ def test_acm_embedded_nursery_propagates_enter_err( | |||
|         with tractor.devx.maybe_open_crash_handler( | ||||
|             pdb=debug_mode, | ||||
|         ) as bxerr: | ||||
|             if bxerr: | ||||
|                 assert not bxerr.value | ||||
|             assert not bxerr.value | ||||
| 
 | ||||
|             async with ( | ||||
|                 wraps_tn_that_always_cancels() as tn, | ||||
|  | @ -145,11 +150,12 @@ def test_acm_embedded_nursery_propagates_enter_err( | |||
|                 assert not tn.cancel_scope.cancel_called | ||||
|                 assert 0 | ||||
| 
 | ||||
|         assert ( | ||||
|             (err := bxerr.value) | ||||
|             and | ||||
|             type(err) is AssertionError | ||||
|         ) | ||||
|         if debug_mode: | ||||
|             assert ( | ||||
|                 (err := bxerr.value) | ||||
|                 and | ||||
|                 type(err) is AssertionError | ||||
|             ) | ||||
| 
 | ||||
|     with pytest.raises(ExceptionGroup) as excinfo: | ||||
|         trio.run(_main) | ||||
|  | @ -160,13 +166,13 @@ def test_acm_embedded_nursery_propagates_enter_err( | |||
|     assert len(assert_eg.exceptions) == 1 | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| def test_gatherctxs_with_memchan_breaks_multicancelled( | ||||
|     debug_mode: bool, | ||||
| ): | ||||
|     ''' | ||||
|     Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task | ||||
|     will break a strict-eg-tn's multi-cancelled absorption.. | ||||
|     Demo how a using an `async with sndchan` inside | ||||
|     a `.trionics.gather_contexts()` task will break a strict-eg-tn's | ||||
|     multi-cancelled absorption.. | ||||
| 
 | ||||
|     ''' | ||||
|     from tractor import ( | ||||
|  | @ -192,7 +198,6 @@ def test_gatherctxs_with_memchan_breaks_multicancelled( | |||
|                 f'Closed {task!r}\n' | ||||
|             ) | ||||
| 
 | ||||
| 
 | ||||
|     async def main(): | ||||
|         async with ( | ||||
|             # XXX should ensure ONLY the KBI | ||||
|  | @ -213,3 +218,85 @@ def test_gatherctxs_with_memchan_breaks_multicancelled( | |||
| 
 | ||||
|     with pytest.raises(KeyboardInterrupt): | ||||
|         trio.run(main) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'raise_unmasked', [ | ||||
|         True, | ||||
|         pytest.param( | ||||
|             False, | ||||
|             marks=pytest.mark.xfail( | ||||
|                 reason="see examples/trio/send_chan_aclose_masks.py" | ||||
|             ) | ||||
|         ), | ||||
|     ] | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'child_errors_mid_stream', | ||||
|     [True, False], | ||||
| ) | ||||
| def test_unmask_aclose_as_checkpoint_on_aexit( | ||||
|     raise_unmasked: bool, | ||||
|     child_errors_mid_stream: bool, | ||||
|     debug_mode: bool, | ||||
| ): | ||||
|     ''' | ||||
|     Verify that our unmasker util works over the common case where | ||||
|     a mem-chan's `.aclose()` is included in an `@acm` stack | ||||
|     and it being currently a checkpoint, can `trio.Cancelled`-mask an embedded | ||||
|     exception from user code resulting in a silent failure which | ||||
|     appears like graceful cancellation. | ||||
| 
 | ||||
|     This test suite is mostly implemented as an example script so it | ||||
|     could more easily be shared with `trio`-core peeps as `tractor`-less | ||||
|     minimum reproducing example. | ||||
| 
 | ||||
|     ''' | ||||
|     mod: ModuleType = pathlib.import_path( | ||||
|         examples_dir() | ||||
|         / 'trio' | ||||
|         / 'send_chan_aclose_masks_beg.py', | ||||
|         root=examples_dir(), | ||||
|         consider_namespace_packages=False, | ||||
|     ) | ||||
|     with pytest.raises(RuntimeError): | ||||
|         trio.run(partial( | ||||
|             mod.main, | ||||
|             raise_unmasked=raise_unmasked, | ||||
|             child_errors_mid_stream=child_errors_mid_stream, | ||||
|         )) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'ignore_special_cases', [ | ||||
|         True, | ||||
|         pytest.param( | ||||
|             False, | ||||
|             marks=pytest.mark.xfail( | ||||
|                 reason="see examples/trio/lockacquire_not_umasked.py" | ||||
|             ) | ||||
|         ), | ||||
|     ] | ||||
| ) | ||||
| def test_cancelled_lockacquire_in_ipctx_not_unmasked( | ||||
|     ignore_special_cases: bool, | ||||
|     loglevel: str, | ||||
|     debug_mode: bool, | ||||
| ): | ||||
|     mod: ModuleType = pathlib.import_path( | ||||
|         examples_dir() | ||||
|         / 'trio' | ||||
|         / 'lockacquire_not_unmasked.py', | ||||
|         root=examples_dir(), | ||||
|         consider_namespace_packages=False, | ||||
|     ) | ||||
|     async def _main(): | ||||
|         with trio.fail_after(2): | ||||
|             await mod.main( | ||||
|                 ignore_special_cases=ignore_special_cases, | ||||
|                 loglevel=loglevel, | ||||
|                 debug_mode=debug_mode, | ||||
|             ) | ||||
| 
 | ||||
|     trio.run(_main) | ||||
|  |  | |||
|  | @ -442,25 +442,25 @@ class Context: | |||
|         ''' | ||||
|         Records whether cancellation has been requested for this context | ||||
|         by a call to  `.cancel()` either due to, | ||||
|         - either an explicit call by some local task, | ||||
|         - an explicit call by some local task, | ||||
|         - or an implicit call due to an error caught inside | ||||
|           the ``Portal.open_context()`` block. | ||||
|           the `Portal.open_context()` block. | ||||
| 
 | ||||
|         ''' | ||||
|         return self._cancel_called | ||||
| 
 | ||||
|     @cancel_called.setter | ||||
|     def cancel_called(self, val: bool) -> None: | ||||
|         ''' | ||||
|         Set the self-cancelled request `bool` value. | ||||
|     # XXX, to debug who frickin sets it.. | ||||
|     # @cancel_called.setter | ||||
|     # def cancel_called(self, val: bool) -> None: | ||||
|     #     ''' | ||||
|     #     Set the self-cancelled request `bool` value. | ||||
| 
 | ||||
|         ''' | ||||
|         # to debug who frickin sets it.. | ||||
|         # if val: | ||||
|         #     from .devx import pause_from_sync | ||||
|         #     pause_from_sync() | ||||
|     #     ''' | ||||
|     #     if val: | ||||
|     #         from .devx import pause_from_sync | ||||
|     #         pause_from_sync() | ||||
| 
 | ||||
|         self._cancel_called = val | ||||
|     #     self._cancel_called = val | ||||
| 
 | ||||
|     @property | ||||
|     def canceller(self) -> tuple[str, str]|None: | ||||
|  | @ -635,6 +635,71 @@ class Context: | |||
|         ''' | ||||
|         await self.chan.send(Stop(cid=self.cid)) | ||||
| 
 | ||||
|     @property | ||||
|     def parent_task(self) -> trio.Task: | ||||
|         ''' | ||||
|         This IPC context's "owning task" which is a `trio.Task` | ||||
|         on one of the "sides" of the IPC. | ||||
| 
 | ||||
|         Note that the "parent_" prefix here refers to the local | ||||
|         `trio` task tree using the same interface as | ||||
|         `trio.Nursery.parent_task` whereas for IPC contexts, | ||||
|         a different cross-actor task hierarchy exists: | ||||
| 
 | ||||
|         - a "parent"-side which originally entered | ||||
|           `Portal.open_context()`, | ||||
| 
 | ||||
|         - the "child"-side which was spawned and scheduled to invoke | ||||
|           a function decorated with `@tractor.context`. | ||||
| 
 | ||||
|         This task is thus a handle to mem-domain-distinct/per-process | ||||
|         `Nursery.parent_task` depending on in which of the above | ||||
|         "sides" this context exists. | ||||
| 
 | ||||
|         ''' | ||||
|         return self._task | ||||
| 
 | ||||
|     def _is_blocked_on_rx_chan(self) -> bool: | ||||
|         ''' | ||||
|         Predicate to indicate whether the owner `._task: trio.Task` is | ||||
|         currently blocked (by `.receive()`-ing) on its underlying RPC | ||||
|         feeder `._rx_chan`. | ||||
| 
 | ||||
|         This knowledge is highly useful when handling so called | ||||
|         "out-of-band" (OoB) cancellation conditions where a peer | ||||
|         actor's task transmitted some remote error/cancel-msg and we | ||||
|         must know whether to signal-via-cancel currently executing | ||||
|         "user-code" (user defined code embedded in `ctx._scope`) or | ||||
|         simply to forward the IPC-msg-as-error **without calling** | ||||
|         `._scope.cancel()`. | ||||
| 
 | ||||
|         In the latter case it is presumed that if the owner task is | ||||
|         blocking for the next IPC msg, it will eventually receive, | ||||
|         process and raise the equivalent local error **without** | ||||
|         requiring `._scope.cancel()` to be explicitly called by the | ||||
|         *delivering OoB RPC-task* (via `_deliver_msg()`). | ||||
| 
 | ||||
|         ''' | ||||
|         # NOTE, see the mem-chan meth-impls for *why* this | ||||
|         # logic works, | ||||
|         # `trio._channel.MemoryReceiveChannel.receive[_nowait]()` | ||||
|         # | ||||
|         # XXX realize that this is NOT an | ||||
|         # official/will-be-loudly-deprecated API: | ||||
|         # - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data | ||||
|         #  |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled | ||||
|         # | ||||
|         # orig repo intro in the mem-chan change over patch: | ||||
|         # - https://github.com/python-trio/trio/pull/586#issuecomment-414039117 | ||||
|         #  |_https://github.com/python-trio/trio/pull/616 | ||||
|         #  |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0 | ||||
|         # | ||||
|         return ( | ||||
|             self._task.custom_sleep_data | ||||
|             is | ||||
|             self._rx_chan | ||||
|         ) | ||||
| 
 | ||||
|     def _maybe_cancel_and_set_remote_error( | ||||
|         self, | ||||
|         error: BaseException, | ||||
|  | @ -787,13 +852,27 @@ class Context: | |||
|         if self._canceller is None: | ||||
|             log.error('Ctx has no canceller set!?') | ||||
| 
 | ||||
|         cs: trio.CancelScope = self._scope | ||||
| 
 | ||||
|         # ?TODO? see comment @ .start_remote_task()` | ||||
|         # | ||||
|         # if not cs: | ||||
|         #     from .devx import mk_pdb | ||||
|         #     mk_pdb().set_trace() | ||||
|         #     raise RuntimeError( | ||||
|         #         f'IPC ctx was not be opened prior to remote error delivery !?\n' | ||||
|         #         f'{self}\n' | ||||
|         #         f'\n' | ||||
|         #         f'`Portal.open_context()` must be entered (somewhere) beforehand!\n' | ||||
|         #     ) | ||||
| 
 | ||||
|         # Cancel the local `._scope`, catch that | ||||
|         # `._scope.cancelled_caught` and re-raise any remote error | ||||
|         # once exiting (or manually calling `.wait_for_result()`) the | ||||
|         # `.open_context()`  block. | ||||
|         cs: trio.CancelScope = self._scope | ||||
|         if ( | ||||
|             cs | ||||
|             and not cs.cancel_called | ||||
| 
 | ||||
|             # XXX this is an expected cancel request response | ||||
|             # message and we **don't need to raise it** in the | ||||
|  | @ -802,8 +881,7 @@ class Context: | |||
|             # if `._cancel_called` then `.cancel_acked and .cancel_called` | ||||
|             # always should be set. | ||||
|             and not self._is_self_cancelled() | ||||
|             and not cs.cancel_called | ||||
|             and not cs.cancelled_caught | ||||
|             # and not cs.cancelled_caught | ||||
|         ): | ||||
|             if ( | ||||
|                 msgerr | ||||
|  | @ -814,7 +892,7 @@ class Context: | |||
|                 not self._cancel_on_msgerr | ||||
|             ): | ||||
|                 message: str = ( | ||||
|                     'NOT Cancelling `Context._scope` since,\n' | ||||
|                     f'NOT Cancelling `Context._scope` since,\n' | ||||
|                     f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n' | ||||
|                     f'AND we got a msg-type-error!\n' | ||||
|                     f'{error}\n' | ||||
|  | @ -824,13 +902,43 @@ class Context: | |||
|                 # `trio.Cancelled` subtype here ;) | ||||
|                 # https://github.com/goodboy/tractor/issues/368 | ||||
|                 message: str = 'Cancelling `Context._scope` !\n\n' | ||||
|                 # from .devx import pause_from_sync | ||||
|                 # pause_from_sync() | ||||
|                 self._scope.cancel() | ||||
|         else: | ||||
|             message: str = 'NOT cancelling `Context._scope` !\n\n' | ||||
|                 cs.cancel() | ||||
| 
 | ||||
|         # TODO, explicit condition for OoB (self-)cancellation? | ||||
|         # - we called `Portal.cancel_actor()` from this actor | ||||
|         #   and the peer ctx task delivered ctxc due to it. | ||||
|         # - currently `self._is_self_cancelled()` will be true | ||||
|         #   since the ctxc.canceller check will match us even though it | ||||
|         #   wasn't from this ctx specifically! | ||||
|         elif ( | ||||
|             cs | ||||
|             and self._is_self_cancelled() | ||||
|             and not cs.cancel_called | ||||
|         ): | ||||
|             message: str = ( | ||||
|                 'Cancelling `ctx._scope` due to OoB self-cancel ?!\n' | ||||
|                 '\n' | ||||
|             ) | ||||
|             # from .devx import mk_pdb | ||||
|             # mk_pdb().set_trace() | ||||
|             # TODO XXX, required to fix timeout failure in | ||||
|             # `test_cancelled_lockacquire_in_ipctx_not_unmaskeed` | ||||
|             # | ||||
| 
 | ||||
|             # XXX NOTE XXX, this is SUPER SUBTLE! | ||||
|             # we only want to cancel our embedded `._scope` | ||||
|             # if the ctx's current/using task is NOT blocked | ||||
|             # on `._rx_chan.receive()` and on some other | ||||
|             # `trio`-checkpoint since in the former case | ||||
|             # any `._remote_error` will be relayed through | ||||
|             # the rx-chan and appropriately raised by the owning | ||||
|             # `._task` directly. IF the owner task is however | ||||
|             # blocking elsewhere we need to interrupt it **now**. | ||||
|             if not self._is_blocked_on_rx_chan(): | ||||
|                 cs.cancel() | ||||
|         else: | ||||
|             # rx_stats = self._rx_chan.statistics() | ||||
|             message: str = 'NOT cancelling `Context._scope` !\n\n' | ||||
| 
 | ||||
|         fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n' | ||||
|         if ( | ||||
|  | @ -854,6 +962,7 @@ class Context: | |||
|                 + | ||||
|                 cs_fmt | ||||
|             ) | ||||
| 
 | ||||
|         log.cancel( | ||||
|             message | ||||
|             + | ||||
|  | @ -946,8 +1055,9 @@ class Context: | |||
| 
 | ||||
|         ''' | ||||
|         side: str = self.side | ||||
|         # XXX for debug via the `@.setter` | ||||
|         self.cancel_called = True | ||||
|         self._cancel_called = True | ||||
|         # ^ XXX for debug via the `@.setter` | ||||
|         # self.cancel_called = True | ||||
| 
 | ||||
|         header: str = ( | ||||
|             f'Cancelling ctx from {side!r}-side\n' | ||||
|  | @ -2011,6 +2121,9 @@ async def open_context_from_portal( | |||
|             f'|_{portal.actor}\n' | ||||
|         ) | ||||
| 
 | ||||
|     # ?TODO? could we move this to inside the `tn` block? | ||||
|     # -> would allow doing `ctx.parent_task = tn.parent_task` ? | ||||
|     # -> would allow a `if not ._scope: => raise RTE` ? | ||||
|     ctx: Context = await portal.actor.start_remote_task( | ||||
|         portal.channel, | ||||
|         nsf=nsf, | ||||
|  | @ -2037,6 +2150,7 @@ async def open_context_from_portal( | |||
|     scope_err: BaseException|None = None | ||||
|     ctxc_from_child: ContextCancelled|None = None | ||||
|     try: | ||||
|         # from .devx import pause | ||||
|         async with ( | ||||
|             collapse_eg(), | ||||
|             trio.open_nursery() as tn, | ||||
|  | @ -2059,6 +2173,10 @@ async def open_context_from_portal( | |||
|             # the dialog, the `Error` msg should be raised from the `msg` | ||||
|             # handling block below. | ||||
|             try: | ||||
|                 log.runtime( | ||||
|                     f'IPC ctx parent waiting on Started msg..\n' | ||||
|                     f'ctx.cid: {ctx.cid!r}\n' | ||||
|                 ) | ||||
|                 started_msg, first = await ctx._pld_rx.recv_msg( | ||||
|                     ipc=ctx, | ||||
|                     expect_msg=Started, | ||||
|  | @ -2067,16 +2185,16 @@ async def open_context_from_portal( | |||
|                 ) | ||||
|             except trio.Cancelled as taskc: | ||||
|                 ctx_cs: trio.CancelScope = ctx._scope | ||||
|                 log.cancel( | ||||
|                     f'IPC ctx was cancelled during "child" task sync due to\n\n' | ||||
|                     f'.cid: {ctx.cid!r}\n' | ||||
|                     f'.maybe_error: {ctx.maybe_error!r}\n' | ||||
|                 ) | ||||
|                 # await pause(shield=True) | ||||
| 
 | ||||
|                 if not ctx_cs.cancel_called: | ||||
|                     raise | ||||
| 
 | ||||
|                 # from .devx import pause | ||||
|                 # await pause(shield=True) | ||||
| 
 | ||||
|                 log.cancel( | ||||
|                     'IPC ctx was cancelled during "child" task sync due to\n\n' | ||||
|                     f'{ctx.maybe_error}\n' | ||||
|                 ) | ||||
|                 # OW if the ctx's scope was cancelled manually, | ||||
|                 # likely the `Context` was cancelled via a call to | ||||
|                 # `._maybe_cancel_and_set_remote_error()` so ensure | ||||
|  | @ -2272,13 +2390,16 @@ async def open_context_from_portal( | |||
|         match scope_err: | ||||
|             case trio.Cancelled(): | ||||
|                 logmeth = log.cancel | ||||
|                 cause: str = 'cancelled' | ||||
| 
 | ||||
|             # XXX explicitly report on any non-graceful-taskc cases | ||||
|             case _: | ||||
|                 cause: str = 'errored' | ||||
|                 logmeth = log.exception | ||||
| 
 | ||||
|         logmeth( | ||||
|             f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n' | ||||
|             f'ctx {ctx.side!r}-side {cause!r} with,\n' | ||||
|             f'{ctx.repr_outcome()!r}\n' | ||||
|         ) | ||||
| 
 | ||||
|         if debug_mode(): | ||||
|  | @ -2303,6 +2424,7 @@ async def open_context_from_portal( | |||
|         # told us it's cancelled ;p | ||||
|         if ctxc_from_child is None: | ||||
|             try: | ||||
|                 # await pause(shield=True) | ||||
|                 await ctx.cancel() | ||||
|             except ( | ||||
|                 trio.BrokenResourceError, | ||||
|  | @ -2459,8 +2581,10 @@ async def open_context_from_portal( | |||
|                 log.cancel( | ||||
|                     f'Context cancelled by local {ctx.side!r}-side task\n' | ||||
|                     f'c)>\n' | ||||
|                     f' |_{ctx._task}\n\n' | ||||
|                     f'{repr(scope_err)}\n' | ||||
|                     f'  |_{ctx.parent_task}\n' | ||||
|                     f'   .cid={ctx.cid!r}\n' | ||||
|                     f'\n' | ||||
|                     f'{scope_err!r}\n' | ||||
|                 ) | ||||
| 
 | ||||
|             # TODO: should we add a `._cancel_req_received` | ||||
|  |  | |||
|  | @ -654,8 +654,7 @@ async def _invoke( | |||
|                 # scope ensures unasking of the `await coro` below | ||||
|                 # *should* never be interfered with!! | ||||
|                 maybe_raise_from_masking_exc( | ||||
|                     tn=tn, | ||||
|                     unmask_from=Cancelled, | ||||
|                     unmask_from=(Cancelled,), | ||||
|                 ) as _mbme,  # maybe boxed masked exc | ||||
|             ): | ||||
|                 ctx._scope_nursery = tn | ||||
|  |  | |||
|  | @ -446,12 +446,12 @@ class ActorNursery: | |||
| @acm | ||||
| async def _open_and_supervise_one_cancels_all_nursery( | ||||
|     actor: Actor, | ||||
|     tb_hide: bool = False, | ||||
|     hide_tb: bool = True, | ||||
| 
 | ||||
| ) -> typing.AsyncGenerator[ActorNursery, None]: | ||||
| 
 | ||||
|     # normally don't need to show user by default | ||||
|     __tracebackhide__: bool = tb_hide | ||||
|     __tracebackhide__: bool = hide_tb | ||||
| 
 | ||||
|     outer_err: BaseException|None = None | ||||
|     inner_err: BaseException|None = None | ||||
|  |  | |||
|  | @ -0,0 +1,26 @@ | |||
| # tractor: structured concurrent "actors". | ||||
| # Copyright 2024-eternity Tyler Goodlet. | ||||
| 
 | ||||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
| # (at your option) any later version. | ||||
| 
 | ||||
| # This program is distributed in the hope that it will be useful, | ||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| # GNU Affero General Public License for more details. | ||||
| 
 | ||||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| ''' | ||||
| High level design patterns, APIs and runtime extensions built on top | ||||
| of the `tractor` runtime core. | ||||
| 
 | ||||
| ''' | ||||
| from ._service import ( | ||||
|     open_service_mngr as open_service_mngr, | ||||
|     get_service_mngr as get_service_mngr, | ||||
|     ServiceMngr as ServiceMngr, | ||||
| ) | ||||
|  | @ -0,0 +1,592 @@ | |||
| # tractor: structured concurrent "actors". | ||||
| # Copyright 2024-eternity Tyler Goodlet. | ||||
| 
 | ||||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
| # (at your option) any later version. | ||||
| 
 | ||||
| # This program is distributed in the hope that it will be useful, | ||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| # GNU Affero General Public License for more details. | ||||
| 
 | ||||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| ''' | ||||
| Daemon subactor as service(s) management and supervision primitives | ||||
| and API. | ||||
| 
 | ||||
| ''' | ||||
| from __future__ import annotations | ||||
| from contextlib import ( | ||||
|     asynccontextmanager as acm, | ||||
|     # contextmanager as cm, | ||||
| ) | ||||
| from collections import defaultdict | ||||
| from dataclasses import ( | ||||
|     dataclass, | ||||
|     field, | ||||
| ) | ||||
| import functools | ||||
| import inspect | ||||
| from typing import ( | ||||
|     Callable, | ||||
|     Any, | ||||
| ) | ||||
| 
 | ||||
| import tractor | ||||
| import trio | ||||
| from trio import TaskStatus | ||||
| from tractor import ( | ||||
|     log, | ||||
|     ActorNursery, | ||||
|     current_actor, | ||||
|     ContextCancelled, | ||||
|     Context, | ||||
|     Portal, | ||||
| ) | ||||
| 
 | ||||
| log = log.get_logger('tractor') | ||||
| 
 | ||||
| 
 | ||||
| # TODO: implement a `@singleton` deco-API for wrapping the below | ||||
| # factory's impl for general actor-singleton use? | ||||
| # | ||||
| # -[ ] go through the options peeps on SO did? | ||||
| #  * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python | ||||
| #  * including @mikenerone's answer | ||||
| #   |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313 | ||||
| # | ||||
| # -[ ] put it in `tractor.lowlevel._globals` ? | ||||
| #  * fits with our oustanding actor-local/global feat req? | ||||
| #   |_ https://github.com/goodboy/tractor/issues/55 | ||||
| #  * how can it relate to the `Actor.lifetime_stack` that was | ||||
| #    silently patched in? | ||||
| #   |_ we could implicitly call both of these in the same | ||||
| #     spot in the runtime using the lifetime stack? | ||||
| #    - `open_singleton_cm().__exit__()` | ||||
| #    -`del_singleton()` | ||||
| #   |_ gives SC fixtue semantics to sync code oriented around | ||||
| #     sub-process lifetime? | ||||
| #  * what about with `trio.RunVar`? | ||||
| #   |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar | ||||
| #    - which we'll need for no-GIL cpython (right?) presuming | ||||
| #      multiple `trio.run()` calls in process? | ||||
| # | ||||
| # | ||||
| # @singleton | ||||
| # async def open_service_mngr( | ||||
| #     **init_kwargs, | ||||
| # ) -> ServiceMngr: | ||||
| #     ''' | ||||
| #     Note this function body is invoke IFF no existing singleton instance already | ||||
| #     exists in this proc's memory. | ||||
| 
 | ||||
| #     ''' | ||||
| #     # setup | ||||
| #     yield ServiceMngr(**init_kwargs) | ||||
| #     # teardown | ||||
| 
 | ||||
| 
 | ||||
| # a deletion API for explicit instance de-allocation? | ||||
| # @open_service_mngr.deleter | ||||
| # def del_service_mngr() -> None: | ||||
| #     mngr = open_service_mngr._singleton[0] | ||||
| #     open_service_mngr._singleton[0] = None | ||||
| #     del mngr | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| # TODO: implement a singleton deco-API for wrapping the below | ||||
| # factory's impl for general actor-singleton use? | ||||
| # | ||||
| # @singleton | ||||
| # async def open_service_mngr( | ||||
| #     **init_kwargs, | ||||
| # ) -> ServiceMngr: | ||||
| #     ''' | ||||
| #     Note this function body is invoke IFF no existing singleton instance already | ||||
| #     exists in this proc's memory. | ||||
| 
 | ||||
| #     ''' | ||||
| #     # setup | ||||
| #     yield ServiceMngr(**init_kwargs) | ||||
| #     # teardown | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| # TODO: singleton factory API instead of a class API | ||||
| @acm | ||||
| async def open_service_mngr( | ||||
|     *, | ||||
|     debug_mode: bool = False, | ||||
| 
 | ||||
|     # NOTE; since default values for keyword-args are effectively | ||||
|     # module-vars/globals as per the note from, | ||||
|     # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values | ||||
|     # | ||||
|     # > "The default value is evaluated only once. This makes | ||||
|     #   a difference when the default is a mutable object such as | ||||
|     #   a list, dictionary, or instances of most classes" | ||||
|     # | ||||
|     _singleton: list[ServiceMngr|None] = [None], | ||||
|     **init_kwargs, | ||||
| 
 | ||||
| ) -> ServiceMngr: | ||||
|     ''' | ||||
|     Open an actor-global "service-manager" for supervising a tree | ||||
|     of subactors and/or actor-global tasks. | ||||
| 
 | ||||
|     The delivered `ServiceMngr` is singleton instance for each | ||||
|     actor-process, that is, allocated on first open and never | ||||
|     de-allocated unless explicitly deleted by al call to | ||||
|     `del_service_mngr()`. | ||||
| 
 | ||||
|     ''' | ||||
|     # TODO: factor this an allocation into | ||||
|     # a `._mngr.open_service_mngr()` and put in the | ||||
|     # once-n-only-once setup/`.__aenter__()` part! | ||||
|     # -[ ] how to make this only happen on the `mngr == None` case? | ||||
|     #  |_ use `.trionics.maybe_open_context()` (for generic | ||||
|     #     async-with-style-only-once of the factory impl, though | ||||
|     #     what do we do for the allocation case? | ||||
|     #    / `.maybe_open_nursery()` (since for this specific case | ||||
|     #    it's simpler?) to activate | ||||
|     async with ( | ||||
|         tractor.open_nursery() as an, | ||||
|         trio.open_nursery() as tn, | ||||
|     ): | ||||
|         # impl specific obvi.. | ||||
|         init_kwargs.update({ | ||||
|             'an': an, | ||||
|             'tn': tn, | ||||
|         }) | ||||
| 
 | ||||
|         mngr: ServiceMngr|None | ||||
|         if (mngr := _singleton[0]) is None: | ||||
| 
 | ||||
|             log.info('Allocating a new service mngr!') | ||||
|             mngr = _singleton[0] = ServiceMngr(**init_kwargs) | ||||
| 
 | ||||
|             # TODO: put into `.__aenter__()` section of | ||||
|             # eventual `@singleton_acm` API wrapper. | ||||
|             # | ||||
|             # assign globally for future daemon/task creation | ||||
|             mngr.an = an | ||||
|             mngr.tn = tn | ||||
| 
 | ||||
|         else: | ||||
|             assert (mngr.an and mngr.tn) | ||||
|             log.info( | ||||
|                 'Using extant service mngr!\n\n' | ||||
|                 f'{mngr!r}\n'  # it has a nice `.__repr__()` of services state | ||||
|             ) | ||||
| 
 | ||||
|         try: | ||||
|             # NOTE: this is a singleton factory impl specific detail | ||||
|             # which should be supported in the condensed | ||||
|             # `@singleton_acm` API? | ||||
|             mngr.debug_mode = debug_mode | ||||
| 
 | ||||
|             yield mngr | ||||
|         finally: | ||||
|             # TODO: is this more clever/efficient? | ||||
|             # if 'samplerd' in mngr.service_ctxs: | ||||
|             #     await mngr.cancel_service('samplerd') | ||||
|             tn.cancel_scope.cancel() | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| def get_service_mngr() -> ServiceMngr: | ||||
|     ''' | ||||
|     Try to get the singleton service-mngr for this actor presuming it | ||||
|     has already been allocated using, | ||||
| 
 | ||||
|     .. code:: python | ||||
| 
 | ||||
|         async with open_<@singleton_acm(func)>() as mngr` | ||||
|             ... this block kept open ... | ||||
| 
 | ||||
|     If not yet allocated raise a `ServiceError`. | ||||
| 
 | ||||
|     ''' | ||||
|     # https://stackoverflow.com/a/12627202 | ||||
|     # https://docs.python.org/3/library/inspect.html#inspect.Signature | ||||
|     maybe_mngr: ServiceMngr|None = inspect.signature( | ||||
|         open_service_mngr | ||||
|     ).parameters['_singleton'].default[0] | ||||
| 
 | ||||
|     if maybe_mngr is None: | ||||
|         raise RuntimeError( | ||||
|             'Someone must allocate a `ServiceMngr` using\n\n' | ||||
|             '`async with open_service_mngr()` beforehand!!\n' | ||||
|         ) | ||||
| 
 | ||||
|     return maybe_mngr | ||||
| 
 | ||||
| 
 | ||||
| async def _open_and_supervise_service_ctx( | ||||
|     serman: ServiceMngr, | ||||
|     name: str, | ||||
|     ctx_fn: Callable,  # TODO, type for `@tractor.context` requirement | ||||
|     portal: Portal, | ||||
| 
 | ||||
|     allow_overruns: bool = False, | ||||
|     task_status: TaskStatus[ | ||||
|         tuple[ | ||||
|             trio.CancelScope, | ||||
|             Context, | ||||
|             trio.Event, | ||||
|             Any, | ||||
|         ] | ||||
|     ] = trio.TASK_STATUS_IGNORED, | ||||
|     **ctx_kwargs, | ||||
| 
 | ||||
| ) -> Any: | ||||
|     ''' | ||||
|     Open a remote IPC-context defined by `ctx_fn` in the | ||||
|     (service) actor accessed via `portal` and supervise the | ||||
|     (local) parent task to termination at which point the remote | ||||
|     actor runtime is cancelled alongside it. | ||||
| 
 | ||||
|     The main application is for allocating long-running | ||||
|     "sub-services" in a main daemon and explicitly controlling | ||||
|     their lifetimes from an actor-global singleton. | ||||
| 
 | ||||
|     ''' | ||||
|     # TODO: use the ctx._scope directly here instead? | ||||
|     # -[ ] actually what semantics do we expect for this | ||||
|     #   usage!? | ||||
|     with trio.CancelScope() as cs: | ||||
|         try: | ||||
|             async with portal.open_context( | ||||
|                 ctx_fn, | ||||
|                 allow_overruns=allow_overruns, | ||||
|                 **ctx_kwargs, | ||||
| 
 | ||||
|             ) as (ctx, started): | ||||
| 
 | ||||
|                 # unblock once the remote context has started | ||||
|                 complete = trio.Event() | ||||
|                 task_status.started(( | ||||
|                     cs, | ||||
|                     ctx, | ||||
|                     complete, | ||||
|                     started, | ||||
|                 )) | ||||
|                 log.info( | ||||
|                     f'`pikerd` service {name} started with value {started}' | ||||
|                 ) | ||||
|                 # wait on any context's return value | ||||
|                 # and any final portal result from the | ||||
|                 # sub-actor. | ||||
|                 ctx_res: Any = await ctx.wait_for_result() | ||||
| 
 | ||||
|                 # NOTE: blocks indefinitely until cancelled | ||||
|                 # either by error from the target context | ||||
|                 # function or by being cancelled here by the | ||||
|                 # surrounding cancel scope. | ||||
|                 return ( | ||||
|                     await portal.wait_for_result(), | ||||
|                     ctx_res, | ||||
|                 ) | ||||
| 
 | ||||
|         except ContextCancelled as ctxe: | ||||
|             canceller: tuple[str, str] = ctxe.canceller | ||||
|             our_uid: tuple[str, str] = current_actor().uid | ||||
|             if ( | ||||
|                 canceller != portal.chan.uid | ||||
|                 and | ||||
|                 canceller != our_uid | ||||
|             ): | ||||
|                 log.cancel( | ||||
|                     f'Actor-service `{name}` was remotely cancelled by a peer?\n' | ||||
| 
 | ||||
|                     # TODO: this would be a good spot to use | ||||
|                     # a respawn feature Bo | ||||
|                     f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' | ||||
| 
 | ||||
|                     f'cancellee: {portal.chan.uid}\n' | ||||
|                     f'canceller: {canceller}\n' | ||||
|                 ) | ||||
|             else: | ||||
|                 raise | ||||
| 
 | ||||
|         finally: | ||||
|             # NOTE: the ctx MUST be cancelled first if we | ||||
|             # don't want the above `ctx.wait_for_result()` to | ||||
|             # raise a self-ctxc. WHY, well since from the ctx's | ||||
|             # perspective the cancel request will have | ||||
|             # arrived out-out-of-band at the `Actor.cancel()` | ||||
|             # level, thus `Context.cancel_called == False`, | ||||
|             # meaning `ctx._is_self_cancelled() == False`. | ||||
|             # with trio.CancelScope(shield=True): | ||||
|             # await ctx.cancel() | ||||
|             await portal.cancel_actor()  # terminate (remote) sub-actor | ||||
|             complete.set()  # signal caller this task is done | ||||
|             serman.service_ctxs.pop(name)  # remove mngr entry | ||||
| 
 | ||||
| 
 | ||||
| # TODO: we need remote wrapping and a general soln: | ||||
| # - factor this into a ``tractor.highlevel`` extension # pack for the | ||||
| #   library. | ||||
| # - wrap a "remote api" wherein you can get a method proxy | ||||
| #   to the pikerd actor for starting services remotely! | ||||
| # - prolly rename this to ActorServicesNursery since it spawns | ||||
| #   new actors and supervises them to completion? | ||||
| @dataclass | ||||
| class ServiceMngr: | ||||
|     ''' | ||||
|     A multi-subactor-as-service manager. | ||||
| 
 | ||||
|     Spawn, supervise and monitor service/daemon subactors in a SC | ||||
|     process tree. | ||||
| 
 | ||||
|     ''' | ||||
|     an: ActorNursery | ||||
|     tn: trio.Nursery | ||||
|     debug_mode: bool = False # tractor sub-actor debug mode flag | ||||
| 
 | ||||
|     service_tasks: dict[ | ||||
|         str, | ||||
|         tuple[ | ||||
|             trio.CancelScope, | ||||
|             trio.Event, | ||||
|         ] | ||||
|     ] = field(default_factory=dict) | ||||
| 
 | ||||
|     service_ctxs: dict[ | ||||
|         str, | ||||
|         tuple[ | ||||
|             trio.CancelScope, | ||||
|             Context, | ||||
|             Portal, | ||||
|             trio.Event, | ||||
|         ] | ||||
|     ] = field(default_factory=dict) | ||||
| 
 | ||||
|     # internal per-service task mutexs | ||||
|     _locks = defaultdict(trio.Lock) | ||||
| 
 | ||||
|     # TODO, unify this interface with our `TaskManager` PR! | ||||
|     # | ||||
|     # | ||||
|     async def start_service_task( | ||||
|         self, | ||||
|         name: str, | ||||
|         # TODO: typevar for the return type of the target and then | ||||
|         # use it below for `ctx_res`? | ||||
|         fn: Callable, | ||||
| 
 | ||||
|         allow_overruns: bool = False, | ||||
|         **ctx_kwargs, | ||||
| 
 | ||||
|     ) -> tuple[ | ||||
|         trio.CancelScope, | ||||
|         Any, | ||||
|         trio.Event, | ||||
|     ]: | ||||
|         async def _task_manager_start( | ||||
|             task_status: TaskStatus[ | ||||
|                 tuple[ | ||||
|                     trio.CancelScope, | ||||
|                     trio.Event, | ||||
|                 ] | ||||
|             ] = trio.TASK_STATUS_IGNORED, | ||||
|         ) -> Any: | ||||
| 
 | ||||
|             task_cs = trio.CancelScope() | ||||
|             task_complete = trio.Event() | ||||
| 
 | ||||
|             with task_cs as cs: | ||||
|                 task_status.started(( | ||||
|                     cs, | ||||
|                     task_complete, | ||||
|                 )) | ||||
|                 try: | ||||
|                     await fn() | ||||
|                 except trio.Cancelled as taskc: | ||||
|                     log.cancel( | ||||
|                         f'Service task for `{name}` was cancelled!\n' | ||||
|                         # TODO: this would be a good spot to use | ||||
|                         # a respawn feature Bo | ||||
|                     ) | ||||
|                     raise taskc | ||||
|                 finally: | ||||
|                     task_complete.set() | ||||
|         ( | ||||
|             cs, | ||||
|             complete, | ||||
|         ) = await self.tn.start(_task_manager_start) | ||||
| 
 | ||||
|         # store the cancel scope and portal for later cancellation or | ||||
|         # retstart if needed. | ||||
|         self.service_tasks[name] = ( | ||||
|             cs, | ||||
|             complete, | ||||
|         ) | ||||
|         return ( | ||||
|             cs, | ||||
|             complete, | ||||
|         ) | ||||
| 
 | ||||
|     async def cancel_service_task( | ||||
|         self, | ||||
|         name: str, | ||||
| 
 | ||||
|     ) -> Any: | ||||
|         log.info(f'Cancelling `pikerd` service {name}') | ||||
|         cs, complete = self.service_tasks[name] | ||||
| 
 | ||||
|         cs.cancel() | ||||
|         await complete.wait() | ||||
|         # TODO, if we use the `TaskMngr` from #346 | ||||
|         # we can also get the return value from the task! | ||||
| 
 | ||||
|         if name in self.service_tasks: | ||||
|             # TODO: custom err? | ||||
|             # raise ServiceError( | ||||
|             raise RuntimeError( | ||||
|                 f'Service task {name!r} not terminated!?\n' | ||||
|             ) | ||||
| 
 | ||||
|     async def start_service_ctx( | ||||
|         self, | ||||
|         name: str, | ||||
|         portal: Portal, | ||||
|         # TODO: typevar for the return type of the target and then | ||||
|         # use it below for `ctx_res`? | ||||
|         ctx_fn: Callable, | ||||
|         **ctx_kwargs, | ||||
| 
 | ||||
|     ) -> tuple[ | ||||
|         trio.CancelScope, | ||||
|         Context, | ||||
|         Any, | ||||
|     ]: | ||||
|         ''' | ||||
|         Start a remote IPC-context defined by `ctx_fn` in a background | ||||
|         task and immediately return supervision primitives to manage it: | ||||
| 
 | ||||
|         - a `cs: CancelScope` for the newly allocated bg task | ||||
|         - the `ipc_ctx: Context` to manage the remotely scheduled | ||||
|           `trio.Task`. | ||||
|         - the `started: Any` value returned by the remote endpoint | ||||
|           task's `Context.started(<value>)` call. | ||||
| 
 | ||||
|         The bg task supervises the ctx such that when it terminates the supporting | ||||
|         actor runtime is also cancelled, see `_open_and_supervise_service_ctx()` | ||||
|         for details. | ||||
| 
 | ||||
|         ''' | ||||
|         cs, ipc_ctx, complete, started = await self.tn.start( | ||||
|             functools.partial( | ||||
|                 _open_and_supervise_service_ctx, | ||||
|                 serman=self, | ||||
|                 name=name, | ||||
|                 ctx_fn=ctx_fn, | ||||
|                 portal=portal, | ||||
|                 **ctx_kwargs, | ||||
|             ) | ||||
|         ) | ||||
| 
 | ||||
|         # store the cancel scope and portal for later cancellation or | ||||
|         # retstart if needed. | ||||
|         self.service_ctxs[name] = (cs, ipc_ctx, portal, complete) | ||||
|         return ( | ||||
|             cs, | ||||
|             ipc_ctx, | ||||
|             started, | ||||
|         ) | ||||
| 
 | ||||
|     async def start_service( | ||||
|         self, | ||||
|         daemon_name: str, | ||||
|         ctx_ep: Callable,  # kwargs must `partial`-ed in! | ||||
|         # ^TODO, type for `@tractor.context` deco-ed funcs! | ||||
| 
 | ||||
|         debug_mode: bool = False, | ||||
|         **start_actor_kwargs, | ||||
| 
 | ||||
|     ) -> Context: | ||||
|         ''' | ||||
|         Start new subactor and schedule a supervising "service task" | ||||
|         in it which explicitly defines the sub's lifetime. | ||||
| 
 | ||||
|         "Service daemon subactors" are cancelled (and thus | ||||
|         terminated) using the paired `.cancel_service()`. | ||||
| 
 | ||||
|         Effectively this API can be used to manage "service daemons" | ||||
|         spawned under a single parent actor with supervision | ||||
|         semantics equivalent to a one-cancels-one style actor-nursery | ||||
|         or "(subactor) task manager" where each subprocess's (and | ||||
|         thus its embedded actor runtime) lifetime is synced to that | ||||
|         of the remotely spawned task defined by `ctx_ep`. | ||||
| 
 | ||||
|         The funcionality can be likened to a "daemonized" version of | ||||
|         `.hilevel.worker.run_in_actor()` but with supervision | ||||
|         controls offered by `tractor.Context` where the main/root | ||||
|         remotely scheduled `trio.Task` invoking `ctx_ep` determines | ||||
|         the underlying subactor's lifetime. | ||||
| 
 | ||||
|         ''' | ||||
|         entry: tuple|None = self.service_ctxs.get(daemon_name) | ||||
|         if entry: | ||||
|             (cs, sub_ctx, portal, complete) = entry | ||||
|             return sub_ctx | ||||
| 
 | ||||
|         if daemon_name not in self.service_ctxs: | ||||
|             portal: Portal = await self.an.start_actor( | ||||
|                 daemon_name, | ||||
|                 debug_mode=(  # maybe set globally during allocate | ||||
|                     debug_mode | ||||
|                     or | ||||
|                     self.debug_mode | ||||
|                 ), | ||||
|                 **start_actor_kwargs, | ||||
|             ) | ||||
|             ctx_kwargs: dict[str, Any] = {} | ||||
|             if isinstance(ctx_ep, functools.partial): | ||||
|                 ctx_kwargs: dict[str, Any] = ctx_ep.keywords | ||||
|                 ctx_ep: Callable = ctx_ep.func | ||||
| 
 | ||||
|             ( | ||||
|                 cs, | ||||
|                 sub_ctx, | ||||
|                 started, | ||||
|             ) = await self.start_service_ctx( | ||||
|                 name=daemon_name, | ||||
|                 portal=portal, | ||||
|                 ctx_fn=ctx_ep, | ||||
|                 **ctx_kwargs, | ||||
|             ) | ||||
| 
 | ||||
|             return sub_ctx | ||||
| 
 | ||||
|     async def cancel_service( | ||||
|         self, | ||||
|         name: str, | ||||
| 
 | ||||
|     ) -> Any: | ||||
|         ''' | ||||
|         Cancel the service task and actor for the given ``name``. | ||||
| 
 | ||||
|         ''' | ||||
|         log.info(f'Cancelling `pikerd` service {name}') | ||||
|         cs, sub_ctx, portal, complete = self.service_ctxs[name] | ||||
| 
 | ||||
|         # cs.cancel() | ||||
|         await sub_ctx.cancel() | ||||
|         await complete.wait() | ||||
| 
 | ||||
|         if name in self.service_ctxs: | ||||
|             # TODO: custom err? | ||||
|             # raise ServiceError( | ||||
|             raise RuntimeError( | ||||
|                 f'Service actor for {name} not terminated and/or unknown?' | ||||
|             ) | ||||
| 
 | ||||
|         # assert name not in self.service_ctxs, \ | ||||
|         #     f'Serice task for {name} not terminated?' | ||||
|  | @ -613,10 +613,9 @@ async def drain_to_final_msg( | |||
|             #       msg: dict = await ctx._rx_chan.receive() | ||||
|             #   if res_cs.cancelled_caught: | ||||
|             # | ||||
|             # -[ ] make sure pause points work here for REPLing | ||||
|             # -[x] make sure pause points work here for REPLing | ||||
|             #   the runtime itself; i.e. ensure there's no hangs! | ||||
|             # |_from tractor.devx.debug import pause | ||||
|             #   await pause() | ||||
|             #  |_see masked code below in .cancel_called path | ||||
| 
 | ||||
|         # NOTE: we get here if the far end was | ||||
|         # `ContextCancelled` in 2 cases: | ||||
|  | @ -652,6 +651,10 @@ async def drain_to_final_msg( | |||
|                     f'IPC ctx cancelled externally during result drain ?\n' | ||||
|                     f'{ctx}' | ||||
|                 ) | ||||
|                 # XXX, for tracing `Cancelled`.. | ||||
|                 # from tractor.devx.debug import pause | ||||
|                 # await pause(shield=True) | ||||
| 
 | ||||
|             # CASE 2: mask the local cancelled-error(s) | ||||
|             # only when we are sure the remote error is | ||||
|             # the source cause of this local task's | ||||
|  |  | |||
|  | @ -21,7 +21,6 @@ Sugary patterns for trio + tractor designs. | |||
| from ._mngrs import ( | ||||
|     gather_contexts as gather_contexts, | ||||
|     maybe_open_context as maybe_open_context, | ||||
|     maybe_open_nursery as maybe_open_nursery, | ||||
| ) | ||||
| from ._broadcast import ( | ||||
|     AsyncReceiver as AsyncReceiver, | ||||
|  | @ -37,3 +36,6 @@ from ._beg import ( | |||
| from ._taskc import ( | ||||
|     maybe_raise_from_masking_exc as maybe_raise_from_masking_exc, | ||||
| ) | ||||
| from ._tn import ( | ||||
|     maybe_open_nursery as maybe_open_nursery, | ||||
| ) | ||||
|  |  | |||
|  | @ -22,7 +22,7 @@ https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html | |||
| from __future__ import annotations | ||||
| from abc import abstractmethod | ||||
| from collections import deque | ||||
| from contextlib import asynccontextmanager | ||||
| from contextlib import asynccontextmanager as acm | ||||
| from functools import partial | ||||
| from operator import ne | ||||
| from typing import ( | ||||
|  | @ -398,7 +398,7 @@ class BroadcastReceiver(ReceiveChannel): | |||
| 
 | ||||
|             return await self._receive_from_underlying(key, state) | ||||
| 
 | ||||
|     @asynccontextmanager | ||||
|     @acm | ||||
|     async def subscribe( | ||||
|         self, | ||||
|         raise_on_lag: bool = True, | ||||
|  |  | |||
|  | @ -23,7 +23,6 @@ from contextlib import ( | |||
|     asynccontextmanager as acm, | ||||
| ) | ||||
| import inspect | ||||
| from types import ModuleType | ||||
| from typing import ( | ||||
|     Any, | ||||
|     AsyncContextManager, | ||||
|  | @ -31,24 +30,20 @@ from typing import ( | |||
|     AsyncIterator, | ||||
|     Callable, | ||||
|     Hashable, | ||||
|     Optional, | ||||
|     Sequence, | ||||
|     TypeVar, | ||||
|     TYPE_CHECKING, | ||||
| ) | ||||
| 
 | ||||
| import trio | ||||
| from tractor._state import current_actor | ||||
| from tractor.log import get_logger | ||||
| from ._tn import maybe_open_nursery | ||||
| # from ._beg import collapse_eg | ||||
| # from ._taskc import ( | ||||
| #     maybe_raise_from_masking_exc, | ||||
| # ) | ||||
| 
 | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from tractor import ActorNursery | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
|  | @ -56,30 +51,6 @@ log = get_logger(__name__) | |||
| T = TypeVar("T") | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def maybe_open_nursery( | ||||
|     nursery: trio.Nursery|ActorNursery|None = None, | ||||
|     shield: bool = False, | ||||
|     lib: ModuleType = trio, | ||||
| 
 | ||||
|     **kwargs,  # proxy thru | ||||
| 
 | ||||
| ) -> AsyncGenerator[trio.Nursery, Any]: | ||||
|     ''' | ||||
|     Create a new nursery if None provided. | ||||
| 
 | ||||
|     Blocks on exit as expected if no input nursery is provided. | ||||
| 
 | ||||
|     ''' | ||||
|     if nursery is not None: | ||||
|         yield nursery | ||||
|     else: | ||||
|         async with lib.open_nursery(**kwargs) as nursery: | ||||
|             if lib == trio: | ||||
|                 nursery.cancel_scope.shield = shield | ||||
|             yield nursery | ||||
| 
 | ||||
| 
 | ||||
| async def _enter_and_wait( | ||||
|     mngr: AsyncContextManager[T], | ||||
|     unwrapped: dict[int, T], | ||||
|  | @ -204,7 +175,7 @@ class _Cache: | |||
|     a kept-alive-while-in-use async resource. | ||||
| 
 | ||||
|     ''' | ||||
|     service_tn: Optional[trio.Nursery] = None | ||||
|     service_tn: trio.Nursery|None = None | ||||
|     locks: dict[Hashable, trio.Lock] = {} | ||||
|     users: int = 0 | ||||
|     values: dict[Any,  Any] = {} | ||||
|  | @ -213,7 +184,7 @@ class _Cache: | |||
|         tuple[trio.Nursery, trio.Event] | ||||
|     ] = {} | ||||
|     # nurseries: dict[int, trio.Nursery] = {} | ||||
|     no_more_users: Optional[trio.Event] = None | ||||
|     no_more_users: trio.Event|None = None | ||||
| 
 | ||||
|     @classmethod | ||||
|     async def run_ctx( | ||||
|  | @ -223,16 +194,18 @@ class _Cache: | |||
|         task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         async with mng as value: | ||||
|             _, no_more_users = cls.resources[ctx_key] | ||||
|             cls.values[ctx_key] = value | ||||
|             task_status.started(value) | ||||
|             try: | ||||
|                 await no_more_users.wait() | ||||
|             finally: | ||||
|                 # discard nursery ref so it won't be re-used (an error)? | ||||
|                 value = cls.values.pop(ctx_key) | ||||
|                 cls.resources.pop(ctx_key) | ||||
|         try: | ||||
|             async with mng as value: | ||||
|                 _, no_more_users = cls.resources[ctx_key] | ||||
|                 try: | ||||
|                     cls.values[ctx_key] = value | ||||
|                     task_status.started(value) | ||||
|                     await no_more_users.wait() | ||||
|                 finally: | ||||
|                     value = cls.values.pop(ctx_key) | ||||
|         finally: | ||||
|             # discard nursery ref so it won't be re-used (an error)? | ||||
|             cls.resources.pop(ctx_key) | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
|  |  | |||
|  | @ -0,0 +1,341 @@ | |||
| # tractor: structured concurrent "actors". | ||||
| # Copyright 2018-eternity Tyler Goodlet. | ||||
| 
 | ||||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
| # (at your option) any later version. | ||||
| 
 | ||||
| # This program is distributed in the hope that it will be useful, | ||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| # GNU Affero General Public License for more details. | ||||
| 
 | ||||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| ''' | ||||
| Erlang-style (ish) "one-cancels-one" nursery, what we just call | ||||
| a "task manager". | ||||
| 
 | ||||
| ''' | ||||
| from __future__ import annotations | ||||
| from contextlib import ( | ||||
|     asynccontextmanager as acm, | ||||
|     # contextmanager as cm, | ||||
| ) | ||||
| from functools import partial | ||||
| from typing import ( | ||||
|     Generator, | ||||
|     Any, | ||||
| ) | ||||
| 
 | ||||
| from outcome import ( | ||||
|     Outcome, | ||||
|     acapture, | ||||
| ) | ||||
| from msgspec import Struct | ||||
| import trio | ||||
| from trio import ( | ||||
|     TaskStatus, | ||||
|     CancelScope, | ||||
|     Nursery, | ||||
| ) | ||||
| from trio.lowlevel import ( | ||||
|     Task, | ||||
| ) | ||||
| from tractor.log import get_logger | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| class TaskOutcome(Struct): | ||||
|     ''' | ||||
|     The outcome of a scheduled ``trio`` task which includes an interface | ||||
|     for synchronizing to the completion of the task's runtime and access | ||||
|     to the eventual boxed result/value or raised exception. | ||||
| 
 | ||||
|     ''' | ||||
|     lowlevel_task: Task | ||||
|     _exited = trio.Event()  # as per `trio.Runner.task_exited()` | ||||
|     _outcome: Outcome | None = None  # as per `outcome.Outcome` | ||||
|     _result: Any | None = None  # the eventual maybe-returned-value | ||||
| 
 | ||||
|     @property | ||||
|     def result(self) -> Any: | ||||
|         ''' | ||||
|         Either Any or None depending on whether the Outcome has compeleted. | ||||
| 
 | ||||
|         ''' | ||||
|         if self._outcome is None: | ||||
|             raise RuntimeError( | ||||
|                 f'Task {self.lowlevel_task.name} is not complete.\n' | ||||
|                 'First wait on `await TaskOutcome.wait_for_result()`!' | ||||
|             ) | ||||
|         return self._result | ||||
| 
 | ||||
|     def _set_outcome( | ||||
|         self, | ||||
|         outcome: Outcome, | ||||
|     ): | ||||
|         ''' | ||||
|         Set the ``Outcome`` for this task. | ||||
| 
 | ||||
|         This method should only ever be called by the task's supervising | ||||
|         nursery implemenation. | ||||
| 
 | ||||
|         ''' | ||||
|         self._outcome = outcome | ||||
|         self._result = outcome.unwrap() | ||||
|         self._exited.set() | ||||
| 
 | ||||
|     async def wait_for_result(self) -> Any: | ||||
|         ''' | ||||
|         Unwind the underlying task's ``Outcome`` by async waiting for | ||||
|         the task to first complete and then unwrap it's result-value. | ||||
| 
 | ||||
|         ''' | ||||
|         if self._exited.is_set(): | ||||
|             return self._result | ||||
| 
 | ||||
|         await self._exited.wait() | ||||
| 
 | ||||
|         out = self._outcome | ||||
|         if out is None: | ||||
|             raise ValueError(f'{out} is not an outcome!?') | ||||
| 
 | ||||
|         return self.result | ||||
| 
 | ||||
| 
 | ||||
| class TaskManagerNursery(Struct): | ||||
|     _tn: Nursery | ||||
|     _scopes: dict[ | ||||
|         Task, | ||||
|         tuple[CancelScope, Outcome] | ||||
|     ] = {} | ||||
| 
 | ||||
|     task_manager: Generator[Any, Outcome, None] | None = None | ||||
| 
 | ||||
|     async def start_soon( | ||||
|         self, | ||||
|         async_fn, | ||||
|         *args, | ||||
| 
 | ||||
|         name=None, | ||||
|         task_manager: Generator[Any, Outcome, None] | None = None | ||||
| 
 | ||||
|     ) -> tuple[CancelScope, Task]: | ||||
| 
 | ||||
|         # NOTE: internals of a nursery don't let you know what | ||||
|         # the most recently spawned task is by order.. so we'd | ||||
|         # have to either change that or do set ops. | ||||
|         # pre_start_tasks: set[Task] = n._children.copy() | ||||
|         # new_tasks = n._children - pre_start_Tasks | ||||
|         # assert len(new_tasks) == 1 | ||||
|         # task = new_tasks.pop() | ||||
| 
 | ||||
|         tn: Nursery = self._tn | ||||
| 
 | ||||
|         sm = self.task_manager | ||||
|         # we do default behavior of a scope-per-nursery | ||||
|         # if the user did not provide a task manager. | ||||
|         if sm is None: | ||||
|             return tn.start_soon(async_fn, *args, name=None) | ||||
| 
 | ||||
|         # new_task: Task|None = None | ||||
|         to_return: tuple[Any] | None = None | ||||
| 
 | ||||
|         # NOTE: what do we enforce as a signature for the | ||||
|         # `@task_scope_manager` here? | ||||
|         mngr = sm(nursery=tn) | ||||
| 
 | ||||
|         async def _start_wrapped_in_scope( | ||||
|             task_status: TaskStatus[ | ||||
|                 tuple[CancelScope, Task] | ||||
|             ] = trio.TASK_STATUS_IGNORED, | ||||
| 
 | ||||
|         ) -> None: | ||||
| 
 | ||||
|             # TODO: this was working before?! and, do we need something | ||||
|             # like it to implement `.start()`? | ||||
|             # nonlocal to_return | ||||
| 
 | ||||
|             # execute up to the first yield | ||||
|             try: | ||||
|                 to_return: tuple[Any] = next(mngr) | ||||
|             except StopIteration: | ||||
|                 raise RuntimeError("task manager didn't yield") from None | ||||
| 
 | ||||
|             # TODO: how do we support `.start()` style? | ||||
|             # - relay through whatever the | ||||
|             #   started task passes back via `.started()` ? | ||||
|             #   seems like that won't work with also returning | ||||
|             #   a "task handle"? | ||||
|             # - we were previously binding-out this `to_return` to | ||||
|             #   the parent's lexical scope, why isn't that working | ||||
|             #   now? | ||||
|             task_status.started(to_return) | ||||
| 
 | ||||
|             # invoke underlying func now that cs is entered. | ||||
|             outcome = await acapture(async_fn, *args) | ||||
| 
 | ||||
|             # execute from the 1st yield to return and expect | ||||
|             # generator-mngr `@task_scope_manager` thinger to | ||||
|             # terminate! | ||||
|             try: | ||||
|                 mngr.send(outcome) | ||||
| 
 | ||||
| 
 | ||||
|                 # I would presume it's better to have a handle to | ||||
|                 # the `Outcome` entirely? This method sends *into* | ||||
|                 # the mngr this `Outcome.value`; seems like kinda | ||||
|                 # weird semantics for our purposes? | ||||
|                 # outcome.send(mngr) | ||||
| 
 | ||||
|             except StopIteration: | ||||
|                 return | ||||
|             else: | ||||
|                 raise RuntimeError(f"{mngr} didn't stop!") | ||||
| 
 | ||||
|         to_return = await tn.start(_start_wrapped_in_scope) | ||||
|         assert to_return is not None | ||||
| 
 | ||||
|         # TODO: use the fancy type-check-time type signature stuff from | ||||
|         # mypy i guess..to like, relay the type of whatever the | ||||
|         # generator yielded through? betcha that'll be un-grokable XD | ||||
|         return to_return | ||||
| 
 | ||||
| 
 | ||||
| # TODO: define a decorator to runtime type check that this a generator | ||||
| # with a single yield that also delivers a value (of some std type) from | ||||
| # the yield expression? | ||||
| # @trio.task_manager | ||||
| def add_task_handle_and_crash_handling( | ||||
|     nursery: Nursery, | ||||
| 
 | ||||
|     debug_mode: bool = False, | ||||
| 
 | ||||
| ) -> Generator[ | ||||
|     Any, | ||||
|     Outcome, | ||||
|     None, | ||||
| ]: | ||||
|     ''' | ||||
|     A customizable, user defined "task scope manager". | ||||
| 
 | ||||
|     With this specially crafted single-yield generator function you can | ||||
|     add more granular controls around every task spawned by `trio` B) | ||||
| 
 | ||||
|     ''' | ||||
|     # if you need it you can ask trio for the task obj | ||||
|     task: Task = trio.lowlevel.current_task() | ||||
|     log.info(f'Spawning task: {task.name}') | ||||
| 
 | ||||
|     # User defined "task handle" for more granular supervision | ||||
|     # of each spawned task as needed for their particular usage. | ||||
|     task_outcome = TaskOutcome(task) | ||||
| 
 | ||||
|     # NOTE: if wanted the user could wrap the output task handle however | ||||
|     # they want! | ||||
|     # class TaskHandle(Struct): | ||||
|     #     task: Task | ||||
|     #     cs: CancelScope | ||||
|     #     outcome: TaskOutcome | ||||
| 
 | ||||
|     # this yields back when the task is terminated, cancelled or returns. | ||||
|     try: | ||||
|         with CancelScope() as cs: | ||||
| 
 | ||||
|             # the yielded value(s) here are what are returned to the | ||||
|             # nursery's `.start_soon()` caller B) | ||||
|             lowlevel_outcome: Outcome = yield (task_outcome, cs) | ||||
|             task_outcome._set_outcome(lowlevel_outcome) | ||||
| 
 | ||||
|     # Adds "crash handling" from `pdbp` by entering | ||||
|     # a REPL on std errors. | ||||
|     except Exception as err: | ||||
|         if debug_mode: | ||||
|             log.exception( | ||||
|                 f'{task.name} crashed, entering debugger!' | ||||
|             ) | ||||
|             import pdbp | ||||
|             pdbp.xpm() | ||||
| 
 | ||||
|         raise err | ||||
| 
 | ||||
|     finally: | ||||
|         log.info( | ||||
|             f'Task exitted\n' | ||||
|             f')>\n' | ||||
|             f' |_{task}\n' | ||||
|             # ^^TODO? use sclang formatter? | ||||
|             # -[ ] .devx.pformat.nest_from_op()` yo! | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_taskman( | ||||
|     task_manager: Generator[Any, Outcome, None] | None = None, | ||||
| 
 | ||||
|     **lowlevel_nursery_kwargs, | ||||
| ): | ||||
|     async with trio.open_nursery(**lowlevel_nursery_kwargs) as nurse: | ||||
|         yield TaskManagerNursery( | ||||
|             nurse, | ||||
|             task_manager=task_manager, | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| async def sleep_then_return_val(val: str): | ||||
|     await trio.sleep(0.2) | ||||
|     return val | ||||
| 
 | ||||
| 
 | ||||
| async def ensure_cancelled(): | ||||
|     try: | ||||
|         await trio.sleep_forever() | ||||
| 
 | ||||
|     except trio.Cancelled: | ||||
|         task = trio.lowlevel.current_task() | ||||
|         log.cancel(f'heyyo ONLY {task.name} was cancelled as expected B)') | ||||
|         assert 0 | ||||
| 
 | ||||
|     except BaseException: | ||||
|         raise RuntimeError("woa woa woa this ain't right!") | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
| 
 | ||||
|     from tractor.log import get_console_log | ||||
|     get_console_log(level='info') | ||||
| 
 | ||||
|     async def main(): | ||||
|         async with open_taskman( | ||||
|             task_manager=partial( | ||||
|                 add_task_handle_and_crash_handling, | ||||
|                 debug_mode=True, | ||||
|             ), | ||||
|         ) as tm: | ||||
|             for _ in range(3): | ||||
|                 outcome, _ = await tm.start_soon(trio.sleep_forever) | ||||
| 
 | ||||
|             # extra task we want to engage in debugger post mortem. | ||||
|             err_outcome, cs = await tm.start_soon(ensure_cancelled) | ||||
| 
 | ||||
|             val: str = 'yoyoyo' | ||||
|             val_outcome, _ = await tm.start_soon( | ||||
|                 sleep_then_return_val, | ||||
|                 val, | ||||
|             ) | ||||
|             res = await val_outcome.wait_for_result() | ||||
|             assert res == val | ||||
|             log.info(f'{res} -> GOT EXPECTED TASK VALUE') | ||||
| 
 | ||||
|             await trio.sleep(0.6) | ||||
|             log.cancel( | ||||
|                 f'Cancelling and waiting on {err_outcome.lowlevel_task} ' | ||||
|                 'to CRASH..' | ||||
|             ) | ||||
|             cs.cancel() | ||||
| 
 | ||||
|     trio.run(main) | ||||
|  | @ -22,7 +22,14 @@ from __future__ import annotations | |||
| from contextlib import ( | ||||
|     asynccontextmanager as acm, | ||||
| ) | ||||
| from typing import TYPE_CHECKING | ||||
| import inspect | ||||
| from types import ( | ||||
|     TracebackType, | ||||
| ) | ||||
| from typing import ( | ||||
|     Type, | ||||
|     TYPE_CHECKING, | ||||
| ) | ||||
| 
 | ||||
| import trio | ||||
| from tractor.log import get_logger | ||||
|  | @ -60,12 +67,71 @@ def find_masked_excs( | |||
|     return None | ||||
| 
 | ||||
| 
 | ||||
| _mask_cases: dict[ | ||||
|     Type[Exception],  # masked exc type | ||||
|     dict[ | ||||
|         int,  # inner-frame index into `inspect.getinnerframes()` | ||||
|         # `FrameInfo.function/filename: str`s to match | ||||
|         dict[str, str], | ||||
|     ], | ||||
| ] = { | ||||
|     trio.WouldBlock: { | ||||
|         # `trio.Lock.acquire()` has a checkpoint inside the | ||||
|         # `WouldBlock`-no_wait path's handler.. | ||||
|         -5: {  # "5th frame up" from checkpoint | ||||
|             'filename': 'trio/_sync.py', | ||||
|             'function': 'acquire', | ||||
|             # 'lineno': 605,  # matters? | ||||
|         }, | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| def is_expected_masking_case( | ||||
|     cases: dict, | ||||
|     exc_ctx: Exception, | ||||
|     exc_match: BaseException, | ||||
| 
 | ||||
| ) -> bool|inspect.FrameInfo: | ||||
|     ''' | ||||
|     Determine whether the provided masked exception is from a known | ||||
|     bug/special/unintentional-`trio`-impl case which we do not wish | ||||
|     to unmask. | ||||
| 
 | ||||
|     Return any guilty `inspect.FrameInfo` ow `False`. | ||||
| 
 | ||||
|     ''' | ||||
|     exc_tb: TracebackType = exc_match.__traceback__ | ||||
|     if cases := _mask_cases.get(type(exc_ctx)): | ||||
|         inner: list[inspect.FrameInfo] = inspect.getinnerframes(exc_tb) | ||||
| 
 | ||||
|         # from tractor.devx.debug import mk_pdb | ||||
|         # mk_pdb().set_trace() | ||||
|         for iframe, matchon in cases.items(): | ||||
|             try: | ||||
|                 masker_frame: inspect.FrameInfo = inner[iframe] | ||||
|             except IndexError: | ||||
|                 continue | ||||
| 
 | ||||
|             for field, in_field in matchon.items(): | ||||
|                 val = getattr( | ||||
|                     masker_frame, | ||||
|                     field, | ||||
|                 ) | ||||
|                 if in_field not in val: | ||||
|                     break | ||||
|             else: | ||||
|                 return masker_frame | ||||
| 
 | ||||
|     return False | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| # XXX, relevant discussion @ `trio`-core, | ||||
| # https://github.com/python-trio/trio/issues/455 | ||||
| # | ||||
| @acm | ||||
| async def maybe_raise_from_masking_exc( | ||||
|     tn: trio.Nursery|None = None, | ||||
|     unmask_from: ( | ||||
|         BaseException| | ||||
|         tuple[BaseException] | ||||
|  | @ -74,18 +140,30 @@ async def maybe_raise_from_masking_exc( | |||
|     raise_unmasked: bool = True, | ||||
|     extra_note: str = ( | ||||
|         'This can occurr when,\n' | ||||
|         ' - a `trio.Nursery` scope embeds a `finally:`-block ' | ||||
|         'which executes a checkpoint!' | ||||
|         '\n' | ||||
|         ' - a `trio.Nursery/CancelScope` embeds a `finally/except:`-block ' | ||||
|         'which execs an un-shielded checkpoint!' | ||||
|         # | ||||
|         # ^TODO? other cases? | ||||
|     ), | ||||
| 
 | ||||
|     always_warn_on: tuple[BaseException] = ( | ||||
|     always_warn_on: tuple[Type[BaseException]] = ( | ||||
|         trio.Cancelled, | ||||
|     ), | ||||
| 
 | ||||
|     # don't ever unmask or warn on any masking pair, | ||||
|     # {<masked-excT-key> -> <masking-excT-value>} | ||||
|     never_warn_on: dict[ | ||||
|         Type[BaseException], | ||||
|         Type[BaseException], | ||||
|     ] = { | ||||
|         KeyboardInterrupt: trio.Cancelled, | ||||
|         trio.Cancelled: trio.Cancelled, | ||||
|     }, | ||||
|     # ^XXX, special case(s) where we warn-log bc likely | ||||
|     # there will be no operational diff since the exc | ||||
|     # is always expected to be consumed. | ||||
| 
 | ||||
| ) -> BoxedMaybeException: | ||||
|     ''' | ||||
|     Maybe un-mask and re-raise exception(s) suppressed by a known | ||||
|  | @ -104,81 +182,112 @@ async def maybe_raise_from_masking_exc( | |||
|         individual sub-excs but maintain the eg-parent's form right? | ||||
| 
 | ||||
|     ''' | ||||
|     if not isinstance(unmask_from, tuple): | ||||
|         raise ValueError( | ||||
|             f'Invalid unmask_from = {unmask_from!r}\n' | ||||
|             f'Must be a `tuple[Type[BaseException]]`.\n' | ||||
|         ) | ||||
| 
 | ||||
|     from tractor.devx.debug import ( | ||||
|         BoxedMaybeException, | ||||
|         pause, | ||||
|     ) | ||||
|     boxed_maybe_exc = BoxedMaybeException( | ||||
|         raise_on_exit=raise_unmasked, | ||||
|     ) | ||||
|     matching: list[BaseException]|None = None | ||||
|     maybe_eg: ExceptionGroup|None | ||||
| 
 | ||||
|     if tn: | ||||
|         try:  # handle egs | ||||
|             yield boxed_maybe_exc | ||||
|             return | ||||
|         except* unmask_from as _maybe_eg: | ||||
|             maybe_eg = _maybe_eg | ||||
|     try: | ||||
|         yield boxed_maybe_exc | ||||
|         return | ||||
|     except BaseException as _bexc: | ||||
|         bexc = _bexc | ||||
|         if isinstance(bexc, BaseExceptionGroup): | ||||
|             matches: ExceptionGroup | ||||
|             matches, _ = maybe_eg.split( | ||||
|                 unmask_from | ||||
|             ) | ||||
|             if not matches: | ||||
|                 raise | ||||
|             matches, _ = bexc.split(unmask_from) | ||||
|             if matches: | ||||
|                 matching = matches.exceptions | ||||
| 
 | ||||
|             matching: list[BaseException] = matches.exceptions | ||||
|     else: | ||||
|         try:  # handle non-egs | ||||
|             yield boxed_maybe_exc | ||||
|             return | ||||
|         except unmask_from as _maybe_exc: | ||||
|             maybe_exc = _maybe_exc | ||||
|             matching: list[BaseException] = [ | ||||
|                 maybe_exc | ||||
|             ] | ||||
| 
 | ||||
|         # XXX, only unmask-ed for debuggin! | ||||
|         # TODO, remove eventually.. | ||||
|         except BaseException as _berr: | ||||
|             berr = _berr | ||||
|             await pause(shield=True) | ||||
|             raise berr | ||||
|         elif ( | ||||
|             unmask_from | ||||
|             and | ||||
|             type(bexc) in unmask_from | ||||
|         ): | ||||
|             matching = [bexc] | ||||
| 
 | ||||
|     if matching is None: | ||||
|         raise | ||||
| 
 | ||||
|     masked: list[tuple[BaseException, BaseException]] = [] | ||||
|     for exc_match in matching: | ||||
| 
 | ||||
|         if exc_ctx := find_masked_excs( | ||||
|             maybe_masker=exc_match, | ||||
|             unmask_from={unmask_from}, | ||||
|             unmask_from=set(unmask_from), | ||||
|         ): | ||||
|             masked.append((exc_ctx, exc_match)) | ||||
|             masked.append(( | ||||
|                 exc_ctx, | ||||
|                 exc_match, | ||||
|             )) | ||||
|             boxed_maybe_exc.value = exc_match | ||||
|             note: str = ( | ||||
|                 f'\n' | ||||
|                 f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n' | ||||
|                 f'^^WARNING^^\n' | ||||
|                 f'the above {type(exc_ctx)!r} was masked by a {type(exc_match)!r}\n' | ||||
|             ) | ||||
|             if extra_note: | ||||
|                 note += ( | ||||
|                     f'\n' | ||||
|                     f'{extra_note}\n' | ||||
|                 ) | ||||
|             exc_ctx.add_note(note) | ||||
| 
 | ||||
|             if type(exc_match) in always_warn_on: | ||||
|             do_warn: bool = ( | ||||
|                 never_warn_on.get( | ||||
|                     type(exc_ctx)  # masking type | ||||
|                 ) | ||||
|                 is not | ||||
|                 type(exc_match)  # masked type | ||||
|             ) | ||||
| 
 | ||||
|             if do_warn: | ||||
|                 exc_ctx.add_note(note) | ||||
| 
 | ||||
|             if ( | ||||
|                 do_warn | ||||
|                 and | ||||
|                 type(exc_match) in always_warn_on | ||||
|             ): | ||||
|                 log.warning(note) | ||||
| 
 | ||||
|             # await tractor.pause(shield=True) | ||||
|             if raise_unmasked: | ||||
| 
 | ||||
|             if ( | ||||
|                 do_warn | ||||
|                 and | ||||
|                 raise_unmasked | ||||
|             ): | ||||
|                 if len(masked) < 2: | ||||
|                     # don't unmask already known "special" cases.. | ||||
|                     if ( | ||||
|                         _mask_cases | ||||
|                         and | ||||
|                         (cases := _mask_cases.get(type(exc_ctx))) | ||||
|                         and | ||||
|                         (masker_frame := is_expected_masking_case( | ||||
|                             cases, | ||||
|                             exc_ctx, | ||||
|                             exc_match, | ||||
|                         )) | ||||
|                     ): | ||||
|                         log.warning( | ||||
|                             f'Ignoring already-known, non-ideal-but-valid ' | ||||
|                             f'masker code @\n' | ||||
|                             f'{masker_frame}\n' | ||||
|                             f'\n' | ||||
|                             f'NOT raising {exc_ctx} from masker {exc_match!r}\n' | ||||
|                         ) | ||||
|                         raise exc_match | ||||
| 
 | ||||
|                     raise exc_ctx from exc_match | ||||
|                 else: | ||||
|                     # ?TODO, see above but, possibly unmasking sub-exc | ||||
|                     # entries if there are > 1 | ||||
|                     await pause(shield=True) | ||||
| 
 | ||||
|                 # ??TODO, see above but, possibly unmasking sub-exc | ||||
|                 # entries if there are > 1 | ||||
|                 # else: | ||||
|                 #     await pause(shield=True) | ||||
|     else: | ||||
|         raise | ||||
|  |  | |||
|  | @ -0,0 +1,94 @@ | |||
| # tractor: structured concurrent "actors". | ||||
| # Copyright 2018-eternity Tyler Goodlet. | ||||
| 
 | ||||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
| # (at your option) any later version. | ||||
| 
 | ||||
| # This program is distributed in the hope that it will be useful, | ||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| # GNU Affero General Public License for more details. | ||||
| 
 | ||||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| ''' | ||||
| `trio.Nursery` wrappers which we short-hand refer to as | ||||
| `tn`: "task nursery". | ||||
| 
 | ||||
| (whereas we refer to `tractor.ActorNursery` as the short-hand `an`) | ||||
| 
 | ||||
| ''' | ||||
| from __future__ import annotations | ||||
| from contextlib import ( | ||||
|     asynccontextmanager as acm, | ||||
| ) | ||||
| from types import ModuleType | ||||
| from typing import ( | ||||
|     Any, | ||||
|     AsyncGenerator, | ||||
|     TYPE_CHECKING, | ||||
| ) | ||||
| 
 | ||||
| import trio | ||||
| from tractor.log import get_logger | ||||
| 
 | ||||
| # from ._beg import ( | ||||
| #     collapse_eg, | ||||
| # ) | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from tractor import ActorNursery | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| # ??TODO? is this even a good idea?? | ||||
| # it's an extra LoC to stack `collapse_eg()` vs. | ||||
| # a new/foreign/bad-std-named very thing wrapper..? | ||||
| # -[ ] is there a better/simpler name? | ||||
| # @acm | ||||
| # async def open_loose_tn() -> trio.Nursery: | ||||
| #     ''' | ||||
| #     Implements the equivalent of the old style loose eg raising | ||||
| #     task-nursery from `trio<=0.25.0` , | ||||
| 
 | ||||
| #     .. code-block:: python | ||||
| 
 | ||||
| #         async with trio.open_nursery( | ||||
| #             strict_exception_groups=False, | ||||
| #         ) as tn: | ||||
| #             ... | ||||
| 
 | ||||
| #     ''' | ||||
| #     async with ( | ||||
| #         collapse_eg(), | ||||
| #         trio.open_nursery() as tn, | ||||
| #     ): | ||||
| #         yield tn | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def maybe_open_nursery( | ||||
|     nursery: trio.Nursery|ActorNursery|None = None, | ||||
|     shield: bool = False, | ||||
|     lib: ModuleType = trio, | ||||
|     loose: bool = False, | ||||
| 
 | ||||
|     **kwargs,  # proxy thru | ||||
| 
 | ||||
| ) -> AsyncGenerator[trio.Nursery, Any]: | ||||
|     ''' | ||||
|     Create a new nursery if None provided. | ||||
| 
 | ||||
|     Blocks on exit as expected if no input nursery is provided. | ||||
| 
 | ||||
|     ''' | ||||
|     if nursery is not None: | ||||
|         yield nursery | ||||
|     else: | ||||
|         async with lib.open_nursery(**kwargs) as tn: | ||||
|             tn.cancel_scope.shield = shield | ||||
|             yield tn | ||||
		Loading…
	
		Reference in New Issue