Compare commits
	
		
			20 Commits 
		
	
	
		
			main
			...
			egs_with_c
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | daf4b4ee85 | |
|  | 3f09843951 | |
|  | 3483151aa8 | |
|  | 4a5f041211 | |
|  | 7d0186aab9 | |
|  | f9b548e4e7 | |
|  | afbe90bcfa | |
|  | 44538c44b1 | |
|  | 62fc462580 | |
|  | c5091afa38 | |
|  | f6ac0c2eb7 | |
|  | 8727c1e4c2 | |
|  | 42cae56823 | |
|  | 35550dd2a2 | |
|  | c437196d9b | |
|  | 882c33ff06 | |
|  | cd79fd79b9 | |
|  | 53d5b59b7b | |
|  | e224b8a994 | |
|  | 5db2ebf8d0 | 
							
								
								
									
										3
									
								
								setup.py
								
								
								
								
							
							
						
						
									
										3
									
								
								setup.py
								
								
								
								
							|  | @ -44,9 +44,10 @@ setup( | |||
|         # trio related | ||||
|         # proper range spec: | ||||
|         # https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5 | ||||
|         'trio >= 0.20, < 0.22', | ||||
|         'trio >= 0.22', | ||||
|         'async_generator', | ||||
|         'trio_typing', | ||||
|         'exceptiongroup', | ||||
| 
 | ||||
|         # tooling | ||||
|         'tricycle', | ||||
|  |  | |||
|  | @ -8,6 +8,10 @@ import platform | |||
| import time | ||||
| from itertools import repeat | ||||
| 
 | ||||
| from exceptiongroup import ( | ||||
|     BaseExceptionGroup, | ||||
|     ExceptionGroup, | ||||
| ) | ||||
| import pytest | ||||
| import trio | ||||
| import tractor | ||||
|  | @ -56,29 +60,49 @@ def test_remote_error(arb_addr, args_err): | |||
|             arbiter_addr=arb_addr, | ||||
|         ) as nursery: | ||||
| 
 | ||||
|             # on a remote type error caused by bad input args | ||||
|             # this should raise directly which means we **don't** get | ||||
|             # an exception group outside the nursery since the error | ||||
|             # here and the far end task error are one in the same? | ||||
|             portal = await nursery.run_in_actor( | ||||
|                 assert_err, name='errorer', **args | ||||
|             ) | ||||
| 
 | ||||
|             # get result(s) from main task | ||||
|             try: | ||||
|                 # this means the root actor will also raise a local | ||||
|                 # parent task error and thus an eg will propagate out | ||||
|                 # of this actor nursery. | ||||
|                 await portal.result() | ||||
|             except tractor.RemoteActorError as err: | ||||
|                 assert err.type == errtype | ||||
|                 print("Look Maa that actor failed hard, hehh") | ||||
|                 raise | ||||
| 
 | ||||
|     with pytest.raises(tractor.RemoteActorError) as excinfo: | ||||
|         trio.run(main) | ||||
|     # ensure boxed errors | ||||
|     if args: | ||||
|         with pytest.raises(tractor.RemoteActorError) as excinfo: | ||||
|             trio.run(main) | ||||
| 
 | ||||
|     # ensure boxed error is correct | ||||
|     assert excinfo.value.type == errtype | ||||
|         assert excinfo.value.type == errtype | ||||
| 
 | ||||
|     else: | ||||
|         # the root task will also error on the `.result()` call | ||||
|         # so we expect an error from there AND the child. | ||||
|         with pytest.raises(BaseExceptionGroup) as excinfo: | ||||
|             trio.run(main) | ||||
| 
 | ||||
|         # ensure boxed errors | ||||
|         for exc in excinfo.value.exceptions: | ||||
|             assert exc.type == errtype | ||||
| 
 | ||||
| 
 | ||||
| def test_multierror(arb_addr): | ||||
|     """Verify we raise a ``trio.MultiError`` out of a nursery where | ||||
|     ''' | ||||
|     Verify we raise a ``BaseExceptionGroup`` out of a nursery where | ||||
|     more then one actor errors. | ||||
|     """ | ||||
| 
 | ||||
|     ''' | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery( | ||||
|             arbiter_addr=arb_addr, | ||||
|  | @ -95,10 +119,10 @@ def test_multierror(arb_addr): | |||
|                 print("Look Maa that first actor failed hard, hehh") | ||||
|                 raise | ||||
| 
 | ||||
|         # here we should get a `trio.MultiError` containing exceptions | ||||
|         # here we should get a ``BaseExceptionGroup`` containing exceptions | ||||
|         # from both subactors | ||||
| 
 | ||||
|     with pytest.raises(trio.MultiError): | ||||
|     with pytest.raises(BaseExceptionGroup): | ||||
|         trio.run(main) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -107,7 +131,7 @@ def test_multierror(arb_addr): | |||
|     'num_subactors', range(25, 26), | ||||
| ) | ||||
| def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay): | ||||
|     """Verify we raise a ``trio.MultiError`` out of a nursery where | ||||
|     """Verify we raise a ``BaseExceptionGroup`` out of a nursery where | ||||
|     more then one actor errors and also with a delay before failure | ||||
|     to test failure during an ongoing spawning. | ||||
|     """ | ||||
|  | @ -123,10 +147,11 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay): | |||
|                     delay=delay | ||||
|                 ) | ||||
| 
 | ||||
|     with pytest.raises(trio.MultiError) as exc_info: | ||||
|     # with pytest.raises(trio.MultiError) as exc_info: | ||||
|     with pytest.raises(BaseExceptionGroup) as exc_info: | ||||
|         trio.run(main) | ||||
| 
 | ||||
|     assert exc_info.type == tractor.MultiError | ||||
|     assert exc_info.type == ExceptionGroup | ||||
|     err = exc_info.value | ||||
|     exceptions = err.exceptions | ||||
| 
 | ||||
|  | @ -214,8 +239,8 @@ async def test_cancel_infinite_streamer(start_method): | |||
|     [ | ||||
|         # daemon actors sit idle while single task actors error out | ||||
|         (1, tractor.RemoteActorError, AssertionError, (assert_err, {}), None), | ||||
|         (2, tractor.MultiError, AssertionError, (assert_err, {}), None), | ||||
|         (3, tractor.MultiError, AssertionError, (assert_err, {}), None), | ||||
|         (2, BaseExceptionGroup, AssertionError, (assert_err, {}), None), | ||||
|         (3, BaseExceptionGroup, AssertionError, (assert_err, {}), None), | ||||
| 
 | ||||
|         # 1 daemon actor errors out while single task actors sleep forever | ||||
|         (3, tractor.RemoteActorError, AssertionError, (sleep_forever, {}), | ||||
|  | @ -226,7 +251,7 @@ async def test_cancel_infinite_streamer(start_method): | |||
|          (do_nuthin, {}), (assert_err, {'delay': 1}, True)), | ||||
|         # daemon complete quickly delay while single task | ||||
|         # actors error after brief delay | ||||
|         (3, tractor.MultiError, AssertionError, | ||||
|         (3, BaseExceptionGroup, AssertionError, | ||||
|          (assert_err, {'delay': 1}), (do_nuthin, {}, False)), | ||||
|     ], | ||||
|     ids=[ | ||||
|  | @ -293,7 +318,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel): | |||
|         # should error here with a ``RemoteActorError`` or ``MultiError`` | ||||
| 
 | ||||
|     except first_err as err: | ||||
|         if isinstance(err, tractor.MultiError): | ||||
|         if isinstance(err, BaseExceptionGroup): | ||||
|             assert len(err.exceptions) == num_actors | ||||
|             for exc in err.exceptions: | ||||
|                 if isinstance(exc, tractor.RemoteActorError): | ||||
|  | @ -337,7 +362,7 @@ async def spawn_and_error(breadth, depth) -> None: | |||
| @tractor_test | ||||
| async def test_nested_multierrors(loglevel, start_method): | ||||
|     ''' | ||||
|     Test that failed actor sets are wrapped in `trio.MultiError`s. This | ||||
|     Test that failed actor sets are wrapped in `BaseExceptionGroup`s. This | ||||
|     test goes only 2 nurseries deep but we should eventually have tests | ||||
|     for arbitrary n-depth actor trees. | ||||
| 
 | ||||
|  | @ -365,7 +390,7 @@ async def test_nested_multierrors(loglevel, start_method): | |||
|                         breadth=subactor_breadth, | ||||
|                         depth=depth, | ||||
|                     ) | ||||
|         except trio.MultiError as err: | ||||
|         except BaseExceptionGroup as err: | ||||
|             assert len(err.exceptions) == subactor_breadth | ||||
|             for subexc in err.exceptions: | ||||
| 
 | ||||
|  | @ -383,10 +408,10 @@ async def test_nested_multierrors(loglevel, start_method): | |||
|                         assert subexc.type in ( | ||||
|                             tractor.RemoteActorError, | ||||
|                             trio.Cancelled, | ||||
|                             trio.MultiError | ||||
|                             BaseExceptionGroup, | ||||
|                         ) | ||||
| 
 | ||||
|                     elif isinstance(subexc, trio.MultiError): | ||||
|                     elif isinstance(subexc, BaseExceptionGroup): | ||||
|                         for subsub in subexc.exceptions: | ||||
| 
 | ||||
|                             if subsub in (tractor.RemoteActorError,): | ||||
|  | @ -394,7 +419,7 @@ async def test_nested_multierrors(loglevel, start_method): | |||
| 
 | ||||
|                             assert type(subsub) in ( | ||||
|                                 trio.Cancelled, | ||||
|                                 trio.MultiError, | ||||
|                                 BaseExceptionGroup, | ||||
|                             ) | ||||
|                 else: | ||||
|                     assert isinstance(subexc, tractor.RemoteActorError) | ||||
|  | @ -406,13 +431,13 @@ async def test_nested_multierrors(loglevel, start_method): | |||
|                     if is_win(): | ||||
|                         if isinstance(subexc, tractor.RemoteActorError): | ||||
|                             assert subexc.type in ( | ||||
|                                 trio.MultiError, | ||||
|                                 BaseExceptionGroup, | ||||
|                                 tractor.RemoteActorError | ||||
|                             ) | ||||
|                         else: | ||||
|                             assert isinstance(subexc, trio.MultiError) | ||||
|                             assert isinstance(subexc, BaseExceptionGroup) | ||||
|                     else: | ||||
|                         assert subexc.type is trio.MultiError | ||||
|                         assert subexc.type is ExceptionGroup | ||||
|                 else: | ||||
|                     assert subexc.type in ( | ||||
|                         tractor.RemoteActorError, | ||||
|  |  | |||
|  | @ -485,10 +485,12 @@ def test_multi_subactors( | |||
|     # 2nd name_error failure | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
| 
 | ||||
|     assert_before(child, [ | ||||
|         "Attaching to pdb in crashed actor: ('name_error_1'", | ||||
|         "NameError", | ||||
|     ]) | ||||
|     # TODO: will we ever get the race where this crash will show up? | ||||
|     # blocklist strat now prevents this crash | ||||
|     # assert_before(child, [ | ||||
|     #     "Attaching to pdb in crashed actor: ('name_error_1'", | ||||
|     #     "NameError", | ||||
|     # ]) | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
|  | @ -683,49 +685,64 @@ def test_multi_subactors_root_errors( | |||
|     # continue again to catch 2nd name error from | ||||
|     # actor 'name_error_1' (which is 2nd depth). | ||||
|     child.sendline('c') | ||||
| 
 | ||||
|     # due to block list strat from #337, this will no longer | ||||
|     # propagate before the root errors and cancels the spawner sub-tree. | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
| 
 | ||||
|     # only if the blocking condition doesn't kick in fast enough | ||||
|     before = str(child.before.decode()) | ||||
|     if "Debug lock blocked for ['name_error_1'" not in before: | ||||
| 
 | ||||
|         assert_before(child, [ | ||||
|             "Attaching to pdb in crashed actor: ('name_error_1'", | ||||
|             "NameError", | ||||
|         ]) | ||||
| 
 | ||||
|         if ctlc: | ||||
|             do_ctlc(child) | ||||
| 
 | ||||
|         child.sendline('c') | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
| 
 | ||||
|     # check if the spawner crashed or was blocked from debug | ||||
|     # and if this intermediary attached check the boxed error | ||||
|     before = str(child.before.decode()) | ||||
|     if "Attaching to pdb in crashed actor: ('spawn_error'" in before: | ||||
| 
 | ||||
|         assert_before(child, [ | ||||
|             # boxed error from spawner's child | ||||
|             "RemoteActorError: ('name_error_1'", | ||||
|             "NameError", | ||||
|         ]) | ||||
| 
 | ||||
|         if ctlc: | ||||
|             do_ctlc(child) | ||||
| 
 | ||||
|         child.sendline('c') | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
| 
 | ||||
|     # expect a root actor crash | ||||
|     assert_before(child, [ | ||||
|         "Attaching to pdb in crashed actor: ('name_error_1'", | ||||
|         "NameError", | ||||
|     ]) | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     child.sendline('c') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     assert_before(child, [ | ||||
|         "Attaching to pdb in crashed actor: ('spawn_error'", | ||||
|         # boxed error from previous step | ||||
|         "RemoteActorError: ('name_error_1'", | ||||
|         "NameError", | ||||
|     ]) | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     child.sendline('c') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     assert_before(child, [ | ||||
|         "Attaching to pdb in crashed actor: ('root'", | ||||
|         # boxed error from previous step | ||||
|         "RemoteActorError: ('name_error'", | ||||
|         "NameError", | ||||
| 
 | ||||
|         # error from root actor and root task that created top level nursery | ||||
|         "Attaching to pdb in crashed actor: ('root'", | ||||
|         "AssertionError", | ||||
|     ]) | ||||
| 
 | ||||
|     # warnings assert we probably don't need | ||||
|     # assert "Cancelling nursery in ('spawn_error'," in before | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     # continue again | ||||
|     child.sendline('c') | ||||
|     child.expect(pexpect.EOF) | ||||
| 
 | ||||
|     before = str(child.before.decode()) | ||||
|     # error from root actor and root task that created top level nursery | ||||
|     assert "AssertionError" in before | ||||
|     assert_before(child, [ | ||||
|         # "Attaching to pdb in crashed actor: ('root'", | ||||
|         # boxed error from previous step | ||||
|         "RemoteActorError: ('name_error'", | ||||
|         "NameError", | ||||
|         "AssertionError", | ||||
|         'assert 0', | ||||
|     ]) | ||||
| 
 | ||||
| 
 | ||||
| @has_nested_actors | ||||
|  |  | |||
|  | @ -8,6 +8,7 @@ import builtins | |||
| import itertools | ||||
| import importlib | ||||
| 
 | ||||
| from exceptiongroup import BaseExceptionGroup | ||||
| import pytest | ||||
| import trio | ||||
| import tractor | ||||
|  | @ -409,11 +410,12 @@ def test_trio_error_cancels_intertask_chan(arb_addr): | |||
|             # should trigger remote actor error | ||||
|             await portal.result() | ||||
| 
 | ||||
|     with pytest.raises(RemoteActorError) as excinfo: | ||||
|     with pytest.raises(BaseExceptionGroup) as excinfo: | ||||
|         trio.run(main) | ||||
| 
 | ||||
|     # ensure boxed error is correct | ||||
|     assert excinfo.value.type == Exception | ||||
|     # ensure boxed errors | ||||
|     for exc in excinfo.value.exceptions: | ||||
|         assert exc.type == Exception | ||||
| 
 | ||||
| 
 | ||||
| def test_trio_closes_early_and_channel_exits(arb_addr): | ||||
|  | @ -442,11 +444,12 @@ def test_aio_errors_and_channel_propagates_and_closes(arb_addr): | |||
|             # should trigger remote actor error | ||||
|             await portal.result() | ||||
| 
 | ||||
|     with pytest.raises(RemoteActorError) as excinfo: | ||||
|     with pytest.raises(BaseExceptionGroup) as excinfo: | ||||
|         trio.run(main) | ||||
| 
 | ||||
|     # ensure boxed error is correct | ||||
|     assert excinfo.value.type == Exception | ||||
|     # ensure boxed errors | ||||
|     for exc in excinfo.value.exceptions: | ||||
|         assert exc.type == Exception | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
|  |  | |||
|  | @ -18,7 +18,7 @@ | |||
| tractor: structured concurrent "actors". | ||||
| 
 | ||||
| """ | ||||
| from trio import MultiError | ||||
| from exceptiongroup import BaseExceptionGroup | ||||
| 
 | ||||
| from ._clustering import open_actor_cluster | ||||
| from ._ipc import Channel | ||||
|  | @ -62,7 +62,7 @@ __all__ = [ | |||
|     'ContextCancelled', | ||||
|     'ModuleNotExposed', | ||||
|     'MsgStream', | ||||
|     'MultiError', | ||||
|     'BaseExceptionGroup', | ||||
|     'Portal', | ||||
|     'ReceiveMsgStream', | ||||
|     'RemoteActorError', | ||||
|  |  | |||
|  | @ -25,6 +25,7 @@ import signal | |||
| from functools import partial | ||||
| from contextlib import asynccontextmanager as acm | ||||
| from typing import ( | ||||
|     Any, | ||||
|     Optional, | ||||
|     Callable, | ||||
|     AsyncIterator, | ||||
|  | @ -75,8 +76,12 @@ class Lock: | |||
|     # placeholder for function to set a ``trio.Event`` on debugger exit | ||||
|     # pdb_release_hook: Optional[Callable] = None | ||||
| 
 | ||||
|     _trio_handler: Callable[ | ||||
|         [int, Optional[FrameType]], Any | ||||
|     ] | int | None = None | ||||
| 
 | ||||
|     # actor-wide variable pointing to current task name using debugger | ||||
|     local_task_in_debug: Optional[str] = None | ||||
|     local_task_in_debug: str | None = None | ||||
| 
 | ||||
|     # NOTE: set by the current task waiting on the root tty lock from | ||||
|     # the CALLER side of the `lock_tty_for_child()` context entry-call | ||||
|  | @ -105,19 +110,16 @@ class Lock: | |||
|     @classmethod | ||||
|     def shield_sigint(cls): | ||||
|         cls._orig_sigint_handler = signal.signal( | ||||
|                 signal.SIGINT, | ||||
|                 shield_sigint, | ||||
|             ) | ||||
|             signal.SIGINT, | ||||
|             shield_sigint, | ||||
|         ) | ||||
| 
 | ||||
|     @classmethod | ||||
|     def unshield_sigint(cls): | ||||
|         if cls._orig_sigint_handler is not None: | ||||
|             # restore original sigint handler | ||||
|             signal.signal( | ||||
|                 signal.SIGINT, | ||||
|                 cls._orig_sigint_handler | ||||
|             ) | ||||
| 
 | ||||
|         # always restore ``trio``'s sigint handler. see notes below in | ||||
|         # the pdb factory about the nightmare that is that code swapping | ||||
|         # out the handler when the repl activates... | ||||
|         signal.signal(signal.SIGINT, cls._trio_handler) | ||||
|         cls._orig_sigint_handler = None | ||||
| 
 | ||||
|     @classmethod | ||||
|  | @ -544,7 +546,7 @@ def shield_sigint( | |||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     Specialized debugger compatible SIGINT handler. | ||||
|     Specialized, debugger-aware SIGINT handler. | ||||
| 
 | ||||
|     In childred we always ignore to avoid deadlocks since cancellation | ||||
|     should always be managed by the parent supervising actor. The root | ||||
|  | @ -601,6 +603,8 @@ def shield_sigint( | |||
|         # which has already terminated to unlock. | ||||
|         and any_connected | ||||
|     ): | ||||
|         # we are root and some actor is in debug mode | ||||
|         # if uid_in_debug is not None: | ||||
|         name = uid_in_debug[0] | ||||
|         if name != 'root': | ||||
|             log.pdb( | ||||
|  | @ -611,6 +615,22 @@ def shield_sigint( | |||
|             log.pdb( | ||||
|                 "Ignoring SIGINT while in debug mode" | ||||
|             ) | ||||
|     elif ( | ||||
|         is_root_process() | ||||
|     ): | ||||
|         log.pdb( | ||||
|             "Ignoring SIGINT since debug mode is enabled" | ||||
|         ) | ||||
| 
 | ||||
|         # revert back to ``trio`` handler asap! | ||||
|         Lock.unshield_sigint() | ||||
|         if ( | ||||
|             Lock._root_local_task_cs_in_debug | ||||
|             and not Lock._root_local_task_cs_in_debug.cancel_called | ||||
|         ): | ||||
|             Lock._root_local_task_cs_in_debug.cancel() | ||||
| 
 | ||||
|         # raise KeyboardInterrupt | ||||
| 
 | ||||
|     # child actor that has locked the debugger | ||||
|     elif not is_root_process(): | ||||
|  | @ -636,10 +656,9 @@ def shield_sigint( | |||
|         # https://github.com/goodboy/tractor/issues/320 | ||||
|         # elif debug_mode(): | ||||
| 
 | ||||
|     else: | ||||
|         log.pdb( | ||||
|             "Ignoring SIGINT since debug mode is enabled" | ||||
|         ) | ||||
|     else:  # XXX: shouldn't ever get here? | ||||
|         print("WTFWTFWTF") | ||||
|         raise KeyboardInterrupt | ||||
| 
 | ||||
|     # NOTE: currently (at least on ``fancycompleter`` 0.9.2) | ||||
|     # it lookks to be that the last command that was run (eg. ll) | ||||
|  |  | |||
|  | @ -27,6 +27,7 @@ import importlib | |||
| import builtins | ||||
| import traceback | ||||
| 
 | ||||
| import exceptiongroup as eg | ||||
| import trio | ||||
| 
 | ||||
| 
 | ||||
|  | @ -52,9 +53,6 @@ class RemoteActorError(Exception): | |||
|         self.type = suberror_type | ||||
|         self.msgdata = msgdata | ||||
| 
 | ||||
|     # TODO: a trio.MultiError.catch like context manager | ||||
|     # for catching underlying remote errors of a particular type | ||||
| 
 | ||||
| 
 | ||||
| class InternalActorError(RemoteActorError): | ||||
|     """Remote internal ``tractor`` error indicating | ||||
|  | @ -123,10 +121,12 @@ def unpack_error( | |||
|     err_type=RemoteActorError | ||||
| 
 | ||||
| ) -> Exception: | ||||
|     """Unpack an 'error' message from the wire | ||||
|     ''' | ||||
|     Unpack an 'error' message from the wire | ||||
|     into a local ``RemoteActorError``. | ||||
| 
 | ||||
|     """ | ||||
|     ''' | ||||
|     __tracebackhide__ = True | ||||
|     error = msg['error'] | ||||
| 
 | ||||
|     tb_str = error.get('tb_str', '') | ||||
|  | @ -139,7 +139,12 @@ def unpack_error( | |||
|         suberror_type = trio.Cancelled | ||||
| 
 | ||||
|     else:  # try to lookup a suitable local error type | ||||
|         for ns in [builtins, _this_mod, trio]: | ||||
|         for ns in [ | ||||
|             builtins, | ||||
|             _this_mod, | ||||
|             eg, | ||||
|             trio, | ||||
|         ]: | ||||
|             try: | ||||
|                 suberror_type = getattr(ns, type_name) | ||||
|                 break | ||||
|  | @ -158,12 +163,15 @@ def unpack_error( | |||
| 
 | ||||
| 
 | ||||
| def is_multi_cancelled(exc: BaseException) -> bool: | ||||
|     """Predicate to determine if a ``trio.MultiError`` contains only | ||||
|     ``trio.Cancelled`` sub-exceptions (and is likely the result of | ||||
|     ''' | ||||
|     Predicate to determine if a possible ``eg.BaseExceptionGroup`` contains | ||||
|     only ``trio.Cancelled`` sub-exceptions (and is likely the result of | ||||
|     cancelling a collection of subtasks. | ||||
| 
 | ||||
|     """ | ||||
|     return not trio.MultiError.filter( | ||||
|         lambda exc: exc if not isinstance(exc, trio.Cancelled) else None, | ||||
|         exc, | ||||
|     ) | ||||
|     ''' | ||||
|     if isinstance(exc, eg.BaseExceptionGroup): | ||||
|         return exc.subgroup( | ||||
|             lambda exc: isinstance(exc, trio.Cancelled) | ||||
|         ) is not None | ||||
| 
 | ||||
|     return False | ||||
|  |  | |||
|  | @ -52,17 +52,17 @@ log = get_logger(__name__) | |||
| 
 | ||||
| 
 | ||||
| def _unwrap_msg( | ||||
| 
 | ||||
|     msg: dict[str, Any], | ||||
|     channel: Channel | ||||
| 
 | ||||
| ) -> Any: | ||||
|     __tracebackhide__ = True | ||||
|     try: | ||||
|         return msg['return'] | ||||
|     except KeyError: | ||||
|         # internal error should never get here | ||||
|         assert msg.get('cid'), "Received internal error at portal?" | ||||
|         raise unpack_error(msg, channel) | ||||
|         raise unpack_error(msg, channel) from None | ||||
| 
 | ||||
| 
 | ||||
| class MessagingError(Exception): | ||||
|  | @ -136,6 +136,7 @@ class Portal: | |||
|         Return the result(s) from the remote actor's "main" task. | ||||
| 
 | ||||
|         ''' | ||||
|         # __tracebackhide__ = True | ||||
|         # Check for non-rpc errors slapped on the | ||||
|         # channel for which we always raise | ||||
|         exc = self.channel._exc | ||||
|  | @ -460,7 +461,6 @@ class Portal: | |||
|             # sure it's worth being pedantic: | ||||
|             # Exception, | ||||
|             # trio.Cancelled, | ||||
|             # trio.MultiError, | ||||
|             # KeyboardInterrupt, | ||||
| 
 | ||||
|         ) as err: | ||||
|  |  | |||
|  | @ -23,15 +23,22 @@ from functools import partial | |||
| import importlib | ||||
| import logging | ||||
| import os | ||||
| import signal | ||||
| from typing import ( | ||||
|     Optional, | ||||
| ) | ||||
| import typing | ||||
| import warnings | ||||
| 
 | ||||
| 
 | ||||
| from exceptiongroup import BaseExceptionGroup | ||||
| import trio | ||||
| 
 | ||||
| from ._runtime import Actor, Arbiter, async_main | ||||
| from ._runtime import ( | ||||
|     Actor, | ||||
|     Arbiter, | ||||
|     async_main, | ||||
| ) | ||||
| from . import _debug | ||||
| from . import _spawn | ||||
| from . import _state | ||||
|  | @ -74,14 +81,19 @@ async def open_root_actor( | |||
|     rpc_module_paths: Optional[list] = None, | ||||
| 
 | ||||
| ) -> typing.Any: | ||||
|     """Async entry point for ``tractor``. | ||||
|     ''' | ||||
|     Runtime init entry point for ``tractor``. | ||||
| 
 | ||||
|     """ | ||||
|     ''' | ||||
|     # Override the global debugger hook to make it play nice with | ||||
|     # ``trio``, see: | ||||
|     # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018 | ||||
|     os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace' | ||||
| 
 | ||||
|     # attempt to retreive ``trio``'s sigint handler and stash it | ||||
|     # on our debugger lock state. | ||||
|     _debug.Lock._trio_handler =  signal.getsignal(signal.SIGINT) | ||||
| 
 | ||||
|     # mark top most level process as root actor | ||||
|     _state._runtime_vars['_is_root'] = True | ||||
| 
 | ||||
|  | @ -205,7 +217,10 @@ async def open_root_actor( | |||
|             try: | ||||
|                 yield actor | ||||
| 
 | ||||
|             except (Exception, trio.MultiError) as err: | ||||
|             except ( | ||||
|                 Exception, | ||||
|                 BaseExceptionGroup, | ||||
|             ) as err: | ||||
| 
 | ||||
|                 entered = await _debug._maybe_enter_pm(err) | ||||
| 
 | ||||
|  |  | |||
|  | @ -25,21 +25,23 @@ from itertools import chain | |||
| import importlib | ||||
| import importlib.util | ||||
| import inspect | ||||
| import uuid | ||||
| import signal | ||||
| import sys | ||||
| from typing import ( | ||||
|     Any, Optional, | ||||
|     Union, TYPE_CHECKING, | ||||
|     Callable, | ||||
| ) | ||||
| import uuid | ||||
| from types import ModuleType | ||||
| import sys | ||||
| import os | ||||
| from contextlib import ExitStack | ||||
| import warnings | ||||
| 
 | ||||
| from async_generator import aclosing | ||||
| from exceptiongroup import BaseExceptionGroup | ||||
| import trio  # type: ignore | ||||
| from trio_typing import TaskStatus | ||||
| from async_generator import aclosing | ||||
| 
 | ||||
| from ._ipc import Channel | ||||
| from ._streaming import Context | ||||
|  | @ -194,7 +196,7 @@ async def _invoke( | |||
|                     res = await coro | ||||
|                     await chan.send({'return': res, 'cid': cid}) | ||||
| 
 | ||||
|             except trio.MultiError: | ||||
|             except BaseExceptionGroup: | ||||
|                 # if a context error was set then likely | ||||
|                 # thei multierror was raised due to that | ||||
|                 if ctx._error is not None: | ||||
|  | @ -266,7 +268,7 @@ async def _invoke( | |||
| 
 | ||||
|     except ( | ||||
|         Exception, | ||||
|         trio.MultiError | ||||
|         BaseExceptionGroup, | ||||
|     ) as err: | ||||
| 
 | ||||
|         if not is_multi_cancelled(err): | ||||
|  | @ -349,7 +351,7 @@ def _get_mod_abspath(module): | |||
| 
 | ||||
| async def try_ship_error_to_parent( | ||||
|     channel: Channel, | ||||
|     err: Union[Exception, trio.MultiError], | ||||
|     err: Union[Exception, BaseExceptionGroup], | ||||
| 
 | ||||
| ) -> None: | ||||
|     with trio.CancelScope(shield=True): | ||||
|  | @ -708,6 +710,14 @@ class Actor: | |||
|                 log.runtime(f"No more channels for {chan.uid}") | ||||
|                 self._peers.pop(uid, None) | ||||
| 
 | ||||
|             log.runtime(f"Peers is {self._peers}") | ||||
| 
 | ||||
|             # No more channels to other actors (at all) registered | ||||
|             # as connected. | ||||
|             if not self._peers: | ||||
|                 log.runtime("Signalling no more peer channel connections") | ||||
|                 self._no_more_peers.set() | ||||
| 
 | ||||
|                 # NOTE: block this actor from acquiring the | ||||
|                 # debugger-TTY-lock since we have no way to know if we | ||||
|                 # cancelled it and further there is no way to ensure the | ||||
|  | @ -721,23 +731,16 @@ class Actor: | |||
|                     # if a now stale local task has the TTY lock still | ||||
|                     # we cancel it to allow servicing other requests for | ||||
|                     # the lock. | ||||
|                     db_cs = pdb_lock._root_local_task_cs_in_debug | ||||
|                     if ( | ||||
|                         pdb_lock._root_local_task_cs_in_debug | ||||
|                         and not pdb_lock._root_local_task_cs_in_debug.cancel_called | ||||
|                         db_cs | ||||
|                         and not db_cs.cancel_called | ||||
|                     ): | ||||
|                         log.warning( | ||||
|                             f'STALE DEBUG LOCK DETECTED FOR {uid}' | ||||
|                         ) | ||||
|                         # TODO: figure out why this breaks tests.. | ||||
|                         # pdb_lock._root_local_task_cs_in_debug.cancel() | ||||
| 
 | ||||
|             log.runtime(f"Peers is {self._peers}") | ||||
| 
 | ||||
|             # No more channels to other actors (at all) registered | ||||
|             # as connected. | ||||
|             if not self._peers: | ||||
|                 log.runtime("Signalling no more peer channel connections") | ||||
|                 self._no_more_peers.set() | ||||
|                         db_cs.cancel() | ||||
| 
 | ||||
|             # XXX: is this necessary (GC should do it)? | ||||
|             if chan.connected(): | ||||
|  | @ -1228,6 +1231,10 @@ async def async_main( | |||
|     and when cancelled effectively cancels the actor. | ||||
| 
 | ||||
|     ''' | ||||
|     # attempt to retreive ``trio``'s sigint handler and stash it | ||||
|     # on our debugger lock state. | ||||
|     _debug.Lock._trio_handler = signal.getsignal(signal.SIGINT) | ||||
| 
 | ||||
|     registered_with_arbiter = False | ||||
|     try: | ||||
| 
 | ||||
|  | @ -1549,7 +1556,10 @@ async def process_messages( | |||
|                         partial(_invoke, actor, cid, chan, func, kwargs), | ||||
|                         name=funcname, | ||||
|                     ) | ||||
|                 except (RuntimeError, trio.MultiError): | ||||
|                 except ( | ||||
|                     RuntimeError, | ||||
|                     BaseExceptionGroup, | ||||
|                 ): | ||||
|                     # avoid reporting a benign race condition | ||||
|                     # during actor runtime teardown. | ||||
|                     nursery_cancelled_before_task = True | ||||
|  | @ -1594,7 +1604,10 @@ async def process_messages( | |||
|         # transport **was** disconnected | ||||
|         return True | ||||
| 
 | ||||
|     except (Exception, trio.MultiError) as err: | ||||
|     except ( | ||||
|         Exception, | ||||
|         BaseExceptionGroup, | ||||
|     ) as err: | ||||
|         if nursery_cancelled_before_task: | ||||
|             sn = actor._service_n | ||||
|             assert sn and sn.cancel_scope.cancel_called | ||||
|  |  | |||
|  | @ -31,6 +31,7 @@ from typing import ( | |||
| ) | ||||
| from collections.abc import Awaitable | ||||
| 
 | ||||
| from exceptiongroup import BaseExceptionGroup | ||||
| import trio | ||||
| from trio_typing import TaskStatus | ||||
| 
 | ||||
|  | @ -139,6 +140,7 @@ async def exhaust_portal( | |||
|     If the main task is an async generator do our best to consume | ||||
|     what's left of it. | ||||
|     ''' | ||||
|     __tracebackhide__ = True | ||||
|     try: | ||||
|         log.debug(f"Waiting on final result from {actor.uid}") | ||||
| 
 | ||||
|  | @ -146,8 +148,11 @@ async def exhaust_portal( | |||
|         # always be established and shutdown using a context manager api | ||||
|         final = await portal.result() | ||||
| 
 | ||||
|     except (Exception, trio.MultiError) as err: | ||||
|         # we reraise in the parent task via a ``trio.MultiError`` | ||||
|     except ( | ||||
|         Exception, | ||||
|         BaseExceptionGroup, | ||||
|     ) as err: | ||||
|         # we reraise in the parent task via a ``BaseExceptionGroup`` | ||||
|         return err | ||||
|     except trio.Cancelled as err: | ||||
|         # lol, of course we need this too ;P | ||||
|  | @ -175,7 +180,7 @@ async def cancel_on_completion( | |||
|     ''' | ||||
|     # if this call errors we store the exception for later | ||||
|     # in ``errors`` which will be reraised inside | ||||
|     # a MultiError and we still send out a cancel request | ||||
|     # an exception group and we still send out a cancel request | ||||
|     result = await exhaust_portal(portal, actor) | ||||
|     if isinstance(result, Exception): | ||||
|         errors[actor.uid] = result | ||||
|  |  | |||
|  | @ -22,7 +22,6 @@ from typing import ( | |||
|     Optional, | ||||
|     Any, | ||||
| ) | ||||
| from collections.abc import Mapping | ||||
| 
 | ||||
| import trio | ||||
| 
 | ||||
|  | @ -46,12 +45,6 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor':  # type: ignore # | |||
|     return _current_actor | ||||
| 
 | ||||
| 
 | ||||
| _conc_name_getters = { | ||||
|     'task': trio.lowlevel.current_task, | ||||
|     'actor': current_actor | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| def is_main_process() -> bool: | ||||
|     """Bool determining if this actor is running in the top-most process. | ||||
|     """ | ||||
|  |  | |||
|  | @ -27,7 +27,8 @@ from typing import ( | |||
|     Optional, | ||||
|     Callable, | ||||
|     AsyncGenerator, | ||||
|     AsyncIterator | ||||
|     AsyncIterator, | ||||
|     TYPE_CHECKING, | ||||
| ) | ||||
| 
 | ||||
| import warnings | ||||
|  | @ -41,6 +42,10 @@ from .log import get_logger | |||
| from .trionics import broadcast_receiver, BroadcastReceiver | ||||
| 
 | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from ._portal import Portal | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -269,9 +274,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|     @asynccontextmanager | ||||
|     async def subscribe( | ||||
|         self, | ||||
| 
 | ||||
|     ) -> AsyncIterator[BroadcastReceiver]: | ||||
|         '''Allocate and return a ``BroadcastReceiver`` which delegates | ||||
|         ''' | ||||
|         Allocate and return a ``BroadcastReceiver`` which delegates | ||||
|         to this message stream. | ||||
| 
 | ||||
|         This allows multiple local tasks to receive each their own copy | ||||
|  | @ -365,7 +370,8 @@ class Context: | |||
|     _remote_func_type: Optional[str] = None | ||||
| 
 | ||||
|     # only set on the caller side | ||||
|     _portal: Optional['Portal'] = None    # type: ignore # noqa | ||||
|     _portal: Optional[Portal] = None    # type: ignore # noqa | ||||
|     _stream: Optional[MsgStream] = None | ||||
|     _result: Optional[Any] = False | ||||
|     _error: Optional[BaseException] = None | ||||
| 
 | ||||
|  | @ -425,19 +431,24 @@ class Context: | |||
|             # (currently) that other portal APIs (``Portal.run()``, | ||||
|             # ``.run_in_actor()``) do their own error checking at the point | ||||
|             # of the call and result processing. | ||||
|             log.error( | ||||
|                 f'Remote context error for {self.chan.uid}:{self.cid}:\n' | ||||
|                 f'{msg["error"]["tb_str"]}' | ||||
|             ) | ||||
|             error = unpack_error(msg, self.chan) | ||||
|             if ( | ||||
|                 isinstance(error, ContextCancelled) and | ||||
|                 self._cancel_called | ||||
|                 isinstance(error, ContextCancelled) | ||||
|             ): | ||||
|                 # this is an expected cancel request response message | ||||
|                 # and we don't need to raise it in scope since it will | ||||
|                 # potentially override a real error | ||||
|                 return | ||||
|                 log.cancel( | ||||
|                     f'Remote context error for {self.chan.uid}:{self.cid}:\n' | ||||
|                     f'{msg["error"]["tb_str"]}' | ||||
|                 ) | ||||
|                 if self._cancel_called: | ||||
|                     # this is an expected cancel request response message | ||||
|                     # and we don't need to raise it in scope since it will | ||||
|                     # potentially override a real error | ||||
|                     return | ||||
|             else: | ||||
|                 log.error( | ||||
|                     f'Remote context error for {self.chan.uid}:{self.cid}:\n' | ||||
|                     f'{msg["error"]["tb_str"]}' | ||||
|                 ) | ||||
| 
 | ||||
|             self._error = error | ||||
| 
 | ||||
|  | @ -473,6 +484,7 @@ class Context: | |||
|         log.cancel(f'Cancelling {side} side of context to {self.chan.uid}') | ||||
| 
 | ||||
|         self._cancel_called = True | ||||
|         ipc_broken: bool = False | ||||
| 
 | ||||
|         if side == 'caller': | ||||
|             if not self._portal: | ||||
|  | @ -490,7 +502,14 @@ class Context: | |||
|                 # NOTE: we're telling the far end actor to cancel a task | ||||
|                 # corresponding to *this actor*. The far end local channel | ||||
|                 # instance is passed to `Actor._cancel_task()` implicitly. | ||||
|                 await self._portal.run_from_ns('self', '_cancel_task', cid=cid) | ||||
|                 try: | ||||
|                     await self._portal.run_from_ns( | ||||
|                         'self', | ||||
|                         '_cancel_task', | ||||
|                         cid=cid, | ||||
|                     ) | ||||
|                 except trio.BrokenResourceError: | ||||
|                     ipc_broken = True | ||||
| 
 | ||||
|             if cs.cancelled_caught: | ||||
|                 # XXX: there's no way to know if the remote task was indeed | ||||
|  | @ -506,7 +525,10 @@ class Context: | |||
|                         "Timed out on cancelling remote task " | ||||
|                         f"{cid} for {self._portal.channel.uid}") | ||||
| 
 | ||||
|         # callee side remote task | ||||
|             elif ipc_broken: | ||||
|                 log.cancel( | ||||
|                     "Transport layer was broken before cancel request " | ||||
|                     f"{cid} for {self._portal.channel.uid}") | ||||
|         else: | ||||
|             self._cancel_msg = msg | ||||
| 
 | ||||
|  | @ -593,10 +615,11 @@ class Context: | |||
|         async with MsgStream( | ||||
|             ctx=self, | ||||
|             rx_chan=ctx._recv_chan, | ||||
|         ) as rchan: | ||||
|         ) as stream: | ||||
|             self._stream = stream | ||||
| 
 | ||||
|             if self._portal: | ||||
|                 self._portal._streams.add(rchan) | ||||
|                 self._portal._streams.add(stream) | ||||
| 
 | ||||
|             try: | ||||
|                 self._stream_opened = True | ||||
|  | @ -604,7 +627,7 @@ class Context: | |||
|                 # ensure we aren't cancelled before delivering | ||||
|                 # the stream | ||||
|                 # await trio.lowlevel.checkpoint() | ||||
|                 yield rchan | ||||
|                 yield stream | ||||
| 
 | ||||
|                 # XXX: Make the stream "one-shot use".  On exit, signal | ||||
|                 # ``trio.EndOfChannel``/``StopAsyncIteration`` to the | ||||
|  | @ -635,25 +658,22 @@ class Context: | |||
| 
 | ||||
|             if not self._recv_chan._closed:  # type: ignore | ||||
| 
 | ||||
|                 # wait for a final context result consuming | ||||
|                 # and discarding any bi dir stream msgs still | ||||
|                 # in transit from the far end. | ||||
|                 while True: | ||||
|                 def consume( | ||||
|                     msg: dict, | ||||
| 
 | ||||
|                     msg = await self._recv_chan.receive() | ||||
|                 ) -> Optional[dict]: | ||||
|                     try: | ||||
|                         self._result = msg['return'] | ||||
|                         break | ||||
|                         return msg['return'] | ||||
|                     except KeyError as msgerr: | ||||
| 
 | ||||
|                         if 'yield' in msg: | ||||
|                             # far end task is still streaming to us so discard | ||||
|                             log.warning(f'Discarding stream delivered {msg}') | ||||
|                             continue | ||||
|                             return | ||||
| 
 | ||||
|                         elif 'stop' in msg: | ||||
|                             log.debug('Remote stream terminated') | ||||
|                             continue | ||||
|                             return | ||||
| 
 | ||||
|                         # internal error should never get here | ||||
|                         assert msg.get('cid'), ( | ||||
|  | @ -663,6 +683,25 @@ class Context: | |||
|                             msg, self._portal.channel | ||||
|                         ) from msgerr | ||||
| 
 | ||||
|                 # wait for a final context result consuming | ||||
|                 # and discarding any bi dir stream msgs still | ||||
|                 # in transit from the far end. | ||||
|                 if self._stream: | ||||
|                     async with self._stream.subscribe() as bstream: | ||||
|                         async for msg in bstream: | ||||
|                             result = consume(msg) | ||||
|                             if result: | ||||
|                                 self._result = result | ||||
|                                 break | ||||
| 
 | ||||
|                 if not self._result: | ||||
|                     while True: | ||||
|                         msg = await self._recv_chan.receive() | ||||
|                         result = consume(msg) | ||||
|                         if result: | ||||
|                             self._result = result | ||||
|                             break | ||||
| 
 | ||||
|         return self._result | ||||
| 
 | ||||
|     async def started( | ||||
|  |  | |||
|  | @ -18,6 +18,7 @@ | |||
| ``trio`` inspired apis and helpers | ||||
| 
 | ||||
| """ | ||||
| from contextlib import asynccontextmanager as acm | ||||
| from functools import partial | ||||
| import inspect | ||||
| from typing import ( | ||||
|  | @ -27,8 +28,8 @@ from typing import ( | |||
| import typing | ||||
| import warnings | ||||
| 
 | ||||
| from exceptiongroup import BaseExceptionGroup | ||||
| import trio | ||||
| from async_generator import asynccontextmanager | ||||
| 
 | ||||
| from ._debug import maybe_wait_for_debugger | ||||
| from ._state import current_actor, is_main_process | ||||
|  | @ -82,7 +83,7 @@ class ActorNursery: | |||
|         actor: Actor, | ||||
|         ria_nursery: trio.Nursery, | ||||
|         da_nursery: trio.Nursery, | ||||
|         errors: dict[tuple[str, str], Exception], | ||||
|         errors: dict[tuple[str, str], BaseException], | ||||
|     ) -> None: | ||||
|         # self.supervisor = supervisor  # TODO | ||||
|         self._actor: Actor = actor | ||||
|  | @ -294,13 +295,13 @@ class ActorNursery: | |||
|         self._join_procs.set() | ||||
| 
 | ||||
| 
 | ||||
| @asynccontextmanager | ||||
| @acm | ||||
| async def _open_and_supervise_one_cancels_all_nursery( | ||||
|     actor: Actor, | ||||
| ) -> typing.AsyncGenerator[ActorNursery, None]: | ||||
| 
 | ||||
|     # the collection of errors retreived from spawned sub-actors | ||||
|     errors: dict[tuple[str, str], Exception] = {} | ||||
|     errors: dict[tuple[str, str], BaseException] = {} | ||||
| 
 | ||||
|     # This is the outermost level "deamon actor" nursery. It is awaited | ||||
|     # **after** the below inner "run in actor nursery". This allows for | ||||
|  | @ -346,7 +347,6 @@ async def _open_and_supervise_one_cancels_all_nursery( | |||
|                     anursery._join_procs.set() | ||||
| 
 | ||||
|                 except BaseException as err: | ||||
| 
 | ||||
|                     # If we error in the root but the debugger is | ||||
|                     # engaged we don't want to prematurely kill (and | ||||
|                     # thus clobber access to) the local tty since it | ||||
|  | @ -382,18 +382,21 @@ async def _open_and_supervise_one_cancels_all_nursery( | |||
|                             else: | ||||
|                                 log.exception( | ||||
|                                     f"Nursery for {current_actor().uid} " | ||||
|                                     f"errored with {err}, ") | ||||
|                                     f"errored with") | ||||
| 
 | ||||
|                             # cancel all subactors | ||||
|                             await anursery.cancel() | ||||
| 
 | ||||
|                     except trio.MultiError as merr: | ||||
|                     except BaseExceptionGroup as merr: | ||||
|                         # If we receive additional errors while waiting on | ||||
|                         # remaining subactors that were cancelled, | ||||
|                         # aggregate those errors with the original error | ||||
|                         # that triggered this teardown. | ||||
|                         if err not in merr.exceptions: | ||||
|                             raise trio.MultiError(merr.exceptions + [err]) | ||||
|                             raise BaseExceptionGroup( | ||||
|                                 'tractor.ActorNursery errored with', | ||||
|                                 list(merr.exceptions) + [err], | ||||
|                             ) | ||||
|                     else: | ||||
|                         raise | ||||
| 
 | ||||
|  | @ -402,12 +405,12 @@ async def _open_and_supervise_one_cancels_all_nursery( | |||
|         # XXX: do we need a `trio.Cancelled` catch here as well? | ||||
|         # this is the catch around the ``.run_in_actor()`` nursery | ||||
|         except ( | ||||
| 
 | ||||
|             Exception, | ||||
|             trio.MultiError, | ||||
|             BaseExceptionGroup, | ||||
|             trio.Cancelled | ||||
| 
 | ||||
|         ) as err: | ||||
|         ) as err:  # noqa | ||||
|             errors[actor.uid] = err | ||||
| 
 | ||||
|             # XXX: yet another guard before allowing the cancel | ||||
|             # sequence in case a (single) child is in debug. | ||||
|  | @ -436,9 +439,12 @@ async def _open_and_supervise_one_cancels_all_nursery( | |||
|                     with trio.CancelScope(shield=True): | ||||
|                         await anursery.cancel() | ||||
| 
 | ||||
|                 # use `MultiError` as needed | ||||
|                 # use `BaseExceptionGroup` as needed | ||||
|                 if len(errors) > 1: | ||||
|                     raise trio.MultiError(tuple(errors.values())) | ||||
|                     raise BaseExceptionGroup( | ||||
|                         'tractor.ActorNursery errored with', | ||||
|                         tuple(errors.values()), | ||||
|                     ) | ||||
|                 else: | ||||
|                     raise list(errors.values())[0] | ||||
| 
 | ||||
|  | @ -447,7 +453,7 @@ async def _open_and_supervise_one_cancels_all_nursery( | |||
|     # after nursery exit | ||||
| 
 | ||||
| 
 | ||||
| @asynccontextmanager | ||||
| @acm | ||||
| async def open_nursery( | ||||
|     **kwargs, | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue