forked from goodboy/tractor

Compare commits

42 Commits

	| Author | SHA1 | Date | 
|---|---|---|
|  | 6a280da597 | |
|  | f984fa8daa | |
|  | cc18c84389 | |
|  | af205c08f2 | |
|  | ab557fae21 | |
|  | 0b1c1ac568 | |
|  | 9e37bb22e1 | |
|  | 01dea6fe32 | |
|  | 79faffd577 | |
|  | b3fd5da1be | |
|  | 414c59cca6 | |
|  | 7710213604 | |
|  | 71e779dca3 | |
|  | 5457aa566c | |
|  | 20b902b300 | |
|  | 3955906654 | |
|  | af1ee3f0a6 | |
|  | f9f4bcf27c | |
|  | 07ac6eb5d0 | |
|  | 83ed2f6286 | |
|  | a40b168dfb | |
|  | 1822d0b48b | |
|  | dbc689d55a | |
|  | e49cccf666 | |
|  | 5892d64d6e | |
|  | e83d158bfb | |
|  | e107257ac0 | |
|  | 83367caf42 | |
|  | e6a520c944 | |
|  | 5a83f373ef | |
|  | 228dfff91c | |
|  | 866f6f9d40 | |
|  | f9a8543811 | |
|  | aa09a31d25 | |
|  | 9a1dadecff | |
|  | 861884e075 | |
|  | e204f858ac | |
|  | 36e92b9faf | |
|  | 742e004810 | |
|  | ef7921ce11 | |
|  | 542fe0372b | |
|  | 3e9998ea83 | |
@@ -120,4 +120,4 @@ jobs:
         run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager

       - name: Run tests
-        run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs
+        run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs --full-trace

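For reference (not part of this changeset), the CI test step above can be reproduced locally; a sketch, where the `--spawn-backend` value is only an assumed example since the real values come from the CI matrix:

```python
# hypothetical local repro of the CI "Run tests" step above;
# '--spawn-backend=trio' is an assumption taken from the CI matrix.
import subprocess

subprocess.run(
    [
        'pytest', 'tests/',
        '--spawn-backend=trio',  # assumed matrix value
        '-rs',                   # report skip reasons
        '--full-trace',          # the flag added by this commit
    ],
    check=True,
)
```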
@@ -3,13 +3,14 @@
 |gh_actions|
 |docs|

-``tractor`` is a `structured concurrent`_, multi-processing_ runtime built on trio_.
+``tractor`` is a `structured concurrent`_, multi-processing_ runtime
+built on trio_.

 Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
 our nurseries_ let you spawn new Python processes which each run a ``trio``
 scheduled runtime - a call to ``trio.run()``.

-We believe the system adhere's to the `3 axioms`_ of an "`actor model`_"
+We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
 but likely *does not* look like what *you* probably think an "actor
 model" looks like, and that's *intentional*.

@@ -579,13 +580,13 @@ say hi, please feel free to reach us in our `matrix channel`_.  If
 matrix seems too hip, we're also mostly all in the the `trio gitter
 channel`_!

+.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
+.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
+.. _trio: https://github.com/python-trio/trio
 .. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
 .. _actor model: https://en.wikipedia.org/wiki/Actor_model
-.. _trio: https://github.com/python-trio/trio
-.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
 .. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles
 .. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
-.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
 .. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
 .. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
 .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s

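As background for the README wording touched above (not part of this changeset): a minimal sketch of the nursery-based process spawning it describes, assuming the long-standing `open_nursery()`/`run_in_actor()` API; `compute` is a hypothetical function name:

```python
import tractor
import trio


async def compute() -> int:
    # runs in a separate Python process, under its own ``trio.run()``
    return 42


async def main():
    async with tractor.open_nursery() as n:
        # spawn a subprocess-"actor" and schedule ``compute`` inside it
        portal = await n.run_in_actor(compute)
        print(await portal.result())


if __name__ == '__main__':
    trio.run(main)
```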
@@ -0,0 +1,50 @@
+import tractor
+import trio
+
+
+async def gen():
+    yield 'yo'
+    await tractor.breakpoint()
+    yield 'yo'
+    await tractor.breakpoint()
+
+
+@tractor.context
+async def just_bp(
+    ctx: tractor.Context,
+) -> None:
+
+    await ctx.started()
+    await tractor.breakpoint()
+
+    # TODO: bps and errors in this call..
+    async for val in gen():
+        print(val)
+
+    # await trio.sleep(0.5)
+
+    # prematurely destroy the connection
+    await ctx.chan.aclose()
+
+    # THIS CAUSES AN UNRECOVERABLE HANG
+    # without latest ``pdbpp``:
+    assert 0
+
+
+
+async def main():
+    async with tractor.open_nursery(
+        debug_mode=True,
+    ) as n:
+        p = await n.start_actor(
+            'bp_boi',
+            enable_modules=[__name__],
+        )
+        async with p.open_context(
+            just_bp,
+        ) as (ctx, first):
+            await trio.sleep_forever()
+
+
+if __name__ == '__main__':
+    trio.run(main)

@@ -0,0 +1,49 @@
+import trio
+import click
+import tractor
+import pydantic
+# from multiprocessing import shared_memory
+
+
+@tractor.context
+async def just_sleep(
+
+    ctx: tractor.Context,
+    **kwargs,
+
+) -> None:
+    '''
+    Test a small ping-pong 2-way streaming server.
+
+    '''
+    await ctx.started()
+    await trio.sleep_forever()
+
+
+async def main() -> None:
+
+    proc = await trio.open_process( (
+        'python',
+        '-c',
+        'import trio; trio.run(trio.sleep_forever)',
+    ))
+    await proc.wait()
+    # await trio.sleep_forever()
+    # async with tractor.open_nursery() as n:
+
+    #     portal = await n.start_actor(
+    #         'rpc_server',
+    #         enable_modules=[__name__],
+    #     )
+
+    #     async with portal.open_context(
+    #         just_sleep,  # taken from pytest parameterization
+    #     ) as (ctx, sent):
+    #         await trio.sleep_forever()
+
+
+
+if __name__ == '__main__':
+    import time
+    # time.sleep(999)
+    trio.run(main)

setup.py
@@ -54,7 +54,10 @@ setup(
         # tooling
         'colorlog',
         'wrapt',
-        'pdbpp',
+
+        # 3.10 has an outstanding unreleased issue and `pdbpp` itself
+        # pins to patched forks of its own dependencies as well.
+        "pdbpp @ git+https://github.com/pdbpp/pdbpp@master#egg=pdbpp",  # noqa: E501

         # serialization
         'msgpack>=1.0.3',

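For anyone pinning dependencies in a requirements file rather than `setup.py`, the same direct-reference syntax (PEP 508) works there too; a sketch using the exact URL from the change above:

```
# requirements-file equivalent of the setup.py entry above
pdbpp @ git+https://github.com/pdbpp/pdbpp@master#egg=pdbpp
```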
@@ -1,5 +1,5 @@
 """
-That native debug better work!
+That "native" debug mode better work!

 All these tests can be understood (somewhat) by running the equivalent
 `examples/debugging/` scripts manually.
@@ -13,6 +13,7 @@ TODO:
 import time
 from os import path
 import platform
+from typing import Optional

 import pytest
 import pexpect
@@ -73,6 +74,14 @@ def spawn(
     return _spawn


+@pytest.fixture(
+    params=[False, True],
+    ids='ctl-c={}'.format,
+)
+def ctlc(request) -> bool:
+    yield request.param
+
+
 @pytest.mark.parametrize(
     'user_in_out',
     [
@@ -137,20 +146,50 @@ def test_root_actor_bp(spawn, user_in_out):
         assert expect_err_str in str(child.before)


-def test_root_actor_bp_forever(spawn):
+def do_ctlc(
+    child,
+    count: int = 3,
+    patt: Optional[str] = None,
+
+) -> None:
+
+    # make sure ctl-c sends don't do anything but repeat output
+    for _ in range(count):
+        child.sendcontrol('c')
+        child.expect(r"\(Pdb\+\+\)")
+
+        if patt:
+            # should see the last line on console
+            before = str(child.before.decode())
+            assert patt in before
+
+
+def test_root_actor_bp_forever(
+    spawn,
+    ctlc: bool,
+):
     "Re-enter a breakpoint from the root actor-task."
     child = spawn('root_actor_breakpoint_forever')

     # do some "next" commands to demonstrate recurrent breakpoint
     # entries
     for _ in range(10):
-        child.sendline('next')
+
         child.expect(r"\(Pdb\+\+\)")

-    # do one continue which should trigger a new task to lock the tty
+        if ctlc:
+            do_ctlc(child)
+
+        child.sendline('next')
+
+    # do one continue which should trigger a
+    # new task to lock the tty
     child.sendline('continue')
     child.expect(r"\(Pdb\+\+\)")

+    if ctlc:
+        do_ctlc(child)
+
     # XXX: this previously caused a bug!
     child.sendline('n')
     child.expect(r"\(Pdb\+\+\)")
@@ -158,8 +197,15 @@ def test_root_actor_bp_forever(spawn):
     child.sendline('n')
     child.expect(r"\(Pdb\+\+\)")

+    # quit out of the loop
+    child.sendline('q')
+    child.expect(pexpect.EOF)
+

-def test_subactor_error(spawn):
+def test_subactor_error(
+    spawn,
+    ctlc: bool,
+):
     "Single subactor raising an error"

     child = spawn('subactor_error')
@@ -170,23 +216,29 @@ def test_subactor_error(spawn):
     before = str(child.before.decode())
     assert "Attaching to pdb in crashed actor: ('name_error'" in before

-    # send user command
-    # (in this case it's the same for 'continue' vs. 'quit')
-    child.sendline('continue')
+    # make sure ctl-c sends don't do anything but repeat output
+    if ctlc:
+        do_ctlc(
+            child,
+            patt='(doggypants)',
+        )

-    # the debugger should enter a second time in the nursery
+    # send user command and (in this case it's the same for 'continue'
+    # vs. 'quit') the debugger should enter a second time in the nursery
     # creating actor
-
+    child.sendline('continue')
     child.expect(r"\(Pdb\+\+\)")

     before = str(child.before.decode())
-
     # root actor gets debugger engaged
     assert "Attaching to pdb in crashed actor: ('root'" in before
-
     # error is a remote error propagated from the subactor
     assert "RemoteActorError: ('name_error'" in before

+    # another round
+    if ctlc:
+        do_ctlc(child)
+
     child.sendline('c')
     child.expect('\r\n')

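As a standalone illustration (not part of the changeset) of the `pexpect` interaction pattern these tests and the new `do_ctlc()` helper are built on, spawning a trivial child and driving its prompt:

```python
import pexpect

# spawn a tiny child program that prints a prompt and echoes a line back
child = pexpect.spawn('python', ['-c', 'print(input("prompt> "))'])

child.expect('prompt> ')      # block until the prompt appears
child.sendline('hello')       # type a response, like sendline('next') above
child.expect(pexpect.EOF)     # wait for the child to exit
print(child.before.decode())  # everything the child printed before EOF
```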
@@ -81,11 +81,14 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
     'example_script',

     # walk yields: (dirpath, dirnames, filenames)
-    [(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
+    [
+        (p[0], f) for p in os.walk(examples_dir()) for f in p[2]

         if '__' not in f
         and f[0] != '_'
-        and 'debugging' not in p[0]],
+        and 'debugging' not in p[0]
+        and 'integration' not in p[0]
+    ],

     ids=lambda t: t[1],
 )

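A self-contained sketch (hypothetical paths, not the project's fixture code) of what the parametrization comprehension above produces: `(dirpath, filename)` pairs for example scripts, now skipping both the `debugging` and `integration` sub-directories:

```python
import os


def example_params(root: str) -> list[tuple[str, str]]:
    # mirrors the filtering logic in the diff above
    return [
        (dirpath, fname)
        for dirpath, _dirs, files in os.walk(root)
        for fname in files
        if '__' not in fname
        and not fname.startswith('_')
        and 'debugging' not in dirpath
        and 'integration' not in dirpath
    ]


if __name__ == '__main__':
    print(example_params('examples'))
```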
@@ -26,8 +26,11 @@ import importlib
 import importlib.util
 import inspect
 import uuid
-import typing
-from typing import Any, Optional, Union
+from typing import (
+    Any, Optional,
+    Union, TYPE_CHECKING,
+    Callable,
+)
 from types import ModuleType
 import sys
 import os
@@ -57,6 +60,10 @@ from . import _state
 from . import _mp_fixup_main


+if TYPE_CHECKING:
+    from ._supervise import ActorNursery
+
+
 log = get_logger('tractor')


@@ -65,7 +72,7 @@ async def _invoke(
     actor: 'Actor',
     cid: str,
     chan: Channel,
-    func: typing.Callable,
+    func: Callable,
     kwargs: dict[str, Any],

     is_rpc: bool = True,
@@ -200,7 +207,7 @@ async def _invoke(
                 ctx = actor._contexts.pop((chan.uid, cid))
                 if ctx:
                     log.runtime(
-                        f'Context entrypoint for {func} was terminated:\n{ctx}'
+                        f'Context entrypoint {func} was terminated:\n{ctx}'
                     )

             assert cs
@@ -233,7 +240,10 @@ async def _invoke(
                 task_status.started(cs)
                 await chan.send({'return': await coro, 'cid': cid})

-    except (Exception, trio.MultiError) as err:
+    except (
+        Exception,
+        trio.MultiError
+    ) as err:

         if not is_multi_cancelled(err):

@@ -267,7 +277,13 @@ async def _invoke(
         try:
             await chan.send(err_msg)

-        except trio.ClosedResourceError:
+        # TODO: tests for this scenario:
+        # - RPC caller closes connection before getting a response
+        # should **not** crash this actor..
+        except (
+            trio.ClosedResourceError,
+            trio.BrokenResourceError,
+        ):
             # if we can't propagate the error that's a big boo boo
             log.error(
                 f"Failed to ship error to caller @ {chan.uid} !?"
@@ -316,7 +332,9 @@ async def try_ship_error_to_parent(
             trio.ClosedResourceError,
             trio.BrokenResourceError,
         ):
-            log.error(
+            # in SC terms this is one of the worst things that can
+            # happen and creates the 2-general's dilemma.
+            log.critical(
                 f"Failed to ship error to parent "
                 f"{channel.uid}, channel was closed"
             )
@@ -424,7 +442,7 @@ class Actor:
         # (chan, cid) -> (cancel_scope, func)
         self._rpc_tasks: dict[
             tuple[Channel, str],
-            tuple[trio.CancelScope, typing.Callable, trio.Event]
+            tuple[trio.CancelScope, Callable, trio.Event]
         ] = {}

         # map {actor uids -> Context}
@@ -513,6 +531,7 @@ class Actor:
         self._no_more_peers = trio.Event()  # unset

         chan = Channel.from_stream(stream)
+        uid: Optional[tuple[str, str]] = chan.uid
         log.runtime(f"New connection to us {chan}")

         # send/receive initial handshake response
@@ -560,39 +579,63 @@ class Actor:
         # append new channel
         self._peers[uid].append(chan)

+        local_nursery: Optional[ActorNursery] = None  # noqa
+        disconnected: bool = False
+
         # Begin channel management - respond to remote requests and
         # process received reponses.
         try:
-            await self._process_messages(chan)
+            disconnected = await self._process_messages(chan)

-        except trio.Cancelled:
+        except (
+            trio.Cancelled,
+        ):
             log.cancel(f"Msg loop was cancelled for {chan}")
             raise

         finally:
+            local_nursery = self._actoruid2nursery.get(uid, local_nursery)
+
             # This is set in ``Portal.cancel_actor()``. So if
             # the peer was cancelled we try to wait for them
             # to tear down their side of the connection before
             # moving on with closing our own side.
-            local_nursery = self._actoruid2nursery.get(chan.uid)
             if (
                 local_nursery
             ):
+                if disconnected:
+                    # if the transport died and this actor is still
+                    # registered within a local nursery, we report that the
+                    # IPC layer may have failed unexpectedly since it may be
+                    # the cause of other downstream errors.
+                    entry = local_nursery._children.get(uid)
+                    if entry:
+                        _, proc, _ = entry
+                        log.warning(
+                            f'Actor {uid}@{proc} IPC connection broke!?')
+                        # if proc.poll() is not None:
+                        #     log.error('Actor {uid} proc died and IPC broke?')
+
                 log.cancel(f"Waiting on cancel request to peer {chan.uid}")
                 # XXX: this is a soft wait on the channel (and its
-                # underlying transport protocol) to close from the remote
-                # peer side since we presume that any channel which
-                # is mapped to a sub-actor (i.e. it's managed by
-                # one of our local nurseries)
-                # message is sent to the peer likely by this actor which is
-                # now in a cancelled condition) when the local runtime here
-                # is now cancelled while (presumably) in the middle of msg
+                # underlying transport protocol) to close from the
+                # remote peer side since we presume that any channel
+                # which is mapped to a sub-actor (i.e. it's managed by
+                # one of our local nurseries) has a message is sent to
+                # the peer likely by this actor (which is now in
+                # a cancelled condition) when the local runtime here is
+                # now cancelled while (presumably) in the middle of msg
                 # loop processing.
                 with trio.move_on_after(0.5) as cs:
                     cs.shield = True
                     # Attempt to wait for the far end to close the channel
                     # and bail after timeout (2-generals on closure).
                     assert chan.msgstream
+
+                    log.runtime(
+                        f'Draining lingering msgs from stream {chan.msgstream}'
+                    )
+
                     async for msg in chan.msgstream.drain():
                         # try to deliver any lingering msgs
                         # before we destroy the channel.
@@ -609,6 +652,8 @@ class Actor:

                     await local_nursery.exited.wait()

+                # if local_nursery._children
+
             # ``Channel`` teardown and closure sequence

             # Drop ref to channel so it can be gc-ed and disconnected
@@ -618,7 +663,7 @@ class Actor:

             if not chans:
                 log.runtime(f"No more channels for {chan.uid}")
-                self._peers.pop(chan.uid, None)
+                self._peers.pop(uid, None)

                 # for (uid, cid) in self._contexts.copy():
                 #     if chan.uid == uid:
@@ -626,11 +671,13 @@ class Actor:

             log.runtime(f"Peers is {self._peers}")

-            if not self._peers:  # no more channels connected
+            # No more channels to other actors (at all) registered
+            # as connected.
+            if not self._peers:
                 log.runtime("Signalling no more peer channels")
                 self._no_more_peers.set()

-            # # XXX: is this necessary (GC should do it?)
+            # XXX: is this necessary (GC should do it)?
             if chan.connected():
                 # if the channel is still connected it may mean the far
                 # end has not closed and we may have gotten here due to
@@ -665,8 +712,8 @@ class Actor:
             ctx = self._contexts[(uid, cid)]
         except KeyError:
             log.warning(
-                    f'Ignoring msg from [no-longer/un]known context with {uid}:'
-                    f'\n{msg}')
+                f'Ignoring msg from [no-longer/un]known context {uid}:'
+                f'\n{msg}')
             return

         send_chan = ctx._send_chan
@@ -813,7 +860,7 @@ class Actor:
         shield: bool = False,
         task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,

-    ) -> None:
+    ) -> bool:
         '''
         Process messages for the channel async-RPC style.

@@ -839,7 +886,7 @@ class Actor:
                     if msg is None:  # loop terminate sentinel

                         log.cancel(
-                            f"Channerl to {chan.uid} terminated?\n"
+                            f"Channel to {chan.uid} terminated?\n"
                             "Cancelling all associated tasks..")

                         for (channel, cid) in self._rpc_tasks.copy():
@@ -986,6 +1033,9 @@ class Actor:
             # up.
             log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')

+            # transport **was** disconnected
+            return True
+
         except (Exception, trio.MultiError) as err:
             if nursery_cancelled_before_task:
                 sn = self._service_n
@@ -1010,6 +1060,9 @@ class Actor:
                 f"Exiting msg loop for {chan} from {chan.uid} "
                 f"with last msg:\n{msg}")

+        # transport **was not** disconnected
+        return False
+
     async def _from_parent(
         self,
         parent_addr: Optional[tuple[str, int]],

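One change above pulls the `ActorNursery` import under `typing.TYPE_CHECKING` so it is only seen by type checkers. A generic, self-contained sketch of that idiom (not tractor code; the `Decimal` import is just a stand-in for a cyclic or heavy dependency):

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # evaluated only by static type checkers, never at runtime,
    # which avoids circular-import problems for annotation-only imports
    from decimal import Decimal  # stand-in for a cyclic/heavy import


def as_text(value: Decimal) -> str:
    # with ``from __future__ import annotations`` the annotation above
    # remains a string and is never resolved at runtime
    return str(value)


if __name__ == '__main__':
    from decimal import Decimal
    print(as_text(Decimal('1.25')))
```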
|  | @ -18,8 +18,10 @@ | ||||||
| Multi-core debugging for da peeps! | Multi-core debugging for da peeps! | ||||||
| 
 | 
 | ||||||
| """ | """ | ||||||
|  | from __future__ import annotations | ||||||
| import bdb | import bdb | ||||||
| import sys | import sys | ||||||
|  | import signal | ||||||
| from functools import partial | from functools import partial | ||||||
| from contextlib import asynccontextmanager as acm | from contextlib import asynccontextmanager as acm | ||||||
| from typing import ( | from typing import ( | ||||||
|  | @ -29,16 +31,18 @@ from typing import ( | ||||||
|     AsyncIterator, |     AsyncIterator, | ||||||
|     AsyncGenerator, |     AsyncGenerator, | ||||||
| ) | ) | ||||||
|  | from types import FrameType | ||||||
| 
 | 
 | ||||||
| import tractor | import tractor | ||||||
| import trio | import trio | ||||||
| from trio_typing import TaskStatus | from trio_typing import TaskStatus | ||||||
| 
 | 
 | ||||||
| from .log import get_logger | from .log import get_logger | ||||||
| from . import _state |  | ||||||
| from ._discovery import get_root | from ._discovery import get_root | ||||||
| from ._state import is_root_process, debug_mode | from ._state import is_root_process, debug_mode | ||||||
| from ._exceptions import is_multi_cancelled | from ._exceptions import is_multi_cancelled | ||||||
|  | from ._ipc import Channel | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| try: | try: | ||||||
|     # wtf: only exported when installed in dev mode? |     # wtf: only exported when installed in dev mode? | ||||||
|  | @ -46,7 +50,8 @@ try: | ||||||
| except ImportError: | except ImportError: | ||||||
|     # pdbpp is installed in regular mode...it monkey patches stuff |     # pdbpp is installed in regular mode...it monkey patches stuff | ||||||
|     import pdb |     import pdb | ||||||
|     assert pdb.xpm, "pdbpp is not installed?"  # type: ignore |     xpm = getattr(pdb, 'xpm', None) | ||||||
|  |     assert xpm, "pdbpp is not installed?"  # type: ignore | ||||||
|     pdbpp = pdb |     pdbpp = pdb | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
|  | @ -81,11 +86,14 @@ class TractorConfig(pdbpp.DefaultConfig): | ||||||
|     """Custom ``pdbpp`` goodness. |     """Custom ``pdbpp`` goodness. | ||||||
|     """ |     """ | ||||||
|     # sticky_by_default = True |     # sticky_by_default = True | ||||||
|  |     enable_hidden_frames = False | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class PdbwTeardown(pdbpp.Pdb): | class MultiActorPdb(pdbpp.Pdb): | ||||||
|     """Add teardown hooks to the regular ``pdbpp.Pdb``. |     ''' | ||||||
|     """ |     Add teardown hooks to the regular ``pdbpp.Pdb``. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|     # override the pdbpp config with our coolio one |     # override the pdbpp config with our coolio one | ||||||
|     DefaultConfig = TractorConfig |     DefaultConfig = TractorConfig | ||||||
| 
 | 
 | ||||||
|  | @ -95,17 +103,19 @@ class PdbwTeardown(pdbpp.Pdb): | ||||||
|         try: |         try: | ||||||
|             super().set_continue() |             super().set_continue() | ||||||
|         finally: |         finally: | ||||||
|             global _local_task_in_debug |             global _local_task_in_debug, _pdb_release_hook | ||||||
|             _local_task_in_debug = None |             _local_task_in_debug = None | ||||||
|             _pdb_release_hook() |             if _pdb_release_hook: | ||||||
|  |                 _pdb_release_hook() | ||||||
| 
 | 
 | ||||||
|     def set_quit(self): |     def set_quit(self): | ||||||
|         try: |         try: | ||||||
|             super().set_quit() |             super().set_quit() | ||||||
|         finally: |         finally: | ||||||
|             global _local_task_in_debug |             global _local_task_in_debug, _pdb_release_hook | ||||||
|             _local_task_in_debug = None |             _local_task_in_debug = None | ||||||
|             _pdb_release_hook() |             if _pdb_release_hook: | ||||||
|  |                 _pdb_release_hook() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: will be needed whenever we get to true remote debugging. | # TODO: will be needed whenever we get to true remote debugging. | ||||||
|  | @ -150,7 +160,8 @@ async def _acquire_debug_lock( | ||||||
|     uid: Tuple[str, str] |     uid: Tuple[str, str] | ||||||
| 
 | 
 | ||||||
| ) -> AsyncIterator[trio.StrictFIFOLock]: | ) -> AsyncIterator[trio.StrictFIFOLock]: | ||||||
|     '''Acquire a root-actor local FIFO lock which tracks mutex access of |     ''' | ||||||
|  |     Acquire a root-actor local FIFO lock which tracks mutex access of | ||||||
|     the process tree's global debugger breakpoint. |     the process tree's global debugger breakpoint. | ||||||
| 
 | 
 | ||||||
|     This lock avoids tty clobbering (by preventing multiple processes |     This lock avoids tty clobbering (by preventing multiple processes | ||||||
|  | @ -162,27 +173,27 @@ async def _acquire_debug_lock( | ||||||
| 
 | 
 | ||||||
|     task_name = trio.lowlevel.current_task().name |     task_name = trio.lowlevel.current_task().name | ||||||
| 
 | 
 | ||||||
|     log.debug( |     log.runtime( | ||||||
|         f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}" |         f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}" | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     we_acquired = False |     we_acquired = False | ||||||
| 
 | 
 | ||||||
|     if _no_remote_has_tty is None: |  | ||||||
|         # mark the tty lock as being in use so that the runtime |  | ||||||
|         # can try to avoid clobbering any connection from a child |  | ||||||
|         # that's currently relying on it. |  | ||||||
|         _no_remote_has_tty = trio.Event() |  | ||||||
| 
 |  | ||||||
|     try: |     try: | ||||||
|         log.debug( |         log.runtime( | ||||||
|             f"entering lock checkpoint, remote task: {task_name}:{uid}" |             f"entering lock checkpoint, remote task: {task_name}:{uid}" | ||||||
|         ) |         ) | ||||||
|         we_acquired = True |         we_acquired = True | ||||||
|         await _debug_lock.acquire() |         await _debug_lock.acquire() | ||||||
| 
 | 
 | ||||||
|  |         if _no_remote_has_tty is None: | ||||||
|  |             # mark the tty lock as being in use so that the runtime | ||||||
|  |             # can try to avoid clobbering any connection from a child | ||||||
|  |             # that's currently relying on it. | ||||||
|  |             _no_remote_has_tty = trio.Event() | ||||||
|  | 
 | ||||||
|         _global_actor_in_debug = uid |         _global_actor_in_debug = uid | ||||||
|         log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}") |         log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}") | ||||||
| 
 | 
 | ||||||
|         # NOTE: critical section: this yield is unshielded! |         # NOTE: critical section: this yield is unshielded! | ||||||
| 
 | 
 | ||||||
|  | @ -198,7 +209,11 @@ async def _acquire_debug_lock( | ||||||
| 
 | 
 | ||||||
|     finally: |     finally: | ||||||
|         # if _global_actor_in_debug == uid: |         # if _global_actor_in_debug == uid: | ||||||
|         if we_acquired and _debug_lock.locked(): | 
 | ||||||
|  |         if ( | ||||||
|  |             we_acquired | ||||||
|  |             and _debug_lock.locked() | ||||||
|  |         ): | ||||||
|             _debug_lock.release() |             _debug_lock.release() | ||||||
| 
 | 
 | ||||||
|         # IFF there are no more requesting tasks queued up fire, the |         # IFF there are no more requesting tasks queued up fire, the | ||||||
|  | @ -210,28 +225,15 @@ async def _acquire_debug_lock( | ||||||
|         if ( |         if ( | ||||||
|             not stats.owner |             not stats.owner | ||||||
|         ): |         ): | ||||||
|             log.debug(f"No more tasks waiting on tty lock! says {uid}") |             log.runtime(f"No more tasks waiting on tty lock! says {uid}") | ||||||
|             _no_remote_has_tty.set() |             if _no_remote_has_tty is not None: | ||||||
|             _no_remote_has_tty = None |                 _no_remote_has_tty.set() | ||||||
|  |                 _no_remote_has_tty = None | ||||||
| 
 | 
 | ||||||
|         _global_actor_in_debug = None |         _global_actor_in_debug = None | ||||||
| 
 | 
 | ||||||
|         log.debug(f"TTY lock released, remote task: {task_name}:{uid}") |         log.runtime( | ||||||
| 
 |             f"TTY lock released, remote task: {task_name}:{uid}" | ||||||
| 
 |  | ||||||
| def handler(signum, frame, *args): |  | ||||||
|     """Specialized debugger compatible SIGINT handler. |  | ||||||
| 
 |  | ||||||
|     In childred we always ignore to avoid deadlocks since cancellation |  | ||||||
|     should always be managed by the parent supervising actor. The root |  | ||||||
|     is always cancelled on ctrl-c. |  | ||||||
|     """ |  | ||||||
|     if is_root_process(): |  | ||||||
|         tractor.current_actor().cancel_soon() |  | ||||||
|     else: |  | ||||||
|         print( |  | ||||||
|             "tractor ignores SIGINT while in debug mode\n" |  | ||||||
|             "If you have a special need for it please open an issue.\n" |  | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -247,6 +249,10 @@ async def _hijack_stdin_for_child( | ||||||
|     the pdbpp debugger console can be allocated to a sub-actor for repl |     the pdbpp debugger console can be allocated to a sub-actor for repl | ||||||
|     bossing. |     bossing. | ||||||
| 
 | 
 | ||||||
|  |     NOTE: this task is invoked in the root actor-process of the actor | ||||||
|  |     tree. It is meant to be invoked as an rpc-task which should be | ||||||
|  |     highly reliable at cleaning out the tty-lock state when complete! | ||||||
|  | 
 | ||||||
|     ''' |     ''' | ||||||
|     task_name = trio.lowlevel.current_task().name |     task_name = trio.lowlevel.current_task().name | ||||||
| 
 | 
 | ||||||
|  | @ -260,46 +266,65 @@ async def _hijack_stdin_for_child( | ||||||
| 
 | 
 | ||||||
|     log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock") |     log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock") | ||||||
| 
 | 
 | ||||||
|     with trio.CancelScope(shield=True): |     orig_handler = signal.signal( | ||||||
| 
 |         signal.SIGINT, | ||||||
|         try: |         shield_sigint, | ||||||
|             lock = None |     ) | ||||||
|             async with _acquire_debug_lock(subactor_uid) as lock: |     try: | ||||||
|  |         with ( | ||||||
|  |             trio.CancelScope(shield=True), | ||||||
|  |         ): | ||||||
|  |             # try: | ||||||
|  |             # lock = None | ||||||
|  |             async with _acquire_debug_lock(subactor_uid):  # as lock: | ||||||
| 
 | 
 | ||||||
|                 # indicate to child that we've locked stdio |                 # indicate to child that we've locked stdio | ||||||
|                 await ctx.started('Locked') |                 await ctx.started('Locked') | ||||||
|                 log.debug(f"Actor {subactor_uid} acquired stdin hijack lock") |                 log.debug( | ||||||
|  |                     f"Actor {subactor_uid} acquired stdin hijack lock" | ||||||
|  |                 ) | ||||||
| 
 | 
 | ||||||
|                 # wait for unlock pdb by child |                 # wait for unlock pdb by child | ||||||
|                 async with ctx.open_stream() as stream: |                 async with ctx.open_stream() as stream: | ||||||
|                     assert await stream.receive() == 'pdb_unlock' |                     assert await stream.receive() == 'pdb_unlock' | ||||||
| 
 | 
 | ||||||
|                 # try: |             # except ( | ||||||
|                 #     assert await stream.receive() == 'pdb_unlock' |             #     BaseException, | ||||||
|  |             #     # trio.MultiError, | ||||||
|  |             #     # Exception, | ||||||
|  |             #     # trio.BrokenResourceError, | ||||||
|  |             #     # trio.Cancelled,  # by local cancellation | ||||||
|  |             #     # trio.ClosedResourceError,  # by self._rx_chan | ||||||
|  |             #     # ContextCancelled, | ||||||
|  |             #     # ConnectionResetError, | ||||||
|  |             # ): | ||||||
|  |             #     # XXX: there may be a race with the portal teardown | ||||||
|  |             #     # with the calling actor which we can safely ignore. | ||||||
|  |             #     # The alternative would be sending an ack message | ||||||
|  |             #     # and allowing the client to wait for us to teardown | ||||||
|  |             #     # first? | ||||||
|  |             #     if lock and lock.locked(): | ||||||
|  |             #         try: | ||||||
|  |             #             lock.release() | ||||||
|  |             #         except RuntimeError: | ||||||
|  |             #             log.exception(f"we don't own the tty lock?") | ||||||
| 
 | 
 | ||||||
|         except ( |             #     # if isinstance(err, trio.Cancelled): | ||||||
|             # BaseException, |             #     raise | ||||||
|             trio.MultiError, |  | ||||||
|             trio.BrokenResourceError, |  | ||||||
|             trio.Cancelled,  # by local cancellation |  | ||||||
|             trio.ClosedResourceError,  # by self._rx_chan |  | ||||||
|         ) as err: |  | ||||||
|             # XXX: there may be a race with the portal teardown |  | ||||||
|             # with the calling actor which we can safely ignore. |  | ||||||
|             # The alternative would be sending an ack message |  | ||||||
|             # and allowing the client to wait for us to teardown |  | ||||||
|             # first? |  | ||||||
|             if lock and lock.locked(): |  | ||||||
|                 lock.release() |  | ||||||
| 
 | 
 | ||||||
|             if isinstance(err, trio.Cancelled): |             # finally: | ||||||
|                 raise |             #     log.runtime( | ||||||
|         finally: |             #         "TTY lock released, remote task:" | ||||||
|             log.debug( |             #         f"{task_name}:{subactor_uid}" | ||||||
|                 "TTY lock released, remote task:" |             #     ) | ||||||
|                 f"{task_name}:{subactor_uid}") |  | ||||||
| 
 | 
 | ||||||
|     return "pdb_unlock_complete" |         return "pdb_unlock_complete" | ||||||
|  | 
 | ||||||
|  |     finally: | ||||||
|  |         signal.signal( | ||||||
|  |             signal.SIGINT, | ||||||
|  |             orig_handler | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def wait_for_parent_stdin_hijack( | async def wait_for_parent_stdin_hijack( | ||||||
|  | @ -338,20 +363,22 @@ async def wait_for_parent_stdin_hijack( | ||||||
| 
 | 
 | ||||||
|                     async with ctx.open_stream() as stream: |                     async with ctx.open_stream() as stream: | ||||||
|                         # unblock local caller |                         # unblock local caller | ||||||
|                         task_status.started(cs) |  | ||||||
| 
 | 
 | ||||||
|                         try: |                         try: | ||||||
|                             assert _local_pdb_complete |                             assert _local_pdb_complete | ||||||
|  |                             task_status.started(cs) | ||||||
|                             await _local_pdb_complete.wait() |                             await _local_pdb_complete.wait() | ||||||
| 
 | 
 | ||||||
|                         finally: |                         finally: | ||||||
|                             # TODO: shielding currently can cause hangs... |                             # TODO: shielding currently can cause hangs... | ||||||
|                             with trio.CancelScope(shield=True): |                             # with trio.CancelScope(shield=True): | ||||||
|                                 await stream.send('pdb_unlock') |                             await stream.send('pdb_unlock') | ||||||
| 
 | 
 | ||||||
|                         # sync with callee termination |                         # sync with callee termination | ||||||
|                         assert await ctx.result() == "pdb_unlock_complete" |                         assert await ctx.result() == "pdb_unlock_complete" | ||||||
| 
 | 
 | ||||||
|  |                 log.pdb('unlocked context') | ||||||
|  | 
 | ||||||
|         except tractor.ContextCancelled: |         except tractor.ContextCancelled: | ||||||
|             log.warning('Root actor cancelled debug lock') |             log.warning('Root actor cancelled debug lock') | ||||||
| 
 | 
 | ||||||
|  | @ -362,6 +389,32 @@ async def wait_for_parent_stdin_hijack( | ||||||
|             log.debug(f"Child {actor_uid} released parent stdio lock") |             log.debug(f"Child {actor_uid} released parent stdio lock") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | def mk_mpdb() -> tuple[MultiActorPdb, Callable]: | ||||||
|  | 
 | ||||||
|  |     pdb = MultiActorPdb() | ||||||
|  |     signal.signal = pdbpp.hideframe(signal.signal) | ||||||
|  |     orig_handler = signal.signal( | ||||||
|  |         signal.SIGINT, | ||||||
|  |         partial(shield_sigint, pdb_obj=pdb), | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     # XXX: These are the important flags mentioned in | ||||||
|  |     # https://github.com/python-trio/trio/issues/1155 | ||||||
|  |     # which resolve the traceback spews to console. | ||||||
|  |     pdb.allow_kbdint = True | ||||||
|  |     pdb.nosigint = True | ||||||
|  | 
 | ||||||
|  |     # TODO: add this as method on our pdb obj? | ||||||
|  |     def undo_sigint(): | ||||||
|  |         # restore original sigint handler | ||||||
|  |         signal.signal( | ||||||
|  |             signal.SIGINT, | ||||||
|  |             orig_handler | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     return pdb, undo_sigint | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| async def _breakpoint( | async def _breakpoint( | ||||||
| 
 | 
 | ||||||
|     debug_func, |     debug_func, | ||||||
|  | @ -370,23 +423,26 @@ async def _breakpoint( | ||||||
|     # shield: bool = False |     # shield: bool = False | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
|     '''``tractor`` breakpoint entry for engaging pdb machinery |     ''' | ||||||
|     in the root or a subactor. |     breakpoint entry for engaging pdb machinery in the root or | ||||||
|  |     a subactor. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     # TODO: is it possible to debug a trio.Cancelled except block? |     __tracebackhide__ = True | ||||||
|     # right now it seems like we can kinda do with by shielding |  | ||||||
|     # around ``tractor.breakpoint()`` but not if we move the shielded |  | ||||||
|     # scope here??? |  | ||||||
|     # with trio.CancelScope(shield=shield): |  | ||||||
| 
 | 
 | ||||||
|  |     pdb, undo_sigint = mk_mpdb() | ||||||
|     actor = tractor.current_actor() |     actor = tractor.current_actor() | ||||||
|     task_name = trio.lowlevel.current_task().name |     task_name = trio.lowlevel.current_task().name | ||||||
| 
 | 
 | ||||||
|     global _local_pdb_complete, _pdb_release_hook |     global _local_pdb_complete, _pdb_release_hook | ||||||
|     global _local_task_in_debug, _global_actor_in_debug |     global _local_task_in_debug, _global_actor_in_debug | ||||||
| 
 | 
 | ||||||
|     await trio.lowlevel.checkpoint() |     # TODO: is it possible to debug a trio.Cancelled except block? | ||||||
|  |     # right now it seems like we can kinda do with by shielding | ||||||
|  |     # around ``tractor.breakpoint()`` but not if we move the shielded | ||||||
|  |     # scope here??? | ||||||
|  |     # with trio.CancelScope(shield=shield): | ||||||
|  |     #     await trio.lowlevel.checkpoint() | ||||||
| 
 | 
 | ||||||
|     if not _local_pdb_complete or _local_pdb_complete.is_set(): |     if not _local_pdb_complete or _local_pdb_complete.is_set(): | ||||||
|         _local_pdb_complete = trio.Event() |         _local_pdb_complete = trio.Event() | ||||||
|  | @ -412,8 +468,20 @@ async def _breakpoint( | ||||||
|         # entries/requests to the root process |         # entries/requests to the root process | ||||||
|         _local_task_in_debug = task_name |         _local_task_in_debug = task_name | ||||||
| 
 | 
 | ||||||
|  |         def child_release_hook(): | ||||||
|  |             try: | ||||||
|  |                 # sometimes the ``trio`` might already be termianated in | ||||||
|  |                 # which case this call will raise. | ||||||
|  |                 _local_pdb_complete.set() | ||||||
|  |             finally: | ||||||
|  |                 # restore original sigint handler | ||||||
|  |                 undo_sigint() | ||||||
|  |                 # should always be cleared in the hijack hook aboved right? | ||||||
|  |                 # _local_task_in_debug = None | ||||||
|  | 
 | ||||||
|         # assign unlock callback for debugger teardown hooks |         # assign unlock callback for debugger teardown hooks | ||||||
|         _pdb_release_hook = _local_pdb_complete.set |         # _pdb_release_hook = _local_pdb_complete.set | ||||||
|  |         _pdb_release_hook = child_release_hook | ||||||
| 
 | 
 | ||||||
|         # this **must** be awaited by the caller and is done using the |         # this **must** be awaited by the caller and is done using the | ||||||
|         # root nursery so that the debugger can continue to run without |         # root nursery so that the debugger can continue to run without | ||||||
|  | @ -423,11 +491,15 @@ async def _breakpoint( | ||||||
|         # we have to figure out how to avoid having the service nursery |         # we have to figure out how to avoid having the service nursery | ||||||
|         # cancel on this task start? I *think* this works below? |         # cancel on this task start? I *think* this works below? | ||||||
|         # actor._service_n.cancel_scope.shield = shield |         # actor._service_n.cancel_scope.shield = shield | ||||||
|         with trio.CancelScope(shield=True): |         try: | ||||||
|             await actor._service_n.start( |             with trio.CancelScope(shield=True): | ||||||
|                 wait_for_parent_stdin_hijack, |                 await actor._service_n.start( | ||||||
|                 actor.uid, |                     wait_for_parent_stdin_hijack, | ||||||
|             ) |                     actor.uid, | ||||||
|  |                 ) | ||||||
|  |         except RuntimeError: | ||||||
|  |             child_release_hook() | ||||||
|  |             raise | ||||||
| 
 | 
 | ||||||
|     elif is_root_process(): |     elif is_root_process(): | ||||||
| 
 | 
 | ||||||
|  | @ -464,59 +536,221 @@ async def _breakpoint( | ||||||
|             global _local_pdb_complete, _debug_lock |             global _local_pdb_complete, _debug_lock | ||||||
|             global _global_actor_in_debug, _local_task_in_debug |             global _global_actor_in_debug, _local_task_in_debug | ||||||
| 
 | 
 | ||||||
|             _debug_lock.release() |             try: | ||||||
|  |                 _debug_lock.release() | ||||||
|  |             except RuntimeError: | ||||||
|  |                 # uhhh makes no sense but been seeing the non-owner | ||||||
|  |                 # release error even though this is definitely the task | ||||||
|  |                 # that locked? | ||||||
|  |                 owner = _debug_lock.statistics().owner | ||||||
|  |                 if owner: | ||||||
|  |                     raise | ||||||
|  | 
 | ||||||
|             _global_actor_in_debug = None |             _global_actor_in_debug = None | ||||||
|             _local_task_in_debug = None |             _local_task_in_debug = None | ||||||
|             _local_pdb_complete.set() | 
 | ||||||
|  |             try: | ||||||
|  |                 # sometimes the ``trio`` runtime might already be terminated, in | ||||||
|  |                 # which case this call will raise. | ||||||
|  |                 _local_pdb_complete.set() | ||||||
|  |             finally: | ||||||
|  |                 # restore original sigint handler | ||||||
|  |                 undo_sigint() | ||||||
| 
 | 
 | ||||||
|         _pdb_release_hook = teardown |         _pdb_release_hook = teardown | ||||||
| 
 | 
 | ||||||
|     # block here one (at the appropriate frame *up*) where |     # frame = sys._getframe() | ||||||
|     # ``breakpoint()`` was awaited and begin handling stdio. |     # last_f = frame.f_back | ||||||
|     log.debug("Entering the synchronous world of pdb") |     # last_f.f_globals['__tracebackhide__'] = True | ||||||
|     debug_func(actor) | 
 | ||||||
|  |     try: | ||||||
|  |         # block here one (at the appropriate frame *up*) where | ||||||
|  |         # ``breakpoint()`` was awaited and begin handling stdio. | ||||||
|  |         log.debug("Entering the synchronous world of pdb") | ||||||
|  |         debug_func(actor, pdb) | ||||||
|  | 
 | ||||||
|  |     except bdb.BdbQuit: | ||||||
|  |         if _pdb_release_hook: | ||||||
|  |             _pdb_release_hook() | ||||||
|  |         raise | ||||||
|  | 
 | ||||||
|  |     # XXX: apparently we can't do this without showing this frame | ||||||
|  |     # in the backtrace on first entry to the REPL? Seems like an odd | ||||||
|  |     # behaviour that should have been fixed by now. This is also why | ||||||
|  |     # we scrapped all the @cm approaches that were tried previously. | ||||||
|  | 
 | ||||||
|  |     # finally: | ||||||
|  |     #     __tracebackhide__ = True | ||||||
|  |     #     # frame = sys._getframe() | ||||||
|  |     #     # last_f = frame.f_back | ||||||
|  |     #     # last_f.f_globals['__tracebackhide__'] = True | ||||||
|  |     #     # signal.signal = pdbpp.hideframe(signal.signal) | ||||||
|  |     #     signal.signal( | ||||||
|  |     #         signal.SIGINT, | ||||||
|  |     #         orig_handler | ||||||
|  |     #     ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def _mk_pdb() -> PdbwTeardown: | def shield_sigint( | ||||||
|  |     signum: int, | ||||||
|  |     frame: 'frame',  # type: ignore # noqa | ||||||
|  |     pdb_obj: Optional[MultiActorPdb] = None, | ||||||
|  |     *args, | ||||||
| 
 | 
 | ||||||
|     # XXX: setting these flags on the pdb instance are absolutely | ) -> None: | ||||||
|     # critical to having ctrl-c work in the ``trio`` standard way!  The |     ''' | ||||||
|     # stdlib's pdb supports entering the current sync frame on a SIGINT, |     Specialized debugger compatible SIGINT handler. | ||||||
|     # with ``trio`` we pretty much never want this and if we did we can |  | ||||||
|     # handle it in the ``tractor`` task runtime. |  | ||||||
| 
 | 
 | ||||||
|     pdb = PdbwTeardown() |     In children we always ignore SIGINT to avoid deadlocks since cancellation | ||||||
|     pdb.allow_kbdint = True |     should always be managed by the parent supervising actor. The root | ||||||
|     pdb.nosigint = True |     is always cancelled on ctrl-c. | ||||||
| 
 | 
 | ||||||
|     return pdb |     ''' | ||||||
|  |     __tracebackhide__ = True | ||||||
|  | 
 | ||||||
|  |     global _local_task_in_debug, _global_actor_in_debug | ||||||
|  |     uid_in_debug = _global_actor_in_debug | ||||||
|  | 
 | ||||||
|  |     actor = tractor.current_actor() | ||||||
|  | 
 | ||||||
|  |     def do_cancel(): | ||||||
|  |         # If we haven't tried to cancel the runtime then do that instead | ||||||
|  |         # of raising a KBI (which may non-gracefully destroy | ||||||
|  |         # a ``trio.run()``). | ||||||
|  |         if not actor._cancel_called: | ||||||
|  |             actor.cancel_soon() | ||||||
|  | 
 | ||||||
|  |         # If the runtime is already cancelled it likely means the user | ||||||
|  |         # hit ctrl-c again because teardown didn't fully take place, in | ||||||
|  |         # which case we do the "hard" raising of a local KBI. | ||||||
|  |         else: | ||||||
|  |             raise KeyboardInterrupt | ||||||
|  | 
 | ||||||
|  |     any_connected = False | ||||||
|  | 
 | ||||||
|  |     if uid_in_debug is not None: | ||||||
|  |         # try to see if the supposed (sub)actor in debug still | ||||||
|  |         # has an active connection to *this* actor, and if not | ||||||
|  |         # it's likely they aren't using the TTY lock / debugger | ||||||
|  |         # and we should propagate SIGINT normally. | ||||||
|  |         chans = actor._peers.get(tuple(uid_in_debug)) | ||||||
|  |         if chans: | ||||||
|  |             any_connected = any(chan.connected() for chan in chans) | ||||||
|  |             if not any_connected: | ||||||
|  |                 log.warning( | ||||||
|  |                     'A global actor reported to be in debug ' | ||||||
|  |                     'but no connection exists for this child:\n' | ||||||
|  |                     f'{uid_in_debug}\n' | ||||||
|  |                     'Allowing SIGINT propagation..' | ||||||
|  |                 ) | ||||||
|  |                 return do_cancel() | ||||||
|  | 
 | ||||||
|  |     # root actor branch that reports whether or not a child | ||||||
|  |     # has locked the debugger. | ||||||
|  |     if ( | ||||||
|  |         is_root_process() | ||||||
|  |         and uid_in_debug is not None | ||||||
|  | 
 | ||||||
|  |         # XXX: only if there is an existing connection to the | ||||||
|  |         # (sub-)actor in debug do we ignore SIGINT in this | ||||||
|  |         # parent! Otherwise we may hang waiting for an actor | ||||||
|  |         # which has already terminated to unlock. | ||||||
|  |         and any_connected | ||||||
|  |     ): | ||||||
|  |         name = uid_in_debug[0] | ||||||
|  |         if name != 'root': | ||||||
|  |             log.pdb( | ||||||
|  |                 f"Ignoring SIGINT while child in debug mode: `{uid_in_debug}`" | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |         else: | ||||||
|  |             log.pdb( | ||||||
|  |                 "Ignoring SIGINT while in debug mode" | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |     # child actor that has locked the debugger | ||||||
|  |     elif ( | ||||||
|  |         not is_root_process() | ||||||
|  |     ): | ||||||
|  |         chan: Channel = actor._parent_chan | ||||||
|  |         if not chan or not chan.connected(): | ||||||
|  |             log.warning( | ||||||
|  |                 'A global actor reported to be in debug ' | ||||||
|  |                 'but no connection exists for its parent:\n' | ||||||
|  |                 f'{uid_in_debug}\n' | ||||||
|  |                 'Allowing SIGINT propagation..' | ||||||
|  |             ) | ||||||
|  |             return do_cancel() | ||||||
|  | 
 | ||||||
|  |         task = _local_task_in_debug | ||||||
|  |         if task: | ||||||
|  |             log.pdb( | ||||||
|  |                 f"Ignoring SIGINT while task in debug mode: `{task}`" | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |         # TODO: how to handle the case of an intermediary-child actor | ||||||
|  |         # that **is not** marked in debug mode? | ||||||
|  |         # elif debug_mode(): | ||||||
|  | 
 | ||||||
|  |         else: | ||||||
|  |             log.pdb( | ||||||
|  |                 "Ignoring SIGINT since debug mode is enabled" | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |     # maybe redraw/print last REPL output to console | ||||||
|  |     if pdb_obj: | ||||||
|  | 
 | ||||||
|  |         # TODO: make this work like sticky mode: when output is | ||||||
|  |         # detected as written to the tty, redraw this part underneath | ||||||
|  |         # and erase the previous draw of this same bit above? | ||||||
|  |         # pdb_obj.sticky = True | ||||||
|  |         # pdb_obj._print_if_sticky() | ||||||
|  | 
 | ||||||
|  |         # also see these links for an approach from ``ptk``: | ||||||
|  |         # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040 | ||||||
|  |         # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py | ||||||
|  | 
 | ||||||
|  |         try: | ||||||
|  |             # XXX: lol, see ``pdbpp`` issue: | ||||||
|  |             # https://github.com/pdbpp/pdbpp/issues/496 | ||||||
|  |             # pdb_obj.do_longlist(None) | ||||||
|  |             # pdb_obj.lastcmd = 'longlist' | ||||||
|  |             pdb_obj._printlonglist(max_lines=False) | ||||||
|  |             # print(pdb_obj.prompt, end='', flush=True) | ||||||
|  | 
 | ||||||
|  |         except AttributeError: | ||||||
|  |             log.exception('pdbpp longlist failed...') | ||||||
|  |             raise KeyboardInterrupt | ||||||
| 
 | 
 | ||||||
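
``shield_sigint()`` above is written to be installed as the process' SIGINT handler with the active debugger instance bound in (the diff's ``mk_mpdb()`` appears to do this wiring and hand back an undo callable). A rough sketch of that installation, where ``install_shield_sigint`` is an illustrative helper name and not part of the actual module, might look like:

    import signal
    from functools import partial

    def install_shield_sigint(pdb_obj=None):
        # hypothetical helper: bind the active debugger instance into
        # the handler defined above and return an "undo" callable that
        # restores whatever handler was previously installed.
        prev = signal.getsignal(signal.SIGINT)
        signal.signal(
            signal.SIGINT,
            # handler is invoked as handler(signum, frame), matching
            # shield_sigint(signum, frame, pdb_obj=...) above
            partial(shield_sigint, pdb_obj=pdb_obj),
        )

        def undo_sigint() -> None:
            signal.signal(signal.SIGINT, prev)

        return undo_sigint
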
| 
 | 
 | ||||||
| def _set_trace(actor=None): | def _set_trace( | ||||||
|     pdb = _mk_pdb() |     actor: Optional[tractor._actor.Actor] = None, | ||||||
|  |     pdb: Optional[MultiActorPdb] = None, | ||||||
|  | ): | ||||||
|  |     __tracebackhide__ = True | ||||||
|  |     actor = actor or tractor.current_actor() | ||||||
| 
 | 
 | ||||||
|     if actor is not None: |     # XXX: on latest ``pdbpp`` I guess we don't need this? | ||||||
|  |     # frame = sys._getframe() | ||||||
|  |     # last_f = frame.f_back | ||||||
|  |     # last_f.f_globals['__tracebackhide__'] = True | ||||||
|  | 
 | ||||||
|  |     # start 2 levels up in user code | ||||||
|  |     frame: FrameType = sys._getframe() | ||||||
|  |     if frame: | ||||||
|  |         frame = frame.f_back  # type: ignore | ||||||
|  | 
 | ||||||
|  |     if pdb and actor is not None: | ||||||
|         log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") |         log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") | ||||||
| 
 | 
 | ||||||
|         pdb.set_trace( |  | ||||||
|             # start 2 levels up in user code |  | ||||||
|             frame=sys._getframe().f_back.f_back, |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
|     else: |     else: | ||||||
|         # we entered the global ``breakpoint()`` built-in from sync code |         pdb, undo_sigint = mk_mpdb() | ||||||
|  | 
 | ||||||
|  |         # we entered the global ``breakpoint()`` built-in from sync code? | ||||||
|         global _local_task_in_debug, _pdb_release_hook |         global _local_task_in_debug, _pdb_release_hook | ||||||
|         _local_task_in_debug = 'sync' |         _local_task_in_debug = 'sync' | ||||||
| 
 | 
 | ||||||
|         def nuttin(): |     pdb.set_trace(frame=frame) | ||||||
|             pass |  | ||||||
| 
 |  | ||||||
|         _pdb_release_hook = nuttin |  | ||||||
| 
 |  | ||||||
|         pdb.set_trace( |  | ||||||
|             # start 2 levels up in user code |  | ||||||
|             frame=sys._getframe().f_back, |  | ||||||
|         ) |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| breakpoint = partial( | breakpoint = partial( | ||||||
|  | @ -525,11 +759,40 @@ breakpoint = partial( | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
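
The module-level ``breakpoint`` partial above is what backs the user-facing debug API. A minimal usage sketch, assuming the ``debug_mode`` flag and ``tractor.breakpoint()`` as documented in the project README (spawning details may differ by version), could be:

    import trio
    import tractor

    async def breaky():
        # pause this child actor's task; the TTY-lock request is
        # forwarded to the root actor which serializes debugger access
        await tractor.breakpoint()

    async def main():
        # ``debug_mode=True`` enables the multi-actor debugger machinery
        async with tractor.open_nursery(debug_mode=True) as n:
            await n.run_in_actor(breaky)

    if __name__ == '__main__':
        trio.run(main)
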
| 
 | 
 | ||||||
| def _post_mortem(actor): | def _post_mortem( | ||||||
|     log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n") |     actor: tractor._actor.Actor, | ||||||
|     pdb = _mk_pdb() |     pdb: MultiActorPdb, | ||||||
|  | 
 | ||||||
|  | ) -> None: | ||||||
|  |     ''' | ||||||
|  |     Enter the ``pdbpp`` post mortem entrypoint using our custom | ||||||
|  |     debugger instance. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n") | ||||||
|  | 
 | ||||||
|  |     # XXX: on py3.10 if you don't have the latest ``pdbpp`` installed, | ||||||
|  |     # the exception looks something like: | ||||||
|  |     # Traceback (most recent call last): | ||||||
|  |     # File ".../tractor/_debug.py", line 729, in _post_mortem | ||||||
|  |     #   for _ in range(100): | ||||||
|  |     # File "../site-packages/pdb.py", line 1227, in xpm | ||||||
|  |     #   post_mortem(info[2], Pdb) | ||||||
|  |     # File "../site-packages/pdb.py", line 1175, in post_mortem | ||||||
|  |     #   p.interaction(None, t) | ||||||
|  |     # File "../site-packages/pdb.py", line 216, in interaction | ||||||
|  |     #   ret = self.setup(frame, traceback) | ||||||
|  |     # File "../site-packages/pdb.py", line 259, in setup | ||||||
|  |     #   ret = super(Pdb, self).setup(frame, tb) | ||||||
|  |     # File "/usr/lib/python3.10/pdb.py", line 217, in setup | ||||||
|  |     #   self.curframe = self.stack[self.curindex][0] | ||||||
|  |     # IndexError: list index out of range | ||||||
|  | 
 | ||||||
|  |     # NOTE: you need ``pdbpp`` master (at least this commit | ||||||
|  |     # https://github.com/pdbpp/pdbpp/commit/b757794857f98d53e3ebbe70879663d7d843a6c2) | ||||||
|  |     # to fix this and avoid the hang it causes XD. | ||||||
|  |     # see also: https://github.com/pdbpp/pdbpp/issues/480 | ||||||
| 
 | 
 | ||||||
|     # custom Pdb post-mortem entry |  | ||||||
|     pdbpp.xpm(Pdb=lambda: pdb) |     pdbpp.xpm(Pdb=lambda: pdb) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -231,7 +231,7 @@ class MsgspecTCPStream(MsgpackTCPStream): | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         import msgspec  # noqa |         import msgspec  # noqa | ||||||
|         last_decode_failed: bool = False |         decodes_failed: int = 0 | ||||||
| 
 | 
 | ||||||
|         while True: |         while True: | ||||||
|             try: |             try: | ||||||
|  | @ -239,6 +239,7 @@ class MsgspecTCPStream(MsgpackTCPStream): | ||||||
| 
 | 
 | ||||||
|             except ( |             except ( | ||||||
|                 ValueError, |                 ValueError, | ||||||
|  |                 ConnectionResetError, | ||||||
| 
 | 
 | ||||||
|                 # not sure entirely why we need this but without it we |                 # not sure entirely why we need this but without it we | ||||||
|                 # seem to be getting racy failures here on |                 # seem to be getting racy failures here on | ||||||
|  | @ -267,12 +268,17 @@ class MsgspecTCPStream(MsgpackTCPStream): | ||||||
|                 msgspec.DecodeError, |                 msgspec.DecodeError, | ||||||
|                 UnicodeDecodeError, |                 UnicodeDecodeError, | ||||||
|             ): |             ): | ||||||
|                 if not last_decode_failed: |                 if decodes_failed < 4: | ||||||
|                     # ignore decoding errors for now and assume they have to |                     # ignore decoding errors for now and assume they have to | ||||||
|                     # do with a channel drop - hope that receiving from the |                     # do with a channel drop - hope that receiving from the | ||||||
|                     # channel will raise an expected error and bubble up. |                     # channel will raise an expected error and bubble up. | ||||||
|                     log.error('`msgspec` failed to decode!?') |                     decoded_bytes = msg_bytes.decode() | ||||||
|                     last_decode_failed = True |                     log.error( | ||||||
|  |                         '`msgspec` failed to decode!?\n' | ||||||
|  |                         'dumping bytes:\n' | ||||||
|  |                         f'{decoded_bytes}' | ||||||
|  |                     ) | ||||||
|  |                     decodes_failed += 1 | ||||||
|                 else: |                 else: | ||||||
|                     raise |                     raise | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
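
The hunk above swaps the single-shot ``last_decode_failed`` flag for a small failure budget. Stripped of the transport details, the pattern is roughly as follows, where ``recv_frame`` and ``decode`` are illustrative callables rather than the stream's real internals:

    async def iter_decoded(recv_frame, decode, max_failures: int = 4):
        # illustrative sketch: yield decoded messages from a framed
        # byte stream, tolerating a few decode failures (assumed to
        # come from a racy channel drop) before finally re-raising.
        failures = 0
        while True:
            frame = await recv_frame()
            try:
                msg = decode(frame)
            except (ValueError, UnicodeDecodeError):
                failures += 1
                if failures >= max_failures:
                    raise
                continue
            yield msg
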
|  | @ -24,7 +24,8 @@ import importlib | ||||||
| import inspect | import inspect | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, Optional, |     Any, Optional, | ||||||
|     Callable, AsyncGenerator |     Callable, AsyncGenerator, | ||||||
|  |     Type, | ||||||
| ) | ) | ||||||
| from functools import partial | from functools import partial | ||||||
| from dataclasses import dataclass | from dataclasses import dataclass | ||||||
|  | @ -442,6 +443,10 @@ class Portal: | ||||||
|         _err: Optional[BaseException] = None |         _err: Optional[BaseException] = None | ||||||
|         ctx._portal = self |         ctx._portal = self | ||||||
| 
 | 
 | ||||||
|  |         uid = self.channel.uid | ||||||
|  |         cid = ctx.cid | ||||||
|  |         etype: Optional[Type[BaseException]] = None | ||||||
|  | 
 | ||||||
|         # deliver context instance and .started() msg value in open tuple. |         # deliver context instance and .started() msg value in open tuple. | ||||||
|         try: |         try: | ||||||
|             async with trio.open_nursery() as scope_nursery: |             async with trio.open_nursery() as scope_nursery: | ||||||
|  | @ -477,13 +482,24 @@ class Portal: | ||||||
|             # KeyboardInterrupt, |             # KeyboardInterrupt, | ||||||
| 
 | 
 | ||||||
|         ) as err: |         ) as err: | ||||||
|             _err = err |             etype = type(err) | ||||||
|             # the context cancels itself on any cancel |             # the context cancels itself on any cancel | ||||||
|             # causing error. |             # causing error. | ||||||
|             log.cancel( |  | ||||||
|                 f'Context to {self.channel.uid} sending cancel request..') |  | ||||||
| 
 | 
 | ||||||
|             await ctx.cancel() |             if ctx.chan.connected(): | ||||||
|  |                 log.cancel( | ||||||
|  |                     'Context cancelled for task, sending cancel request..\n' | ||||||
|  |                     f'task:{cid}\n' | ||||||
|  |                     f'actor:{uid}' | ||||||
|  |                 ) | ||||||
|  |                 await ctx.cancel() | ||||||
|  |             else: | ||||||
|  |                 log.warning( | ||||||
|  |                     'IPC connection for context is broken?\n' | ||||||
|  |                     f'task:{cid}\n' | ||||||
|  |                     f'actor:{uid}' | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|             raise |             raise | ||||||
| 
 | 
 | ||||||
|         finally: |         finally: | ||||||
|  | @ -492,7 +508,13 @@ class Portal: | ||||||
|             # sure we get the error from the underlying feeder mem chan. |             # sure we get the error from the underlying feeder mem chan. | ||||||
|             # if it's not raised here it *should* be raised from the |             # if it's not raised here it *should* be raised from the | ||||||
|             # msg loop nursery right? |             # msg loop nursery right? | ||||||
|             result = await ctx.result() |             if ctx.chan.connected(): | ||||||
|  |                 log.info( | ||||||
|  |                     'Waiting on final context-task result for\n' | ||||||
|  |                     f'task: {cid}\n' | ||||||
|  |                     f'actor: {uid}' | ||||||
|  |                 ) | ||||||
|  |                 result = await ctx.result() | ||||||
| 
 | 
 | ||||||
|             # though it should be impossible for any tasks |             # though it should be impossible for any tasks | ||||||
|             # operating *in* this scope to have survived |             # operating *in* this scope to have survived | ||||||
|  | @ -502,14 +524,17 @@ class Portal: | ||||||
|                 # should we encapsulate this in the context api? |                 # should we encapsulate this in the context api? | ||||||
|                 await ctx._recv_chan.aclose() |                 await ctx._recv_chan.aclose() | ||||||
| 
 | 
 | ||||||
|             if _err: |             if etype: | ||||||
|                 if ctx._cancel_called: |                 if ctx._cancel_called: | ||||||
|                     log.cancel( |                     log.cancel( | ||||||
|                         f'Context {fn_name} cancelled by caller with\n{_err}' |                         f'Context {fn_name} cancelled by caller with\n{etype}' | ||||||
|                     ) |                     ) | ||||||
|                 elif _err is not None: |                 elif etype is not None: | ||||||
|                     log.cancel( |                     log.cancel( | ||||||
|                         f'Context {fn_name} cancelled by callee with\n{_err}' |                         f'Context for task cancelled by callee with {etype}\n' | ||||||
|  |                         f'target: `{fn_name}`\n' | ||||||
|  |                         f'task:{cid}\n' | ||||||
|  |                         f'actor:{uid}' | ||||||
|                     ) |                     ) | ||||||
|             else: |             else: | ||||||
|                 log.runtime( |                 log.runtime( | ||||||
|  |  | ||||||
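
For orientation, the hunks above harden the caller side of ``Portal.open_context()``: a cancel request is only sent when the IPC channel is still connected, and the final result is only awaited in that same case. A hedged usage sketch of that API, based on the documented streaming interface (names and kwargs may differ by version), is:

    import trio
    import tractor

    @tractor.context
    async def echo(ctx, msg: str) -> str:
        # callee side: deliver a first value then return a final result
        await ctx.started('ready')
        return msg

    async def main():
        async with tractor.open_nursery() as n:
            portal = await n.start_actor('echoer', enable_modules=[__name__])
            async with portal.open_context(echo, msg='hi') as (ctx, first):
                assert first == 'ready'
            # the final return value is waited on at block exit (only
            # if the channel is still connected, per the change above)
            await portal.cancel_actor()

    if __name__ == '__main__':
        trio.run(main)
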
|  | @ -601,10 +601,18 @@ class Context: | ||||||
| 
 | 
 | ||||||
|             finally: |             finally: | ||||||
|                 if self._portal: |                 if self._portal: | ||||||
|                     self._portal._streams.remove(rchan) |                     try: | ||||||
|  |                         self._portal._streams.remove(stream) | ||||||
|  |                     except KeyError: | ||||||
|  |                         log.warning( | ||||||
|  |                             f'Stream was already destroyed?\n' | ||||||
|  |                             f'actor: {self.chan.uid}\n' | ||||||
|  |                             f'ctx id: {self.cid}' | ||||||
|  |                         ) | ||||||
| 
 | 
 | ||||||
|     async def result(self) -> Any: |     async def result(self) -> Any: | ||||||
|         '''From a caller side, wait for and return the final result from |         ''' | ||||||
|  |         From a caller side, wait for and return the final result from | ||||||
|         the callee side task. |         the callee side task. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|  |  | ||||||
|  | @ -259,6 +259,7 @@ def _run_asyncio_task( | ||||||
|         nonlocal chan |         nonlocal chan | ||||||
|         aio_err = chan._aio_err |         aio_err = chan._aio_err | ||||||
|         task_err: Optional[BaseException] = None |         task_err: Optional[BaseException] = None | ||||||
|  |         tname = task.get_name() | ||||||
| 
 | 
 | ||||||
|         # only to avoid ``asyncio`` complaining about uncaptured |         # only to avoid ``asyncio`` complaining about uncaptured | ||||||
|         # task exceptions |         # task exceptions | ||||||
|  | @ -266,7 +267,13 @@ def _run_asyncio_task( | ||||||
|             task.exception() |             task.exception() | ||||||
|         except BaseException as terr: |         except BaseException as terr: | ||||||
|             task_err = terr |             task_err = terr | ||||||
|             log.exception(f'`asyncio` task: {task.get_name()} errored') | 
 | ||||||
|  |             if isinstance(terr, CancelledError): | ||||||
|  |                 log.cancel( | ||||||
|  |                     f'infected `asyncio` task cancelled: {tname}') | ||||||
|  |             else: | ||||||
|  |                 log.exception(f'`asyncio` task: {tname} errored') | ||||||
|  | 
 | ||||||
|             assert type(terr) is type(aio_err), 'Asyncio task error mismatch?' |             assert type(terr) is type(aio_err), 'Asyncio task error mismatch?' | ||||||
| 
 | 
 | ||||||
|         if aio_err is not None: |         if aio_err is not None: | ||||||
|  | @ -280,21 +287,24 @@ def _run_asyncio_task( | ||||||
|             # We might want to change this in the future though. |             # We might want to change this in the future though. | ||||||
|             from_aio.close() |             from_aio.close() | ||||||
| 
 | 
 | ||||||
|             if type(aio_err) is CancelledError: |             # if type(aio_err) is CancelledError: | ||||||
|                 log.cancel("infected task was cancelled") |             #     if not task_err: | ||||||
|  |             #         log.cancel( | ||||||
|  |             #             f"infected task {tname} cancelled itself, was not ``trio``" | ||||||
|  |             #         ) | ||||||
| 
 | 
 | ||||||
|                 # TODO: show that the cancellation originated |                 # TODO: show that the cancellation originated | ||||||
|                 # from the ``trio`` side? right? |                 # from the ``trio`` side? right? | ||||||
|                 # if cancel_scope.cancelled: |                 # if cancel_scope.cancelled: | ||||||
|                 #     raise aio_err from err |                 #     raise aio_err from err | ||||||
| 
 | 
 | ||||||
|             elif task_err is None: |             if task_err is None: | ||||||
|                 assert aio_err |                 assert aio_err | ||||||
|                 aio_err.with_traceback(aio_err.__traceback__) |                 aio_err.with_traceback(aio_err.__traceback__) | ||||||
|                 msg = ''.join(traceback.format_exception(type(aio_err))) |                 # msg = ''.join(traceback.format_exception(type(aio_err))) | ||||||
|                 log.error( |                 # log.error( | ||||||
|                     f'infected task errorred:\n{msg}' |                 #     f'infected task errorred:\n{msg}' | ||||||
|                 ) |                 # ) | ||||||
| 
 | 
 | ||||||
|             # raise any ``asyncio`` side error. |             # raise any ``asyncio`` side error. | ||||||
|             raise aio_err |             raise aio_err | ||||||
|  | @ -387,8 +397,8 @@ async def run_task( | ||||||
| 
 | 
 | ||||||
| ) -> Any: | ) -> Any: | ||||||
|     ''' |     ''' | ||||||
|     Run an ``asyncio`` async function or generator in a task, return |     Run an ``asyncio`` async function or generator in a new task, block | ||||||
|     or stream the result back to ``trio``. |     and return the result back to ``trio``. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     # simple async func |     # simple async func | ||||||
|  |  | ||||||
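
Finally, the ``run_task()`` docstring tweak above reflects its blocking, single-result semantics. A hedged sketch of calling it from ``trio`` code, assuming the actor was spawned with the ``infect_asyncio`` flag and that extra kwargs are forwarded to the target coroutine, might be:

    import asyncio
    from tractor import to_asyncio

    async def aio_add(x: int, y: int) -> int:
        # a plain ``asyncio`` coroutine scheduled on the guest-mode loop
        await asyncio.sleep(0)
        return x + y

    async def trio_side() -> None:
        # runs as ``trio`` code inside an actor spawned with the
        # (assumed) ``infect_asyncio=True`` flag; blocks until the
        # asyncio task completes and hands back its single result.
        result = await to_asyncio.run_task(aio_add, x=1, y=2)
        assert result == 3
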