Compare commits
	
		
			60 Commits 
		
	
	
		
			master
			...
			sigintsavi
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | fec2ba004c | |
|  | ba9c914221 | |
|  | 30ee3f2dcc | |
|  | 0b4fc4fc47 | |
|  | 6b8c193221 | |
|  | 05167bdc70 | |
|  | fa21083b51 | |
|  | e6ad7a117b | |
|  | 4366873582 | |
|  | 9e6a22e52e | |
|  | 0ab49cd244 | |
|  | 3fafa87ea9 | |
|  | a6f5b9396a | |
|  | 61af2dc5aa | |
|  | ba857fe85c | |
|  | cb221b9e7c | |
|  | 3bc4778950 | |
|  | 5ae21e4753 | |
|  | d4a36e57d1 | |
|  | 58956ae950 | |
|  | a864f1e729 | |
|  | a4bc5f79ad | |
|  | c132b7f624 | |
|  | b659326d5b | |
|  | d971e9bc9d | |
|  | 611120c67c | |
|  | 7f6cace40b | |
|  | fe4adbf6f0 | |
|  | 6ccfeb17d5 | |
|  | 9bed332a94 | |
|  | 13df959d90 | |
|  | d0074291a1 | |
|  | 8559ad69f3 | |
|  | e519df1bd2 | |
|  | 24fd87d969 | |
|  | 91054a8a42 | |
|  | cdc7bf6549 | |
|  | c865d01e85 | |
|  | e1caeeb8de | |
|  | 7c25aa176f | |
|  | 3b7985292f | |
|  | e8fc820b92 | |
|  | b2fdbc44d1 | |
|  | f7823a46b8 | |
|  | f76c809c39 | |
|  | 9e56881163 | |
|  | 8291ee09b3 | |
|  | 4a441f0988 | |
|  | df0108a0bb | |
|  | 8537e17251 | |
|  | 20acb50d94 | |
|  | eab895864f | |
|  | 65a9f69d6c | |
|  | 24b6cc0209 | |
|  | f488db6d8d | |
|  | c5d335c057 | |
|  | 4594fe3501 | |
|  | 5f0262fd98 | |
|  | 59e7f29eed | |
|  | e2dfd6e99d | 
|  | @ -74,7 +74,7 @@ jobs: | |||
|         run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager | ||||
| 
 | ||||
|       - name: Run tests | ||||
|         run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs | ||||
|         run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs -v | ||||
| 
 | ||||
|   # We skip 3.10 on windows for now due to | ||||
|   # https://github.com/pytest-dev/pytest/issues/8733 | ||||
|  | @ -111,4 +111,4 @@ jobs: | |||
|         run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager | ||||
| 
 | ||||
|       - name: Run tests | ||||
|         run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs | ||||
|         run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs --full-trace | ||||
|  |  | |||
|  | @ -0,0 +1,40 @@ | |||
| import trio | ||||
| import tractor | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def just_sleep( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
|     **kwargs, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     Start and sleep. | ||||
| 
 | ||||
|     ''' | ||||
|     await ctx.started() | ||||
|     await trio.sleep_forever() | ||||
| 
 | ||||
| 
 | ||||
| async def main() -> None: | ||||
| 
 | ||||
|     async with tractor.open_nursery( | ||||
|         debug_mode=True, | ||||
|     ) as n: | ||||
|         portal = await n.start_actor( | ||||
|             'ctx_child', | ||||
| 
 | ||||
|             # XXX: we don't enable the current module in order | ||||
|             # to trigger `ModuleNotFound`. | ||||
|             enable_modules=[], | ||||
|         ) | ||||
| 
 | ||||
|         async with portal.open_context( | ||||
|             just_sleep,  # taken from pytest parameterization | ||||
|         ) as (ctx, sent): | ||||
|             raise KeyboardInterrupt | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     trio.run(main) | ||||
|  | @ -1,5 +1,6 @@ | |||
| pytest | ||||
| pytest-trio | ||||
| pytest-timeout | ||||
| pdbpp | ||||
| mypy<0.920 | ||||
| trio_typing<0.7.0 | ||||
|  |  | |||
							
								
								
									
										17
									
								
								setup.py
								
								
								
								
							
							
						
						
									
										17
									
								
								setup.py
								
								
								
								
							|  | @ -43,7 +43,7 @@ setup( | |||
|     install_requires=[ | ||||
| 
 | ||||
|         # trio related | ||||
|         'trio>0.8', | ||||
|         'trio >= 0.20', | ||||
|         'async_generator', | ||||
|         'trio_typing', | ||||
| 
 | ||||
|  | @ -54,15 +54,22 @@ setup( | |||
|         # tooling | ||||
|         'colorlog', | ||||
|         'wrapt', | ||||
| 
 | ||||
|         # pip ref docs on these specs: | ||||
|         # https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples | ||||
|         # and pep: | ||||
|         # https://peps.python.org/pep-0440/#version-specifiers | ||||
|         'pdbpp <= 0.10.1; python_version < "3.10"', | ||||
| 
 | ||||
|         # windows deps workaround for ``pdbpp`` | ||||
|         # https://github.com/pdbpp/pdbpp/issues/498 | ||||
|         # https://github.com/pdbpp/fancycompleter/issues/37 | ||||
|         'pyreadline3 ; platform_system == "Windows"', | ||||
| 
 | ||||
|         'pdbpp', | ||||
|         # 3.10 has an outstanding unreleased issue and `pdbpp` itself | ||||
|         # pins to patched forks of its own dependencies as well. | ||||
|         "pdbpp @ git+https://github.com/pdbpp/pdbpp@master#egg=pdbpp",  # noqa: E501 | ||||
|         #   pins to patched forks of its own dependencies as well..and | ||||
|         #   we need a specific patch on master atm. | ||||
|         'pdbpp @ git+https://github.com/pdbpp/pdbpp@76c4be5#egg=pdbpp ; python_version > "3.9"',  # noqa: E501 | ||||
| 
 | ||||
|         # serialization | ||||
|         'msgspec >= "0.4.0"' | ||||
|  | @ -87,8 +94,8 @@ setup( | |||
|         "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)", | ||||
|         "Programming Language :: Python :: Implementation :: CPython", | ||||
|         "Programming Language :: Python :: 3 :: Only", | ||||
|         "Programming Language :: Python :: 3.10", | ||||
|         "Programming Language :: Python :: 3.9", | ||||
|         "Programming Language :: Python :: 3.10", | ||||
|         "Intended Audience :: Science/Research", | ||||
|         "Intended Audience :: Developers", | ||||
|         "Topic :: System :: Distributed Computing", | ||||
|  |  | |||
|  | @ -85,11 +85,14 @@ def spawn_backend(request): | |||
|     return request.config.option.spawn_backend | ||||
| 
 | ||||
| 
 | ||||
| _ci_env: bool = os.environ.get('CI', False) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture(scope='session') | ||||
| def ci_env() -> bool: | ||||
|     """Detect CI envoirment. | ||||
|     """ | ||||
|     return os.environ.get('TRAVIS', False) or os.environ.get('CI', False) | ||||
|     return _ci_env | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture(scope='session') | ||||
|  |  | |||
|  | @ -265,42 +265,44 @@ async def test_callee_closes_ctx_after_stream_open(): | |||
|             enable_modules=[__name__], | ||||
|         ) | ||||
| 
 | ||||
|         async with portal.open_context( | ||||
|             close_ctx_immediately, | ||||
|         with trio.fail_after(2): | ||||
|             async with portal.open_context( | ||||
|                 close_ctx_immediately, | ||||
| 
 | ||||
|             # flag to avoid waiting the final result | ||||
|             # cancel_on_exit=True, | ||||
|                 # flag to avoid waiting the final result | ||||
|                 # cancel_on_exit=True, | ||||
| 
 | ||||
|         ) as (ctx, sent): | ||||
|             ) as (ctx, sent): | ||||
| 
 | ||||
|             assert sent is None | ||||
|                 assert sent is None | ||||
| 
 | ||||
|             with trio.fail_after(0.5): | ||||
|                 async with ctx.open_stream() as stream: | ||||
|                 with trio.fail_after(0.5): | ||||
|                     async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                     # should fall through since ``StopAsyncIteration`` | ||||
|                     # should be raised through translation of | ||||
|                     # a ``trio.EndOfChannel`` by | ||||
|                     # ``trio.abc.ReceiveChannel.__anext__()`` | ||||
|                     async for _ in stream: | ||||
|                         assert 0 | ||||
|                     else: | ||||
|                         # should fall through since ``StopAsyncIteration`` | ||||
|                         # should be raised through translation of | ||||
|                         # a ``trio.EndOfChannel`` by | ||||
|                         # ``trio.abc.ReceiveChannel.__anext__()`` | ||||
|                         async for _ in stream: | ||||
|                             assert 0 | ||||
|                         else: | ||||
| 
 | ||||
|                         # verify stream is now closed | ||||
|                         try: | ||||
|                             await stream.receive() | ||||
|                         except trio.EndOfChannel: | ||||
|                             # verify stream is now closed | ||||
|                             try: | ||||
|                                 await stream.receive() | ||||
|                             except trio.EndOfChannel: | ||||
|                                 pass | ||||
| 
 | ||||
|                 # TODO: should be just raise the closed resource err | ||||
|                 # directly here to enforce not allowing a re-open | ||||
|                 # of a stream to the context (at least until a time of | ||||
|                 # if/when we decide that's a good idea?) | ||||
|                 try: | ||||
|                     with trio.fail_after(0.5): | ||||
|                         async with ctx.open_stream() as stream: | ||||
|                             pass | ||||
| 
 | ||||
|             # TODO: should be just raise the closed resource err | ||||
|             # directly here to enforce not allowing a re-open | ||||
|             # of a stream to the context (at least until a time of | ||||
|             # if/when we decide that's a good idea?) | ||||
|             try: | ||||
|                 async with ctx.open_stream() as stream: | ||||
|                 except trio.ClosedResourceError: | ||||
|                     pass | ||||
|             except trio.ClosedResourceError: | ||||
|                 pass | ||||
| 
 | ||||
|         await portal.cancel_actor() | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,5 +1,5 @@ | |||
| """ | ||||
| That native debug better work! | ||||
| That "native" debug mode better work! | ||||
| 
 | ||||
| All these tests can be understood (somewhat) by running the equivalent | ||||
| `examples/debugging/` scripts manually. | ||||
|  | @ -10,16 +10,21 @@ TODO: | |||
|     - wonder if any of it'll work on OS X? | ||||
| 
 | ||||
| """ | ||||
| import time | ||||
| from os import path | ||||
| from typing import Optional | ||||
| import platform | ||||
| import sys | ||||
| import time | ||||
| 
 | ||||
| import pytest | ||||
| import pexpect | ||||
| from pexpect.exceptions import ( | ||||
|     TIMEOUT, | ||||
|     EOF, | ||||
| ) | ||||
| 
 | ||||
| from conftest import repodir | ||||
| 
 | ||||
| 
 | ||||
| # TODO: The next great debugger audit could be done by you! | ||||
| # - recurrent entry to breakpoint() from single actor *after* and an | ||||
| #   error in another task? | ||||
|  | @ -73,6 +78,52 @@ def spawn( | |||
|     return _spawn | ||||
| 
 | ||||
| 
 | ||||
| def assert_before( | ||||
|     child, | ||||
|     patts: list[str], | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     before = str(child.before.decode()) | ||||
| 
 | ||||
|     for patt in patts: | ||||
|         try: | ||||
|             assert patt in before | ||||
|         except AssertionError: | ||||
|             print(before) | ||||
|             raise | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture( | ||||
|     params=[False, True], | ||||
|     ids='ctl-c={}'.format, | ||||
| ) | ||||
| def ctlc(request) -> bool: | ||||
| 
 | ||||
|     use_ctlc = request.param | ||||
| 
 | ||||
|     if ( | ||||
|         sys.version_info <= (3, 10) | ||||
|         and use_ctlc | ||||
|     ): | ||||
|         # on 3.9 it seems the REPL UX | ||||
|         # is highly unreliable and frankly annoying | ||||
|         # to test for. It does work from manual testing | ||||
|         # but i just don't think it's wroth it to try | ||||
|         # and get this working especially since we want to | ||||
|         # be 3.10+ mega-asap. | ||||
|         pytest.skip('Py3.9 and `pdbpp` son no bueno..') | ||||
| 
 | ||||
|     if use_ctlc: | ||||
|         # XXX: disable pygments highlighting for auto-tests | ||||
|         # since some envs (like actions CI) will struggle | ||||
|         # the the added color-char encoding.. | ||||
|         from tractor._debug import TractorConfig | ||||
|         TractorConfig.use_pygements = False | ||||
| 
 | ||||
|     yield use_ctlc | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'user_in_out', | ||||
|     [ | ||||
|  | @ -137,20 +188,67 @@ def test_root_actor_bp(spawn, user_in_out): | |||
|         assert expect_err_str in str(child.before) | ||||
| 
 | ||||
| 
 | ||||
| def test_root_actor_bp_forever(spawn): | ||||
| def do_ctlc( | ||||
|     child, | ||||
|     count: int = 3, | ||||
|     delay: float = 0.1, | ||||
|     patt: Optional[str] = None, | ||||
| 
 | ||||
|     # XXX: literally no idea why this is an issue in CI but likely will | ||||
|     # flush out (hopefully) with proper 3.10 release of `pdbpp`... | ||||
|     expect_prompt: bool = True, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     # make sure ctl-c sends don't do anything but repeat output | ||||
|     for _ in range(count): | ||||
|         time.sleep(delay) | ||||
|         child.sendcontrol('c') | ||||
| 
 | ||||
|         # TODO: figure out why this makes CI fail.. | ||||
|         # if you run this test manually it works just fine.. | ||||
|         from conftest import _ci_env | ||||
|         if expect_prompt and not _ci_env: | ||||
|             before = str(child.before.decode()) | ||||
|             time.sleep(delay) | ||||
|             child.expect(r"\(Pdb\+\+\)") | ||||
|             time.sleep(delay) | ||||
| 
 | ||||
|             if patt: | ||||
|                 # should see the last line on console | ||||
|                 assert patt in before | ||||
| 
 | ||||
| 
 | ||||
| def test_root_actor_bp_forever( | ||||
|     spawn, | ||||
|     ctlc: bool, | ||||
| ): | ||||
|     "Re-enter a breakpoint from the root actor-task." | ||||
|     child = spawn('root_actor_breakpoint_forever') | ||||
| 
 | ||||
|     # do some "next" commands to demonstrate recurrent breakpoint | ||||
|     # entries | ||||
|     for _ in range(10): | ||||
|         child.sendline('next') | ||||
| 
 | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
| 
 | ||||
|     # do one continue which should trigger a new task to lock the tty | ||||
|         if ctlc: | ||||
|             do_ctlc(child) | ||||
| 
 | ||||
|         child.sendline('next') | ||||
| 
 | ||||
|     # do one continue which should trigger a | ||||
|     # new task to lock the tty | ||||
|     child.sendline('continue') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
| 
 | ||||
|     # seems that if we hit ctrl-c too fast the | ||||
|     # sigint guard machinery might not kick in.. | ||||
|     time.sleep(0.001) | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     # XXX: this previously caused a bug! | ||||
|     child.sendline('n') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|  | @ -158,8 +256,21 @@ def test_root_actor_bp_forever(spawn): | |||
|     child.sendline('n') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
| 
 | ||||
|     # quit out of the loop | ||||
|     child.sendline('q') | ||||
|     child.expect(pexpect.EOF) | ||||
| 
 | ||||
| def test_subactor_error(spawn): | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'do_next', | ||||
|     (True, False), | ||||
|     ids='do_next={}'.format, | ||||
| ) | ||||
| def test_subactor_error( | ||||
|     spawn, | ||||
|     ctlc: bool, | ||||
|     do_next: bool, | ||||
| ): | ||||
|     "Single subactor raising an error" | ||||
| 
 | ||||
|     child = spawn('subactor_error') | ||||
|  | @ -170,23 +281,33 @@ def test_subactor_error(spawn): | |||
|     before = str(child.before.decode()) | ||||
|     assert "Attaching to pdb in crashed actor: ('name_error'" in before | ||||
| 
 | ||||
|     # send user command | ||||
|     # (in this case it's the same for 'continue' vs. 'quit') | ||||
|     child.sendline('continue') | ||||
|     if do_next: | ||||
|         child.sendline('n') | ||||
| 
 | ||||
|     # the debugger should enter a second time in the nursery | ||||
|     # creating actor | ||||
|     else: | ||||
|         # make sure ctl-c sends don't do anything but repeat output | ||||
|         if ctlc: | ||||
|             do_ctlc( | ||||
|                 child, | ||||
|             ) | ||||
| 
 | ||||
|         # send user command and (in this case it's the same for 'continue' | ||||
|         # vs. 'quit') the debugger should enter a second time in the nursery | ||||
|         # creating actor | ||||
|         child.sendline('continue') | ||||
| 
 | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
| 
 | ||||
|     before = str(child.before.decode()) | ||||
| 
 | ||||
|     # root actor gets debugger engaged | ||||
|     assert "Attaching to pdb in crashed actor: ('root'" in before | ||||
| 
 | ||||
|     # error is a remote error propagated from the subactor | ||||
|     assert "RemoteActorError: ('name_error'" in before | ||||
| 
 | ||||
|     # another round | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     child.sendline('c') | ||||
|     child.expect('\r\n') | ||||
| 
 | ||||
|  | @ -194,7 +315,10 @@ def test_subactor_error(spawn): | |||
|     child.expect(pexpect.EOF) | ||||
| 
 | ||||
| 
 | ||||
| def test_subactor_breakpoint(spawn): | ||||
| def test_subactor_breakpoint( | ||||
|     spawn, | ||||
|     ctlc: bool, | ||||
| ): | ||||
|     "Single subactor with an infinite breakpoint loop" | ||||
| 
 | ||||
|     child = spawn('subactor_breakpoint') | ||||
|  | @ -211,6 +335,9 @@ def test_subactor_breakpoint(spawn): | |||
|         child.sendline('next') | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
| 
 | ||||
|         if ctlc: | ||||
|             do_ctlc(child) | ||||
| 
 | ||||
|     # now run some "continues" to show re-entries | ||||
|     for _ in range(5): | ||||
|         child.sendline('continue') | ||||
|  | @ -218,6 +345,9 @@ def test_subactor_breakpoint(spawn): | |||
|         before = str(child.before.decode()) | ||||
|         assert "Attaching pdb to actor: ('breakpoint_forever'" in before | ||||
| 
 | ||||
|         if ctlc: | ||||
|             do_ctlc(child) | ||||
| 
 | ||||
|     # finally quit the loop | ||||
|     child.sendline('q') | ||||
| 
 | ||||
|  | @ -228,6 +358,9 @@ def test_subactor_breakpoint(spawn): | |||
|     assert "RemoteActorError: ('breakpoint_forever'" in before | ||||
|     assert 'bdb.BdbQuit' in before | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     # quit the parent | ||||
|     child.sendline('c') | ||||
| 
 | ||||
|  | @ -239,11 +372,15 @@ def test_subactor_breakpoint(spawn): | |||
|     assert 'bdb.BdbQuit' in before | ||||
| 
 | ||||
| 
 | ||||
| def test_multi_subactors(spawn): | ||||
|     """ | ||||
|     Multiple subactors, both erroring and breakpointing as well as | ||||
|     a nested subactor erroring. | ||||
|     """ | ||||
| def test_multi_subactors( | ||||
|     spawn, | ||||
|     ctlc: bool, | ||||
| ): | ||||
|     ''' | ||||
|     Multiple subactors, both erroring and | ||||
|     breakpointing as well as a nested subactor erroring. | ||||
| 
 | ||||
|     ''' | ||||
|     child = spawn(r'multi_subactors') | ||||
| 
 | ||||
|     # scan for the pdbpp prompt | ||||
|  | @ -252,12 +389,18 @@ def test_multi_subactors(spawn): | |||
|     before = str(child.before.decode()) | ||||
|     assert "Attaching pdb to actor: ('breakpoint_forever'" in before | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     # do some "next" commands to demonstrate recurrent breakpoint | ||||
|     # entries | ||||
|     for _ in range(10): | ||||
|         child.sendline('next') | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
| 
 | ||||
|         if ctlc: | ||||
|             do_ctlc(child) | ||||
| 
 | ||||
|     # continue to next error | ||||
|     child.sendline('c') | ||||
| 
 | ||||
|  | @ -267,14 +410,28 @@ def test_multi_subactors(spawn): | |||
|     assert "Attaching to pdb in crashed actor: ('name_error'" in before | ||||
|     assert "NameError" in before | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     # continue again | ||||
|     child.sendline('c') | ||||
| 
 | ||||
|     # 2nd name_error failure | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     before = str(child.before.decode()) | ||||
|     assert "Attaching to pdb in crashed actor: ('name_error_1'" in before | ||||
|     assert "NameError" in before | ||||
| 
 | ||||
|     # XXX: lol honestly no idea why CI is cuck but | ||||
|     # seems like this likely falls into our unhandled nested | ||||
|     # case and isn't working in that env due to raciness.. | ||||
|     from conftest import _ci_env | ||||
|     if not ctlc and _ci_env: | ||||
|         name = 'name_error' if ctlc else 'name_error_1' | ||||
|         assert_before(child, [ | ||||
|             f"Attaching to pdb in crashed actor: ('{name}'", | ||||
|             "NameError", | ||||
|         ]) | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     # breakpoint loop should re-engage | ||||
|     child.sendline('c') | ||||
|  | @ -282,20 +439,30 @@ def test_multi_subactors(spawn): | |||
|     before = str(child.before.decode()) | ||||
|     assert "Attaching pdb to actor: ('breakpoint_forever'" in before | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     # wait for spawn error to show up | ||||
|     spawn_err = "Attaching to pdb in crashed actor: ('spawn_error'" | ||||
|     while spawn_err not in before: | ||||
|     start = time.time() | ||||
|     while ( | ||||
|         spawn_err not in before | ||||
|         and (time.time() - start) < 3 | ||||
|     ): | ||||
|         child.sendline('c') | ||||
|         time.sleep(0.1) | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
|         before = str(child.before.decode()) | ||||
| 
 | ||||
|         if ctlc: | ||||
|             do_ctlc(child) | ||||
| 
 | ||||
|     # 2nd depth nursery should trigger | ||||
|     # child.sendline('c') | ||||
|     # child.expect(r"\(Pdb\+\+\)") | ||||
|     # before = str(child.before.decode()) | ||||
|     assert spawn_err in before | ||||
|     assert "RemoteActorError: ('name_error_1'" in before | ||||
|     if not ctlc: | ||||
|         assert_before(child, [ | ||||
|             spawn_err, | ||||
|             "RemoteActorError: ('name_error_1'", | ||||
|         ]) | ||||
| 
 | ||||
|     # now run some "continues" to show re-entries | ||||
|     for _ in range(5): | ||||
|  | @ -304,33 +471,62 @@ def test_multi_subactors(spawn): | |||
| 
 | ||||
|     # quit the loop and expect parent to attach | ||||
|     child.sendline('q') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     before = str(child.before.decode()) | ||||
|     # debugger attaches to root | ||||
|     assert "Attaching to pdb in crashed actor: ('root'" in before | ||||
|     # expect a multierror with exceptions for each sub-actor | ||||
|     assert "RemoteActorError: ('breakpoint_forever'" in before | ||||
|     assert "RemoteActorError: ('name_error'" in before | ||||
|     assert "RemoteActorError: ('spawn_error'" in before | ||||
|     assert "RemoteActorError: ('name_error_1'" in before | ||||
|     assert 'bdb.BdbQuit' in before | ||||
| 
 | ||||
|     try: | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
|     except TIMEOUT: | ||||
|         if _ci_env and not ctlc: | ||||
|             raise | ||||
| 
 | ||||
|         # in ci seems like this can sometimes just result | ||||
|         # in full tree death? | ||||
|         print('tree died?') | ||||
| 
 | ||||
|     else: | ||||
|         before = str(child.before.decode()) | ||||
|         assert_before(child, [ | ||||
|             # debugger attaches to root | ||||
|             "Attaching to pdb in crashed actor: ('root'", | ||||
| 
 | ||||
|             # expect a multierror with exceptions for each sub-actor | ||||
|             "RemoteActorError: ('breakpoint_forever'", | ||||
|             "RemoteActorError: ('name_error'", | ||||
|             "RemoteActorError: ('spawn_error'", | ||||
|             "RemoteActorError: ('name_error_1'", | ||||
|             'bdb.BdbQuit', | ||||
|         ]) | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     # process should exit | ||||
|     child.sendline('c') | ||||
|     child.expect(pexpect.EOF) | ||||
| 
 | ||||
|     try: | ||||
|         child.expect(pexpect.EOF) | ||||
|     except TIMEOUT: | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
| 
 | ||||
|     # repeat of previous multierror for final output | ||||
|     before = str(child.before.decode()) | ||||
|     assert "RemoteActorError: ('breakpoint_forever'" in before | ||||
|     assert "RemoteActorError: ('name_error'" in before | ||||
|     assert "RemoteActorError: ('spawn_error'" in before | ||||
|     assert "RemoteActorError: ('name_error_1'" in before | ||||
|     assert 'bdb.BdbQuit' in before | ||||
|     assert_before(child, [ | ||||
|         "RemoteActorError: ('breakpoint_forever'", | ||||
|         "RemoteActorError: ('name_error'", | ||||
|         "RemoteActorError: ('spawn_error'", | ||||
|         "RemoteActorError: ('name_error_1'", | ||||
|         'bdb.BdbQuit', | ||||
|     ]) | ||||
| 
 | ||||
| 
 | ||||
| def test_multi_daemon_subactors(spawn, loglevel): | ||||
|     """Multiple daemon subactors, both erroring and breakpointing within a | ||||
| def test_multi_daemon_subactors( | ||||
|     spawn, | ||||
|     loglevel: str, | ||||
|     ctlc: bool | ||||
| ): | ||||
|     ''' | ||||
|     Multiple daemon subactors, both erroring and breakpointing within a | ||||
|     stream. | ||||
|     """ | ||||
| 
 | ||||
|     ''' | ||||
|     child = spawn('multi_daemon_subactors') | ||||
| 
 | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|  | @ -352,6 +548,9 @@ def test_multi_daemon_subactors(spawn, loglevel): | |||
|     else: | ||||
|         raise ValueError("Neither log msg was found !?") | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     # NOTE: previously since we did not have clobber prevention | ||||
|     # in the root actor this final resume could result in the debugger | ||||
|     # tearing down since both child actors would be cancelled and it was | ||||
|  | @ -371,7 +570,7 @@ def test_multi_daemon_subactors(spawn, loglevel): | |||
|     # now the root actor won't clobber the bp_forever child | ||||
|     # during it's first access to the debug lock, but will instead | ||||
|     # wait for the lock to release, by the edge triggered | ||||
|     # ``_debug._no_remote_has_tty`` event before sending cancel messages | ||||
|     # ``_debug.Lock.no_remote_has_tty`` event before sending cancel messages | ||||
|     # (via portals) to its underlings B) | ||||
| 
 | ||||
|     # at some point here there should have been some warning msg from | ||||
|  | @ -379,6 +578,9 @@ def test_multi_daemon_subactors(spawn, loglevel): | |||
|     # it seems unreliable in testing here to gnab it: | ||||
|     # assert "in use by child ('bp_forever'," in before | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     # wait for final error in root | ||||
|     while True: | ||||
| 
 | ||||
|  | @ -394,17 +596,23 @@ def test_multi_daemon_subactors(spawn, loglevel): | |||
|         except AssertionError: | ||||
|             assert bp_forever_msg in before | ||||
| 
 | ||||
|         if ctlc: | ||||
|             do_ctlc(child) | ||||
| 
 | ||||
|     try: | ||||
|         child.sendline('c') | ||||
|         child.expect(pexpect.EOF) | ||||
| 
 | ||||
|     except pexpect.exceptions.TIMEOUT: | ||||
|     except TIMEOUT: | ||||
|         # Failed to exit using continue..? | ||||
|         child.sendline('q') | ||||
|         child.expect(pexpect.EOF) | ||||
| 
 | ||||
| 
 | ||||
| def test_multi_subactors_root_errors(spawn): | ||||
| def test_multi_subactors_root_errors( | ||||
|     spawn, | ||||
|     ctlc: bool | ||||
| ): | ||||
|     ''' | ||||
|     Multiple subactors, both erroring and breakpointing as well as | ||||
|     a nested subactor erroring. | ||||
|  | @ -419,33 +627,58 @@ def test_multi_subactors_root_errors(spawn): | |||
|     before = str(child.before.decode()) | ||||
|     assert "NameError: name 'doggypants' is not defined" in before | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     # continue again to catch 2nd name error from | ||||
|     # actor 'name_error_1' (which is 2nd depth). | ||||
|     child.sendline('c') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     before = str(child.before.decode()) | ||||
|     assert "Attaching to pdb in crashed actor: ('name_error_1'" in before | ||||
|     assert "NameError" in before | ||||
|     try: | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
|     except TIMEOUT: | ||||
|         child.sendline('') | ||||
| 
 | ||||
|     # XXX: lol honestly no idea why CI is cuck but | ||||
|     # seems like this likely falls into our unhandled nested | ||||
|     # case and isn't working in that env due to raciness.. | ||||
|     from conftest import _ci_env | ||||
|     if not ctlc and _ci_env: | ||||
|         name = 'name_error' if ctlc else 'name_error_1' | ||||
|         assert_before(child, [ | ||||
|             f"Attaching to pdb in crashed actor: ('{name}'", | ||||
|             "NameError", | ||||
|         ]) | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     child.sendline('c') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     before = str(child.before.decode()) | ||||
|     assert "Attaching to pdb in crashed actor: ('spawn_error'" in before | ||||
|     # boxed error from previous step | ||||
|     assert "RemoteActorError: ('name_error_1'" in before | ||||
|     assert "NameError" in before | ||||
|     assert_before(child, [ | ||||
|         "Attaching to pdb in crashed actor: ('spawn_error'", | ||||
|         # boxed error from previous step | ||||
|         "RemoteActorError: ('name_error_1'", | ||||
|         "NameError", | ||||
|     ]) | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     child.sendline('c') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     before = str(child.before.decode()) | ||||
|     assert "Attaching to pdb in crashed actor: ('root'" in before | ||||
|     # boxed error from first level failure | ||||
|     assert "RemoteActorError: ('name_error'" in before | ||||
|     assert "NameError" in before | ||||
|     assert_before(child, [ | ||||
|         "Attaching to pdb in crashed actor: ('root'", | ||||
|         # boxed error from previous step | ||||
|         "RemoteActorError: ('name_error'", | ||||
|         "NameError", | ||||
|     ]) | ||||
| 
 | ||||
|     # warnings assert we probably don't need | ||||
|     # assert "Cancelling nursery in ('spawn_error'," in before | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     # continue again | ||||
|     child.sendline('c') | ||||
|     child.expect(pexpect.EOF) | ||||
|  | @ -455,7 +688,13 @@ def test_multi_subactors_root_errors(spawn): | |||
|     assert "AssertionError" in before | ||||
| 
 | ||||
| 
 | ||||
| def test_multi_nested_subactors_error_through_nurseries(spawn): | ||||
| def test_multi_nested_subactors_error_through_nurseries( | ||||
|     spawn, | ||||
| 
 | ||||
|     # TODO: address debugger issue for nested tree: | ||||
|     # <issuelink> | ||||
|     # ctlc: bool, | ||||
| ): | ||||
|     """Verify deeply nested actors that error trigger debugger entries | ||||
|     at each actor nurserly (level) all the way up the tree. | ||||
| 
 | ||||
|  | @ -476,7 +715,7 @@ def test_multi_nested_subactors_error_through_nurseries(spawn): | |||
|             child.sendline('c') | ||||
|             time.sleep(0.1) | ||||
| 
 | ||||
|         except pexpect.exceptions.EOF: | ||||
|         except EOF: | ||||
| 
 | ||||
|             # race conditions on how fast the continue is sent? | ||||
|             print(f"Failed early on {i}?") | ||||
|  | @ -490,9 +729,11 @@ def test_multi_nested_subactors_error_through_nurseries(spawn): | |||
|         assert "NameError" in before | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.timeout(15) | ||||
| def test_root_nursery_cancels_before_child_releases_tty_lock( | ||||
|     spawn, | ||||
|     start_method | ||||
|     start_method, | ||||
|     ctlc: bool, | ||||
| ): | ||||
|     """Test that when the root sends a cancel message before a nested | ||||
|     child has unblocked (which can happen when it has the tty lock and | ||||
|  | @ -509,6 +750,9 @@ def test_root_nursery_cancels_before_child_releases_tty_lock( | |||
|     assert "tractor._exceptions.RemoteActorError: ('name_error'" not in before | ||||
|     time.sleep(0.5) | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     child.sendline('c') | ||||
| 
 | ||||
|     for i in range(4): | ||||
|  | @ -517,8 +761,8 @@ def test_root_nursery_cancels_before_child_releases_tty_lock( | |||
|             child.expect(r"\(Pdb\+\+\)") | ||||
| 
 | ||||
|         except ( | ||||
|             pexpect.exceptions.EOF, | ||||
|             pexpect.exceptions.TIMEOUT, | ||||
|             EOF, | ||||
|             TIMEOUT, | ||||
|         ): | ||||
|             # races all over.. | ||||
| 
 | ||||
|  | @ -533,15 +777,23 @@ def test_root_nursery_cancels_before_child_releases_tty_lock( | |||
|         before = str(child.before.decode()) | ||||
|         assert "NameError: name 'doggypants' is not defined" in before | ||||
| 
 | ||||
|         child.sendline('c') | ||||
|         if ctlc: | ||||
|             do_ctlc(child) | ||||
| 
 | ||||
|     while True: | ||||
|         child.sendline('c') | ||||
|         time.sleep(0.1) | ||||
| 
 | ||||
|     for i in range(3): | ||||
|         try: | ||||
|             child.expect(pexpect.EOF) | ||||
|             break | ||||
|         except pexpect.exceptions.TIMEOUT: | ||||
|         except TIMEOUT: | ||||
|             child.sendline('c') | ||||
|             time.sleep(0.1) | ||||
|             print('child was able to grab tty lock again?') | ||||
|     else: | ||||
|         child.sendline('q') | ||||
|         child.expect(pexpect.EOF) | ||||
| 
 | ||||
|     if not timed_out_early: | ||||
| 
 | ||||
|  | @ -553,6 +805,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock( | |||
| 
 | ||||
| def test_root_cancels_child_context_during_startup( | ||||
|     spawn, | ||||
|     ctlc: bool, | ||||
| ): | ||||
|     '''Verify a fast fail in the root doesn't lock up the child reaping | ||||
|     and all while using the new context api. | ||||
|  | @ -565,12 +818,16 @@ def test_root_cancels_child_context_during_startup( | |||
|     before = str(child.before.decode()) | ||||
|     assert "AssertionError" in before | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     child.sendline('c') | ||||
|     child.expect(pexpect.EOF) | ||||
| 
 | ||||
| 
 | ||||
| def test_different_debug_mode_per_actor( | ||||
|     spawn, | ||||
|     ctlc: bool, | ||||
| ): | ||||
|     child = spawn('per_actor_debug') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|  | @ -580,6 +837,9 @@ def test_different_debug_mode_per_actor( | |||
|     assert "Attaching to pdb in crashed actor: ('debugged_boi'" in before | ||||
|     assert "RuntimeError" in before | ||||
| 
 | ||||
|     if ctlc: | ||||
|         do_ctlc(child) | ||||
| 
 | ||||
|     child.sendline('c') | ||||
|     child.expect(pexpect.EOF) | ||||
| 
 | ||||
|  |  | |||
|  | @ -81,11 +81,14 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): | |||
|     'example_script', | ||||
| 
 | ||||
|     # walk yields: (dirpath, dirnames, filenames) | ||||
|     [(p[0], f) for p in os.walk(examples_dir()) for f in p[2] | ||||
|     [ | ||||
|         (p[0], f) for p in os.walk(examples_dir()) for f in p[2] | ||||
| 
 | ||||
|         if '__' not in f | ||||
|         and f[0] != '_' | ||||
|         and 'debugging' not in p[0]], | ||||
|         and 'debugging' not in p[0] | ||||
|         and 'integration' not in p[0] | ||||
|     ], | ||||
| 
 | ||||
|     ids=lambda t: t[1], | ||||
| ) | ||||
|  | @ -113,9 +116,19 @@ def test_example(run_example_in_subproc, example_script): | |||
|             # print(f'STDOUT: {out}') | ||||
| 
 | ||||
|             # if we get some gnarly output let's aggregate and raise | ||||
|             errmsg = err.decode() | ||||
|             errlines = errmsg.splitlines() | ||||
|             if err and 'Error' in errlines[-1]: | ||||
|                 raise Exception(errmsg) | ||||
|             if err: | ||||
|                 errmsg = err.decode() | ||||
|                 errlines = errmsg.splitlines() | ||||
|                 last_error = errlines[-1] | ||||
|                 if ( | ||||
|                     'Error' in last_error | ||||
| 
 | ||||
|                     # XXX: currently we print this to console, but maybe | ||||
|                     # shouldn't eventually once we figure out what's | ||||
|                     # a better way to be explicit about aio side | ||||
|                     # cancels? | ||||
|                     and 'asyncio.exceptions.CancelledError' not in last_error | ||||
|                 ): | ||||
|                     raise Exception(errmsg) | ||||
| 
 | ||||
|             assert proc.returncode == 0 | ||||
|  |  | |||
|  | @ -150,13 +150,13 @@ def test_loglevel_propagated_to_subactor( | |||
|     async def main(): | ||||
|         async with tractor.open_nursery( | ||||
|             name='arbiter', | ||||
|             loglevel=level, | ||||
|             start_method=start_method, | ||||
|             arbiter_addr=arb_addr, | ||||
| 
 | ||||
|         ) as tn: | ||||
|             await tn.run_in_actor( | ||||
|                 check_loglevel, | ||||
|                 loglevel=level, | ||||
|                 level=level, | ||||
|             ) | ||||
| 
 | ||||
|  |  | |||
|  | @ -87,9 +87,10 @@ async def _invoke( | |||
| 
 | ||||
|     ''' | ||||
|     __tracebackhide__ = True | ||||
|     treat_as_gen = False | ||||
|     treat_as_gen: bool = False | ||||
|     failed_resp: bool = False | ||||
| 
 | ||||
|     # possible a traceback (not sure what typing is for this..) | ||||
|     # possibly a traceback (not sure what typing is for this..) | ||||
|     tb = None | ||||
| 
 | ||||
|     cancel_scope = trio.CancelScope() | ||||
|  | @ -190,7 +191,8 @@ async def _invoke( | |||
|                     ctx._scope_nursery = scope_nursery | ||||
|                     cs = scope_nursery.cancel_scope | ||||
|                     task_status.started(cs) | ||||
|                     await chan.send({'return': await coro, 'cid': cid}) | ||||
|                     res = await coro | ||||
|                     await chan.send({'return': res, 'cid': cid}) | ||||
| 
 | ||||
|             except trio.MultiError: | ||||
|                 # if a context error was set then likely | ||||
|  | @ -204,7 +206,12 @@ async def _invoke( | |||
|                 # XXX: only pop the context tracking if | ||||
|                 # a ``@tractor.context`` entrypoint was called | ||||
|                 assert chan.uid | ||||
| 
 | ||||
|                 # don't pop the local context until we know the | ||||
|                 # associated child isn't in debug any more | ||||
|                 await _debug.maybe_wait_for_debugger() | ||||
|                 ctx = actor._contexts.pop((chan.uid, cid)) | ||||
| 
 | ||||
|                 if ctx: | ||||
|                     log.runtime( | ||||
|                         f'Context entrypoint {func} was terminated:\n{ctx}' | ||||
|  | @ -235,10 +242,24 @@ async def _invoke( | |||
| 
 | ||||
|         else: | ||||
|             # regular async function | ||||
|             await chan.send({'functype': 'asyncfunc', 'cid': cid}) | ||||
|             try: | ||||
|                 await chan.send({'functype': 'asyncfunc', 'cid': cid}) | ||||
|             except trio.BrokenResourceError: | ||||
|                 failed_resp = True | ||||
|                 if is_rpc: | ||||
|                     raise | ||||
|                 else: | ||||
|                     log.warning( | ||||
|                         f'Failed to respond to non-rpc request: {func}' | ||||
|                     ) | ||||
| 
 | ||||
|             with cancel_scope as cs: | ||||
|                 task_status.started(cs) | ||||
|                 await chan.send({'return': await coro, 'cid': cid}) | ||||
|                 result = await coro | ||||
|                 log.cancel(f'result: {result}') | ||||
|                 if not failed_resp: | ||||
|                     # only send result if we know IPC isn't down | ||||
|                     await chan.send({'return': result, 'cid': cid}) | ||||
| 
 | ||||
|     except ( | ||||
|         Exception, | ||||
|  | @ -283,6 +304,7 @@ async def _invoke( | |||
|         except ( | ||||
|             trio.ClosedResourceError, | ||||
|             trio.BrokenResourceError, | ||||
|             BrokenPipeError, | ||||
|         ): | ||||
|             # if we can't propagate the error that's a big boo boo | ||||
|             log.error( | ||||
|  | @ -509,13 +531,20 @@ class Actor: | |||
|             mne = ModuleNotExposed(*err.args) | ||||
| 
 | ||||
|             if ns == '__main__': | ||||
|                 msg = ( | ||||
|                     "\n\nMake sure you exposed the current module using:\n\n" | ||||
|                     "ActorNursery.start_actor(<name>, enable_modules=" | ||||
|                     "[__name__])" | ||||
|                 ) | ||||
|                 modpath = '__name__' | ||||
|             else: | ||||
|                 modpath = f"'{ns}'" | ||||
| 
 | ||||
|                 mne.msg += msg | ||||
|             msg = ( | ||||
|                 "\n\nMake sure you exposed the target module, `{ns}`, " | ||||
|                 "using:\n" | ||||
|                 "ActorNursery.start_actor(<name>, enable_modules=[{mod}])" | ||||
|             ).format( | ||||
|                 ns=ns, | ||||
|                 mod=modpath, | ||||
|             ) | ||||
| 
 | ||||
|             mne.msg += msg | ||||
| 
 | ||||
|             raise mne | ||||
| 
 | ||||
|  | @ -603,17 +632,6 @@ class Actor: | |||
|             if ( | ||||
|                 local_nursery | ||||
|             ): | ||||
|                 if disconnected: | ||||
|                     # if the transport died and this actor is still | ||||
|                     # registered within a local nursery, we report that the | ||||
|                     # IPC layer may have failed unexpectedly since it may be | ||||
|                     # the cause of other downstream errors. | ||||
|                     entry = local_nursery._children.get(uid) | ||||
|                     if entry: | ||||
|                         _, proc, _ = entry | ||||
|                         log.warning(f'Actor {uid}@{proc} IPC connection broke!?') | ||||
|                         # if proc.poll() is not None: | ||||
|                         #     log.error('Actor {uid} proc died and IPC broke?') | ||||
| 
 | ||||
|                 log.cancel(f"Waiting on cancel request to peer {chan.uid}") | ||||
|                 # XXX: this is a soft wait on the channel (and its | ||||
|  | @ -630,6 +648,11 @@ class Actor: | |||
|                     # Attempt to wait for the far end to close the channel | ||||
|                     # and bail after timeout (2-generals on closure). | ||||
|                     assert chan.msgstream | ||||
| 
 | ||||
|                     log.runtime( | ||||
|                         f'Draining lingering msgs from stream {chan.msgstream}' | ||||
|                     ) | ||||
| 
 | ||||
|                     async for msg in chan.msgstream.drain(): | ||||
|                         # try to deliver any lingering msgs | ||||
|                         # before we destroy the channel. | ||||
|  | @ -646,7 +669,20 @@ class Actor: | |||
| 
 | ||||
|                     await local_nursery.exited.wait() | ||||
| 
 | ||||
|                 # if local_nursery._children | ||||
|                 if disconnected: | ||||
|                     # if the transport died and this actor is still | ||||
|                     # registered within a local nursery, we report that the | ||||
|                     # IPC layer may have failed unexpectedly since it may be | ||||
|                     # the cause of other downstream errors. | ||||
|                     entry = local_nursery._children.get(uid) | ||||
|                     if entry: | ||||
|                         _, proc, _ = entry | ||||
| 
 | ||||
|                         poll = getattr(proc, 'poll', None) | ||||
|                         if poll and poll() is None: | ||||
|                             log.cancel( | ||||
|                                 f'Actor {uid} IPC broke but proc is alive?' | ||||
|                             ) | ||||
| 
 | ||||
|             # ``Channel`` teardown and closure sequence | ||||
| 
 | ||||
|  | @ -688,7 +724,7 @@ class Actor: | |||
|                     # await chan.aclose() | ||||
| 
 | ||||
|                 except trio.BrokenResourceError: | ||||
|                     log.warning(f"Channel for {chan.uid} was already closed") | ||||
|                     log.runtime(f"Channel {chan.uid} was already closed") | ||||
| 
 | ||||
|     async def _push_result( | ||||
|         self, | ||||
|  | @ -919,17 +955,19 @@ class Actor: | |||
|                         chan._exc = exc | ||||
|                         raise exc | ||||
| 
 | ||||
|                     log.runtime( | ||||
|                     log.info( | ||||
|                         f"Processing request from {actorid}\n" | ||||
|                         f"{ns}.{funcname}({kwargs})") | ||||
| 
 | ||||
|                     if ns == 'self': | ||||
|                         func = getattr(self, funcname) | ||||
| 
 | ||||
|                         if funcname == 'cancel': | ||||
| 
 | ||||
|                             # don't start entire actor runtime | ||||
|                             # cancellation if this actor is in debug | ||||
|                             # mode | ||||
|                             pdb_complete = _debug._local_pdb_complete | ||||
|                             pdb_complete = _debug.Lock.local_pdb_complete | ||||
|                             if pdb_complete: | ||||
|                                 await pdb_complete.wait() | ||||
| 
 | ||||
|  | @ -960,12 +998,17 @@ class Actor: | |||
|                                 # ``_async_main()`` | ||||
|                                 kwargs['chan'] = chan | ||||
|                                 log.cancel( | ||||
|                                     f'{self.uid} was remotely cancelled by\n' | ||||
|                                     f'{chan.uid}!' | ||||
|                                 ) | ||||
|                                 await _invoke( | ||||
|                                     self, cid, chan, func, kwargs, is_rpc=False | ||||
|                                     f'Remote request to cancel task\n' | ||||
|                                     f'remote actor: {chan.uid}\n' | ||||
|                                     f'task: {cid}' | ||||
|                                 ) | ||||
|                                 try: | ||||
|                                     await _invoke( | ||||
|                                         self, cid, chan, func, kwargs, is_rpc=False | ||||
|                                     ) | ||||
|                                 except BaseException: | ||||
|                                     log.exception("failed to cancel task?") | ||||
| 
 | ||||
|                                 continue | ||||
|                     else: | ||||
|                         # complain to client about restricted modules | ||||
|  | @ -1370,7 +1413,7 @@ class Actor: | |||
| 
 | ||||
|             # kill any debugger request task to avoid deadlock | ||||
|             # with the root actor in this tree | ||||
|             dbcs = _debug._debugger_request_cs | ||||
|             dbcs = _debug.Lock._debugger_request_cs | ||||
|             if dbcs is not None: | ||||
|                 log.cancel("Cancelling active debugger request") | ||||
|                 dbcs.cancel() | ||||
|  | @ -1403,12 +1446,14 @@ class Actor: | |||
|     #             n.cancel_scope.cancel() | ||||
| 
 | ||||
|     async def _cancel_task(self, cid, chan): | ||||
|         """Cancel a local task by call-id / channel. | ||||
|         ''' | ||||
|         Cancel a local task by call-id / channel. | ||||
| 
 | ||||
|         Note this method will be treated as a streaming function | ||||
|         by remote actor-callers due to the declaration of ``ctx`` | ||||
|         in the signature (for now). | ||||
|         """ | ||||
| 
 | ||||
|         ''' | ||||
|         # right now this is only implicitly called by | ||||
|         # streaming IPC but it should be called | ||||
|         # to cancel any remotely spawned task | ||||
|  |  | |||
|  | @ -41,6 +41,7 @@ from .log import get_logger | |||
| from ._discovery import get_root | ||||
| from ._state import is_root_process, debug_mode | ||||
| from ._exceptions import is_multi_cancelled | ||||
| from ._ipc import Channel | ||||
| 
 | ||||
| 
 | ||||
| try: | ||||
|  | @ -59,31 +60,87 @@ log = get_logger(__name__) | |||
| __all__ = ['breakpoint', 'post_mortem'] | ||||
| 
 | ||||
| 
 | ||||
| # TODO: wrap all these in a static global class: ``DebugLock`` maybe? | ||||
| class Lock: | ||||
|     ''' | ||||
|     Actor global debug lock state. | ||||
| 
 | ||||
| # placeholder for function to set a ``trio.Event`` on debugger exit | ||||
| _pdb_release_hook: Optional[Callable] = None | ||||
|     Mostly to avoid a lot of ``global`` declarations for now XD. | ||||
| 
 | ||||
| # actor-wide variable pointing to current task name using debugger | ||||
| _local_task_in_debug: Optional[str] = None | ||||
|     ''' | ||||
|     # placeholder for function to set a ``trio.Event`` on debugger exit | ||||
|     pdb_release_hook: Optional[Callable] = None | ||||
| 
 | ||||
| # actor tree-wide actor uid that supposedly has the tty lock | ||||
| _global_actor_in_debug: Optional[Tuple[str, str]] = None | ||||
|     # actor-wide variable pointing to current task name using debugger | ||||
|     local_task_in_debug: Optional[str] = None | ||||
| 
 | ||||
| # lock in root actor preventing multi-access to local tty | ||||
| _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() | ||||
| _local_pdb_complete: Optional[trio.Event] = None | ||||
| _no_remote_has_tty: Optional[trio.Event] = None | ||||
|     # actor tree-wide actor uid that supposedly has the tty lock | ||||
|     global_actor_in_debug: Optional[Tuple[str, str]] = None | ||||
| 
 | ||||
| # XXX: set by the current task waiting on the root tty lock | ||||
| # and must be cancelled if this actor is cancelled via message | ||||
| # otherwise deadlocks with the parent actor may ensure | ||||
| _debugger_request_cs: Optional[trio.CancelScope] = None | ||||
|     local_pdb_complete: Optional[trio.Event] = None | ||||
|     no_remote_has_tty: Optional[trio.Event] = None | ||||
| 
 | ||||
|     # lock in root actor preventing multi-access to local tty | ||||
|     _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() | ||||
| 
 | ||||
|     # XXX: set by the current task waiting on the root tty lock | ||||
|     # and must be cancelled if this actor is cancelled via message | ||||
|     # otherwise deadlocks with the parent actor may ensure | ||||
|     _debugger_request_cs: Optional[trio.CancelScope] = None | ||||
| 
 | ||||
|     _orig_sigint_handler: Optional[Callable] = None | ||||
| 
 | ||||
|     @classmethod | ||||
|     def shield_sigint(cls): | ||||
|         cls._orig_sigint_handler = signal.signal( | ||||
|                 signal.SIGINT, | ||||
|                 shield_sigint, | ||||
|             ) | ||||
| 
 | ||||
|     @classmethod | ||||
|     def unshield_sigint(cls): | ||||
|         if cls._orig_sigint_handler is not None: | ||||
|             # restore original sigint handler | ||||
|             signal.signal( | ||||
|                 signal.SIGINT, | ||||
|                 cls._orig_sigint_handler | ||||
|             ) | ||||
| 
 | ||||
|         cls._orig_sigint_handler = None | ||||
| 
 | ||||
|     @classmethod | ||||
|     def maybe_release(cls): | ||||
|         cls.local_task_in_debug = None | ||||
|         if cls.pdb_release_hook: | ||||
|             cls.pdb_release_hook() | ||||
| 
 | ||||
|     @classmethod | ||||
|     def root_release(cls): | ||||
|         try: | ||||
|             cls._debug_lock.release() | ||||
|         except RuntimeError: | ||||
|             # uhhh makes no sense but been seeing the non-owner | ||||
|             # release error even though this is definitely the task | ||||
|             # that locked? | ||||
|             owner = cls._debug_lock.statistics().owner | ||||
|             if owner: | ||||
|                 raise | ||||
| 
 | ||||
|         cls.global_actor_in_debug = None | ||||
|         cls.local_task_in_debug = None | ||||
| 
 | ||||
|         try: | ||||
|             # sometimes the ``trio`` might already be terminated in | ||||
|             # which case this call will raise. | ||||
|             cls.local_pdb_complete.set() | ||||
|         finally: | ||||
|             # restore original sigint handler | ||||
|             cls.unshield_sigint() | ||||
| 
 | ||||
| 
 | ||||
| class TractorConfig(pdbpp.DefaultConfig): | ||||
|     """Custom ``pdbpp`` goodness. | ||||
|     """ | ||||
|     use_pygments = True | ||||
|     # sticky_by_default = True | ||||
|     enable_hidden_frames = False | ||||
| 
 | ||||
|  | @ -96,25 +153,23 @@ class MultiActorPdb(pdbpp.Pdb): | |||
|     # override the pdbpp config with our coolio one | ||||
|     DefaultConfig = TractorConfig | ||||
| 
 | ||||
|     # def preloop(self): | ||||
|     #     print('IN PRELOOP') | ||||
|     #     super().preloop() | ||||
| 
 | ||||
|     # TODO: figure out how to disallow recursive .set_trace() entry | ||||
|     # since that'll cause deadlock for us. | ||||
|     def set_continue(self): | ||||
|         try: | ||||
|             super().set_continue() | ||||
|         finally: | ||||
|             global _local_task_in_debug, _pdb_release_hook | ||||
|             _local_task_in_debug = None | ||||
|             if _pdb_release_hook: | ||||
|                 _pdb_release_hook() | ||||
|             Lock.maybe_release() | ||||
| 
 | ||||
|     def set_quit(self): | ||||
|         try: | ||||
|             super().set_quit() | ||||
|         finally: | ||||
|             global _local_task_in_debug, _pdb_release_hook | ||||
|             _local_task_in_debug = None | ||||
|             if _pdb_release_hook: | ||||
|                 _pdb_release_hook() | ||||
|             Lock.maybe_release() | ||||
| 
 | ||||
| 
 | ||||
| # TODO: will be needed whenever we get to true remote debugging. | ||||
|  | @ -153,7 +208,6 @@ class MultiActorPdb(pdbpp.Pdb): | |||
| #                 log.info("Closing stdin hijack") | ||||
| #                 break | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def _acquire_debug_lock( | ||||
|     uid: Tuple[str, str] | ||||
|  | @ -168,8 +222,6 @@ async def _acquire_debug_lock( | |||
|     to the ``pdb`` repl. | ||||
| 
 | ||||
|     ''' | ||||
|     global _debug_lock, _global_actor_in_debug, _no_remote_has_tty | ||||
| 
 | ||||
|     task_name = trio.lowlevel.current_task().name | ||||
| 
 | ||||
|     log.runtime( | ||||
|  | @ -183,15 +235,15 @@ async def _acquire_debug_lock( | |||
|             f"entering lock checkpoint, remote task: {task_name}:{uid}" | ||||
|         ) | ||||
|         we_acquired = True | ||||
|         await _debug_lock.acquire() | ||||
|         await Lock._debug_lock.acquire() | ||||
| 
 | ||||
|         if _no_remote_has_tty is None: | ||||
|         if Lock.no_remote_has_tty is None: | ||||
|             # mark the tty lock as being in use so that the runtime | ||||
|             # can try to avoid clobbering any connection from a child | ||||
|             # that's currently relying on it. | ||||
|             _no_remote_has_tty = trio.Event() | ||||
|             Lock.no_remote_has_tty = trio.Event() | ||||
| 
 | ||||
|         _global_actor_in_debug = uid | ||||
|         Lock.global_actor_in_debug = uid | ||||
|         log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}") | ||||
| 
 | ||||
|         # NOTE: critical section: this yield is unshielded! | ||||
|  | @ -204,32 +256,32 @@ async def _acquire_debug_lock( | |||
|         # surrounding caller side context should cancel normally | ||||
|         # relaying back to the caller. | ||||
| 
 | ||||
|         yield _debug_lock | ||||
|         yield Lock._debug_lock | ||||
| 
 | ||||
|     finally: | ||||
|         # if _global_actor_in_debug == uid: | ||||
|         # if Lock.global_actor_in_debug == uid: | ||||
| 
 | ||||
|         if ( | ||||
|             we_acquired | ||||
|             and _debug_lock.locked() | ||||
|             and Lock._debug_lock.locked() | ||||
|         ): | ||||
|             _debug_lock.release() | ||||
|             Lock._debug_lock.release() | ||||
| 
 | ||||
|         # IFF there are no more requesting tasks queued up fire, the | ||||
|         # "tty-unlocked" event thereby alerting any monitors of the lock that | ||||
|         # we are now back in the "tty unlocked" state. This is basically | ||||
|         # and edge triggered signal around an empty queue of sub-actor | ||||
|         # tasks that may have tried to acquire the lock. | ||||
|         stats = _debug_lock.statistics() | ||||
|         stats = Lock._debug_lock.statistics() | ||||
|         if ( | ||||
|             not stats.owner | ||||
|         ): | ||||
|             log.runtime(f"No more tasks waiting on tty lock! says {uid}") | ||||
|             if _no_remote_has_tty is not None: | ||||
|                 _no_remote_has_tty.set() | ||||
|                 _no_remote_has_tty = None | ||||
|             if Lock.no_remote_has_tty is not None: | ||||
|                 Lock.no_remote_has_tty.set() | ||||
|                 Lock.no_remote_has_tty = None | ||||
| 
 | ||||
|         _global_actor_in_debug = None | ||||
|         Lock.global_actor_in_debug = None | ||||
| 
 | ||||
|         log.runtime( | ||||
|             f"TTY lock released, remote task: {task_name}:{uid}" | ||||
|  | @ -264,11 +316,8 @@ async def _hijack_stdin_for_child( | |||
|     ) | ||||
| 
 | ||||
|     log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock") | ||||
|     Lock.shield_sigint() | ||||
| 
 | ||||
|     orig_handler = signal.signal( | ||||
|         signal.SIGINT, | ||||
|         shield_sigint, | ||||
|     ) | ||||
|     try: | ||||
|         with ( | ||||
|             trio.CancelScope(shield=True), | ||||
|  | @ -320,10 +369,7 @@ async def _hijack_stdin_for_child( | |||
|         return "pdb_unlock_complete" | ||||
| 
 | ||||
|     finally: | ||||
|         signal.signal( | ||||
|             signal.SIGINT, | ||||
|             orig_handler | ||||
|         ) | ||||
|         Lock.unshield_sigint() | ||||
| 
 | ||||
| 
 | ||||
| async def wait_for_parent_stdin_hijack( | ||||
|  | @ -341,10 +387,8 @@ async def wait_for_parent_stdin_hijack( | |||
|     debug (see below inside ``maybe_wait_for_debugger()``). | ||||
| 
 | ||||
|     ''' | ||||
|     global _debugger_request_cs | ||||
| 
 | ||||
|     with trio.CancelScope(shield=True) as cs: | ||||
|         _debugger_request_cs = cs | ||||
|         Lock._debugger_request_cs = cs | ||||
| 
 | ||||
|         try: | ||||
|             async with get_root() as portal: | ||||
|  | @ -364,9 +408,9 @@ async def wait_for_parent_stdin_hijack( | |||
|                         # unblock local caller | ||||
| 
 | ||||
|                         try: | ||||
|                             assert _local_pdb_complete | ||||
|                             assert Lock.local_pdb_complete | ||||
|                             task_status.started(cs) | ||||
|                             await _local_pdb_complete.wait() | ||||
|                             await Lock.local_pdb_complete.wait() | ||||
| 
 | ||||
|                         finally: | ||||
|                             # TODO: shielding currently can cause hangs... | ||||
|  | @ -382,32 +426,25 @@ async def wait_for_parent_stdin_hijack( | |||
|             log.warning('Root actor cancelled debug lock') | ||||
| 
 | ||||
|         finally: | ||||
|             log.debug(f"Exiting debugger for actor {actor_uid}") | ||||
|             global _local_task_in_debug | ||||
|             _local_task_in_debug = None | ||||
|             log.debug(f"Child {actor_uid} released parent stdio lock") | ||||
|             log.pdb(f"Exiting debugger for actor {actor_uid}") | ||||
|             Lock.local_task_in_debug = None | ||||
|             log.pdb(f"Child {actor_uid} released parent stdio lock") | ||||
| 
 | ||||
| 
 | ||||
| def mk_mpdb() -> tuple[MultiActorPdb, Callable]: | ||||
| 
 | ||||
|     pdb = MultiActorPdb() | ||||
|     signal.signal = pdbpp.hideframe(signal.signal) | ||||
|     orig_handler = signal.signal( | ||||
|         signal.SIGINT, | ||||
|         partial(shield_sigint, pdb_obj=pdb), | ||||
|     ) | ||||
|     # signal.signal = pdbpp.hideframe(signal.signal) | ||||
| 
 | ||||
|     Lock.shield_sigint() | ||||
| 
 | ||||
|     # XXX: These are the important flags mentioned in | ||||
|     # https://github.com/python-trio/trio/issues/1155 | ||||
|     # which resolve the traceback spews to console. | ||||
|     pdb.allow_kbdint = True | ||||
|     pdb.nosigint = True | ||||
| 
 | ||||
|     # TODO: add this as method on our pdb obj? | ||||
|     def undo_sigint(): | ||||
|         # restore original sigint handler | ||||
|         signal.signal( | ||||
|             signal.SIGINT, | ||||
|             orig_handler | ||||
|         ) | ||||
| 
 | ||||
|     return pdb, undo_sigint | ||||
|     return pdb, Lock.unshield_sigint | ||||
| 
 | ||||
| 
 | ||||
| async def _breakpoint( | ||||
|  | @ -429,9 +466,6 @@ async def _breakpoint( | |||
|     actor = tractor.current_actor() | ||||
|     task_name = trio.lowlevel.current_task().name | ||||
| 
 | ||||
|     global _local_pdb_complete, _pdb_release_hook | ||||
|     global _local_task_in_debug, _global_actor_in_debug | ||||
| 
 | ||||
|     # TODO: is it possible to debug a trio.Cancelled except block? | ||||
|     # right now it seems like we can kinda do with by shielding | ||||
|     # around ``tractor.breakpoint()`` but not if we move the shielded | ||||
|  | @ -439,14 +473,14 @@ async def _breakpoint( | |||
|     # with trio.CancelScope(shield=shield): | ||||
|     #     await trio.lowlevel.checkpoint() | ||||
| 
 | ||||
|     if not _local_pdb_complete or _local_pdb_complete.is_set(): | ||||
|         _local_pdb_complete = trio.Event() | ||||
|     if not Lock.local_pdb_complete or Lock.local_pdb_complete.is_set(): | ||||
|         Lock.local_pdb_complete = trio.Event() | ||||
| 
 | ||||
|     # TODO: need a more robust check for the "root" actor | ||||
|     if actor._parent_chan and not is_root_process(): | ||||
| 
 | ||||
|         if _local_task_in_debug: | ||||
|             if _local_task_in_debug == task_name: | ||||
|         if Lock.local_task_in_debug: | ||||
|             if Lock.local_task_in_debug == task_name: | ||||
|                 # this task already has the lock and is | ||||
|                 # likely recurrently entering a breakpoint | ||||
|                 return | ||||
|  | @ -456,23 +490,26 @@ async def _breakpoint( | |||
|             # support for recursive entries to `tractor.breakpoint()` | ||||
|             log.warning(f"{actor.uid} already has a debug lock, waiting...") | ||||
| 
 | ||||
|             await _local_pdb_complete.wait() | ||||
|             await Lock.local_pdb_complete.wait() | ||||
|             await trio.sleep(0.1) | ||||
| 
 | ||||
|         # mark local actor as "in debug mode" to avoid recurrent | ||||
|         # entries/requests to the root process | ||||
|         _local_task_in_debug = task_name | ||||
|         Lock.local_task_in_debug = task_name | ||||
| 
 | ||||
|         def child_release_hook(): | ||||
|             # _local_task_in_debug = None | ||||
|             _local_pdb_complete.set() | ||||
| 
 | ||||
|             # restore original sigint handler | ||||
|             undo_sigint() | ||||
|         def child_release(): | ||||
|             try: | ||||
|                 # sometimes the ``trio`` might already be termianated in | ||||
|                 # which case this call will raise. | ||||
|                 Lock.local_pdb_complete.set() | ||||
|             finally: | ||||
|                 # restore original sigint handler | ||||
|                 undo_sigint() | ||||
|                 # should always be cleared in the hijack hook aboved right? | ||||
|                 # _local_task_in_debug = None | ||||
| 
 | ||||
|         # assign unlock callback for debugger teardown hooks | ||||
|         # _pdb_release_hook = _local_pdb_complete.set | ||||
|         _pdb_release_hook = child_release_hook | ||||
|         Lock.pdb_release_hook = child_release | ||||
| 
 | ||||
|         # this **must** be awaited by the caller and is done using the | ||||
|         # root nursery so that the debugger can continue to run without | ||||
|  | @ -489,66 +526,39 @@ async def _breakpoint( | |||
|                     actor.uid, | ||||
|                 ) | ||||
|         except RuntimeError: | ||||
|             child_release_hook() | ||||
|             Lock.pdb_release_hook() | ||||
|             raise | ||||
| 
 | ||||
|     elif is_root_process(): | ||||
| 
 | ||||
|         # we also wait in the root-parent for any child that | ||||
|         # may have the tty locked prior | ||||
|         global _debug_lock | ||||
| 
 | ||||
|         # TODO: wait, what about multiple root tasks acquiring it though? | ||||
|         # root process (us) already has it; ignore | ||||
|         if _global_actor_in_debug == actor.uid: | ||||
|         if Lock.global_actor_in_debug == actor.uid: | ||||
|             return | ||||
| 
 | ||||
|         # XXX: since we need to enter pdb synchronously below, | ||||
|         # we have to release the lock manually from pdb completion | ||||
|         # callbacks. Can't think of a nicer way then this atm. | ||||
|         if _debug_lock.locked(): | ||||
|         if Lock._debug_lock.locked(): | ||||
|             log.warning( | ||||
|                 'Root actor attempting to shield-acquire active tty lock' | ||||
|                 f' owned by {_global_actor_in_debug}') | ||||
|                 f' owned by {Lock.global_actor_in_debug}') | ||||
| 
 | ||||
|             # must shield here to avoid hitting a ``Cancelled`` and | ||||
|             # a child getting stuck bc we clobbered the tty | ||||
|             with trio.CancelScope(shield=True): | ||||
|                 await _debug_lock.acquire() | ||||
|                 await Lock._debug_lock.acquire() | ||||
|         else: | ||||
|             # may be cancelled | ||||
|             await _debug_lock.acquire() | ||||
|             await Lock._debug_lock.acquire() | ||||
| 
 | ||||
|         _global_actor_in_debug = actor.uid | ||||
|         _local_task_in_debug = task_name | ||||
|         Lock.global_actor_in_debug = actor.uid | ||||
|         Lock.local_task_in_debug = task_name | ||||
| 
 | ||||
|         # the lock must be released on pdb completion | ||||
|         def teardown(): | ||||
|             global _local_pdb_complete, _debug_lock | ||||
|             global _global_actor_in_debug, _local_task_in_debug | ||||
| 
 | ||||
|             try: | ||||
|                 _debug_lock.release() | ||||
|             except RuntimeError: | ||||
|                 # uhhh makes no sense but been seeing the non-owner | ||||
|                 # release error even though this is definitely the task | ||||
|                 # that locked? | ||||
|                 owner = _debug_lock.statistics().owner | ||||
|                 if owner: | ||||
|                     raise | ||||
| 
 | ||||
|             _global_actor_in_debug = None | ||||
|             _local_task_in_debug = None | ||||
|             _local_pdb_complete.set() | ||||
| 
 | ||||
|             # restore original sigint handler | ||||
|             undo_sigint() | ||||
| 
 | ||||
|         _pdb_release_hook = teardown | ||||
| 
 | ||||
|     # frame = sys._getframe() | ||||
|     # last_f = frame.f_back | ||||
|     # last_f.f_globals['__tracebackhide__'] = True | ||||
|         Lock.pdb_release_hook = Lock.root_release | ||||
| 
 | ||||
|     try: | ||||
|         # block here one (at the appropriate frame *up*) where | ||||
|  | @ -557,15 +567,13 @@ async def _breakpoint( | |||
|         debug_func(actor, pdb) | ||||
| 
 | ||||
|     except bdb.BdbQuit: | ||||
|         if _pdb_release_hook: | ||||
|             _pdb_release_hook() | ||||
|         Lock.maybe_release() | ||||
|         raise | ||||
| 
 | ||||
|     # XXX: apparently we can't do this without showing this frame | ||||
|     # in the backtrace on first entry to the REPL? Seems like an odd | ||||
|     # behaviour that should have been fixed by now. This is also why | ||||
|     # we scrapped all the @cm approaches that were tried previously. | ||||
| 
 | ||||
|     # finally: | ||||
|     #     __tracebackhide__ = True | ||||
|     #     # frame = sys._getframe() | ||||
|  | @ -595,12 +603,25 @@ def shield_sigint( | |||
|     ''' | ||||
|     __tracebackhide__ = True | ||||
| 
 | ||||
|     global _local_task_in_debug, _global_actor_in_debug | ||||
|     uid_in_debug = _global_actor_in_debug | ||||
|     uid_in_debug = Lock.global_actor_in_debug | ||||
| 
 | ||||
|     actor = tractor.current_actor() | ||||
| 
 | ||||
|     def do_cancel(): | ||||
|         # If we haven't tried to cancel the runtime then do that instead | ||||
|         # of raising a KBI (which may non-gracefully destroy | ||||
|         # a ``trio.run()``). | ||||
|         if not actor._cancel_called: | ||||
|             actor.cancel_soon() | ||||
| 
 | ||||
|         # If the runtime is already cancelled it likely means the user | ||||
|         # hit ctrl-c again because teardown didn't full take place in | ||||
|         # which case we do the "hard" raising of a local KBI. | ||||
|         else: | ||||
|             raise KeyboardInterrupt | ||||
| 
 | ||||
|     any_connected = False | ||||
| 
 | ||||
|     if uid_in_debug is not None: | ||||
|         # try to see if the supposed (sub)actor in debug still | ||||
|         # has an active connection to *this* actor, and if not | ||||
|  | @ -616,6 +637,7 @@ def shield_sigint( | |||
|                     f'{uid_in_debug}\n' | ||||
|                     'Allowing SIGINT propagation..' | ||||
|                 ) | ||||
|                 return do_cancel() | ||||
| 
 | ||||
|     # root actor branch that reports whether or not a child | ||||
|     # has locked debugger. | ||||
|  | @ -644,7 +666,17 @@ def shield_sigint( | |||
|     elif ( | ||||
|         not is_root_process() | ||||
|     ): | ||||
|         task = _local_task_in_debug | ||||
|         chan: Channel = actor._parent_chan | ||||
|         if not chan or not chan.connected(): | ||||
|             log.warning( | ||||
|                 'A global actor reported to be in debug ' | ||||
|                 'but no connection exists for its parent:\n' | ||||
|                 f'{uid_in_debug}\n' | ||||
|                 'Allowing SIGINT propagation..' | ||||
|             ) | ||||
|             return do_cancel() | ||||
| 
 | ||||
|         task = Lock.local_task_in_debug | ||||
|         if task: | ||||
|             log.pdb( | ||||
|                 f"Ignoring SIGINT while task in debug mode: `{task}`" | ||||
|  | @ -654,28 +686,20 @@ def shield_sigint( | |||
|         # that **is not** marked in debug mode? | ||||
|         # elif debug_mode(): | ||||
| 
 | ||||
|         else: | ||||
|             log.pdb( | ||||
|                 "Ignoring SIGINT since debug mode is enabled" | ||||
|             ) | ||||
| 
 | ||||
|     # noone has the debugger so raise KBI | ||||
|     else: | ||||
|         # If we haven't tried to cancel the runtime then do that instead | ||||
|         # of raising a KBI (which may non-gracefully destroy | ||||
|         # a ``trio.run()``). | ||||
|         if not actor._cancel_called: | ||||
|             actor.cancel_soon() | ||||
|         log.pdb( | ||||
|             "Ignoring SIGINT since debug mode is enabled" | ||||
|         ) | ||||
| 
 | ||||
|         # If the runtime is already cancelled it likely means the user | ||||
|         # hit ctrl-c again because teardown didn't full take place in | ||||
|         # which case we do the "hard" raising of a local KBI. | ||||
|         else: | ||||
|             raise KeyboardInterrupt | ||||
| 
 | ||||
|     # maybe redraw/print last REPL output to console | ||||
|     if pdb_obj: | ||||
|     # NOTE: currently (at least on ``fancycompleter`` 0.9.2) | ||||
|     # it lookks to be that the last command that was run (eg. ll) | ||||
|     # will be repeated by default. | ||||
| 
 | ||||
|     # TODO: maybe redraw/print last REPL output to console | ||||
|     if ( | ||||
|         pdb_obj | ||||
|         and sys.version_info <= (3, 10) | ||||
|     ): | ||||
|         # TODO: make this work like sticky mode where if there is output | ||||
|         # detected as written to the tty we redraw this part underneath | ||||
|         # and erase the past draw of this same bit above? | ||||
|  | @ -686,12 +710,16 @@ def shield_sigint( | |||
|         # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040 | ||||
|         # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py | ||||
| 
 | ||||
|         try: | ||||
|             pdb_obj.do_longlist(None) | ||||
|             print(pdb_obj.prompt, end='', flush=True) | ||||
|         except AttributeError: | ||||
|             log.exception('pdbpp longlist failed...') | ||||
|             raise KeyboardInterrupt | ||||
|         # XXX: lol, see ``pdbpp`` issue: | ||||
|         # https://github.com/pdbpp/pdbpp/issues/496 | ||||
| 
 | ||||
|         # TODO: pretty sure this is what we should expect to have to run | ||||
|         # in total but for now we're just going to wait until `pdbpp` | ||||
|         # figures out it's own stuff on 3.10 (and maybe we'll help). | ||||
|         # pdb_obj.do_longlist(None) | ||||
| 
 | ||||
|         # XXX: we were doing this but it shouldn't be required.. | ||||
|         print(pdb_obj.prompt, end='', flush=True) | ||||
| 
 | ||||
| 
 | ||||
| def _set_trace( | ||||
|  | @ -707,19 +735,21 @@ def _set_trace( | |||
|     # last_f.f_globals['__tracebackhide__'] = True | ||||
| 
 | ||||
|     # start 2 levels up in user code | ||||
|     frame: FrameType = sys._getframe() | ||||
|     frame: Optional[FrameType] = sys._getframe() | ||||
|     if frame: | ||||
|         frame = frame.f_back.f_back  # type: ignore | ||||
|         frame = frame.f_back  # type: ignore | ||||
| 
 | ||||
|     if pdb and actor is not None: | ||||
|     if frame and pdb and actor is not None: | ||||
|         log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") | ||||
|         # no f!#$&* idea, but when we're in async land | ||||
|         # we need 2x frames up? | ||||
|         frame = frame.f_back | ||||
| 
 | ||||
|     else: | ||||
|         pdb, undo_sigint = mk_mpdb() | ||||
| 
 | ||||
|         # we entered the global ``breakpoint()`` built-in from sync code? | ||||
|         global _local_task_in_debug, _pdb_release_hook | ||||
|         _local_task_in_debug = 'sync' | ||||
|         Lock.local_task_in_debug = 'sync' | ||||
| 
 | ||||
|     pdb.set_trace(frame=frame) | ||||
| 
 | ||||
|  | @ -794,6 +824,7 @@ async def _maybe_enter_pm(err): | |||
|     ): | ||||
|         log.debug("Actor crashed, entering debug mode") | ||||
|         await post_mortem() | ||||
|         Lock.maybe_release() | ||||
|         return True | ||||
| 
 | ||||
|     else: | ||||
|  | @ -838,8 +869,6 @@ async def maybe_wait_for_debugger( | |||
|     if ( | ||||
|         is_root_process() | ||||
|     ): | ||||
|         global _no_remote_has_tty, _global_actor_in_debug, _wait_all_tasks_lock | ||||
| 
 | ||||
|         # If we error in the root but the debugger is | ||||
|         # engaged we don't want to prematurely kill (and | ||||
|         # thus clobber access to) the local tty since it | ||||
|  | @ -851,11 +880,13 @@ async def maybe_wait_for_debugger( | |||
| 
 | ||||
|         for _ in range(poll_steps): | ||||
| 
 | ||||
|             if _global_actor_in_debug: | ||||
|                 sub_in_debug = tuple(_global_actor_in_debug) | ||||
|             if Lock.global_actor_in_debug: | ||||
|                 sub_in_debug = tuple(Lock.global_actor_in_debug) | ||||
|                 # alive = tractor.current_actor().child_alive(sub_in_debug) | ||||
|                 # if not alive: | ||||
|                 #     break | ||||
| 
 | ||||
|             log.debug( | ||||
|                 'Root polling for debug') | ||||
|             log.debug('Root polling for debug') | ||||
| 
 | ||||
|             with trio.CancelScope(shield=True): | ||||
|                 await trio.sleep(poll_delay) | ||||
|  | @ -866,7 +897,7 @@ async def maybe_wait_for_debugger( | |||
|                 # XXX: doesn't seem to work | ||||
|                 # await trio.testing.wait_all_tasks_blocked(cushion=0) | ||||
| 
 | ||||
|                 debug_complete = _no_remote_has_tty | ||||
|                 debug_complete = Lock.no_remote_has_tty | ||||
|                 if ( | ||||
|                     (debug_complete and | ||||
|                      not debug_complete.is_set()) | ||||
|  |  | |||
|  | @ -511,8 +511,8 @@ class Portal: | |||
|             if ctx.chan.connected(): | ||||
|                 log.info( | ||||
|                     'Waiting on final context-task result for\n' | ||||
|                     f'task:{cid}\n' | ||||
|                     f'actor:{uid}' | ||||
|                     f'task: {cid}\n' | ||||
|                     f'actor: {uid}' | ||||
|                 ) | ||||
|                 result = await ctx.result() | ||||
| 
 | ||||
|  | @ -542,6 +542,17 @@ class Portal: | |||
|                     f'value from callee `{result}`' | ||||
|                 ) | ||||
| 
 | ||||
|             # XXX: (MEGA IMPORTANT) if this is a root opened process we | ||||
|             # wait for any immediate child in debug before popping the | ||||
|             # context from the runtime msg loop otherwise inside | ||||
|             # ``Actor._push_result()`` the msg will be discarded and in | ||||
|             # the case where that msg is global debugger unlock (via | ||||
|             # a "stop" msg for a stream), this can result in a deadlock | ||||
|             # where the root is waiting on the lock to clear but the | ||||
|             # child has already cleared it and clobbered IPC. | ||||
|             from ._debug import maybe_wait_for_debugger | ||||
|             await maybe_wait_for_debugger() | ||||
| 
 | ||||
|             # remove the context from runtime tracking | ||||
|             self.actor._contexts.pop((self.channel.uid, ctx.cid)) | ||||
| 
 | ||||
|  |  | |||
|  | @ -103,13 +103,7 @@ async def open_root_actor( | |||
|         _default_arbiter_port, | ||||
|     ) | ||||
| 
 | ||||
|     if loglevel is None: | ||||
|         loglevel = log.get_loglevel() | ||||
|     else: | ||||
|         log._default_loglevel = loglevel | ||||
|         log.get_console_log(loglevel) | ||||
| 
 | ||||
|     assert loglevel | ||||
|     loglevel = (loglevel or log._default_loglevel).upper() | ||||
| 
 | ||||
|     if debug_mode and _spawn._spawn_method == 'trio': | ||||
|         _state._runtime_vars['_debug_mode'] = True | ||||
|  | @ -124,7 +118,7 @@ async def open_root_actor( | |||
|             logging.getLevelName( | ||||
|                 # lul, need the upper case for the -> int map? | ||||
|                 # sweet "dynamic function behaviour" stdlib... | ||||
|                 loglevel.upper() | ||||
|                 loglevel, | ||||
|             ) > logging.getLevelName('PDB') | ||||
|         ): | ||||
|             loglevel = 'PDB' | ||||
|  | @ -134,19 +128,24 @@ async def open_root_actor( | |||
|             "Debug mode is only supported for the `trio` backend!" | ||||
|         ) | ||||
| 
 | ||||
|     # make a temporary connection to see if an arbiter exists | ||||
|     arbiter_found = False | ||||
|     log.get_console_log(loglevel) | ||||
| 
 | ||||
|     try: | ||||
|         # make a temporary connection to see if an arbiter exists, | ||||
|         # if one can't be made quickly we assume none exists. | ||||
|         arbiter_found = False | ||||
| 
 | ||||
|         # TODO: this connect-and-bail forces us to have to carefully | ||||
|         # rewrap TCP 104-connection-reset errors as EOF so as to avoid | ||||
|         # propagating cancel-causing errors to the channel-msg loop | ||||
|         # machinery.  Likely it would be better to eventually have | ||||
|         # a "discovery" protocol with basic handshake instead. | ||||
|         async with _connect_chan(host, port): | ||||
|             arbiter_found = True | ||||
|         with trio.move_on_after(1): | ||||
|             async with _connect_chan(host, port): | ||||
|                 arbiter_found = True | ||||
| 
 | ||||
|     except OSError: | ||||
|         # TODO: make this a "discovery" log level? | ||||
|         logger.warning(f"No actor could be found @ {host}:{port}") | ||||
| 
 | ||||
|     # create a local actor and start up its main routine/task | ||||
|  | @ -216,7 +215,8 @@ async def open_root_actor( | |||
|             finally: | ||||
|                 # NOTE: not sure if we'll ever need this but it's | ||||
|                 # possibly better for even more determinism? | ||||
|                 # logger.cancel(f'Waiting on {len(nurseries)} nurseries in root..') | ||||
|                 # logger.cancel( | ||||
|                 #     f'Waiting on {len(nurseries)} nurseries in root..') | ||||
|                 # nurseries = actor._actoruid2nursery.values() | ||||
|                 # async with trio.open_nursery() as tempn: | ||||
|                 #     for an in nurseries: | ||||
|  |  | |||
|  | @ -307,7 +307,8 @@ async def new_proc( | |||
|         proc: Optional[trio.Process] = None | ||||
|         try: | ||||
|             try: | ||||
|                 proc = await trio.open_process(spawn_cmd) | ||||
|                 # TODO: needs ``trio_typing`` patch? | ||||
|                 proc = await trio.lowlevel.open_process(spawn_cmd)  # type: ignore | ||||
| 
 | ||||
|                 log.runtime(f"Started {proc}") | ||||
| 
 | ||||
|  | @ -334,6 +335,9 @@ async def new_proc( | |||
|                                     await proc.wait() | ||||
|                 raise | ||||
| 
 | ||||
|             # a sub-proc ref **must** exist now | ||||
|             assert proc | ||||
| 
 | ||||
|             portal = Portal(chan) | ||||
|             actor_nursery._children[subactor.uid] = ( | ||||
|                 subactor, proc, portal) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue