Compare commits
	
		
			No commits in common. "master" and "deprecate_arbiter_addr" have entirely different histories. 
		
	
	
		
			master
			...
			deprecate_
		
	
		|  | @ -6,14 +6,8 @@ | ||||||
| ``tractor`` is a `structured concurrent`_, multi-processing_ runtime | ``tractor`` is a `structured concurrent`_, multi-processing_ runtime | ||||||
| built on trio_. | built on trio_. | ||||||
| 
 | 
 | ||||||
| Fundamentally, ``tractor`` gives you parallelism via | Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*": | ||||||
| ``trio``-"*actors*": independent Python processes (aka | our nurseries_ let you spawn new Python processes which each run a ``trio`` | ||||||
| non-shared-memory threads) which maintain structured |  | ||||||
| concurrency (SC) *end-to-end* inside a *supervision tree*. |  | ||||||
| 
 |  | ||||||
| Cross-process (and thus cross-host) SC is accomplished through the |  | ||||||
| combined use of our "actor nurseries_" and an "SC-transitive IPC |  | ||||||
| protocol" constructed on top of multiple Pythons each running a ``trio`` |  | ||||||
| scheduled runtime - a call to ``trio.run()``. | scheduled runtime - a call to ``trio.run()``. | ||||||
| 
 | 
 | ||||||
| We believe the system adheres to the `3 axioms`_ of an "`actor model`_" | We believe the system adheres to the `3 axioms`_ of an "`actor model`_" | ||||||
|  | @ -29,8 +23,7 @@ Features | ||||||
| - **It's just** a ``trio`` API | - **It's just** a ``trio`` API | ||||||
| - *Infinitely nesteable* process trees | - *Infinitely nesteable* process trees | ||||||
| - Builtin IPC streaming APIs with task fan-out broadcasting | - Builtin IPC streaming APIs with task fan-out broadcasting | ||||||
| - A "native" multi-core debugger REPL using `pdbp`_ (a fork & fix of | - A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_ | ||||||
|   `pdb++`_ thanks to @mdmintz!) |  | ||||||
| - Support for a swappable, OS specific, process spawning layer | - Support for a swappable, OS specific, process spawning layer | ||||||
| - A modular transport stack, allowing for custom serialization (eg. with | - A modular transport stack, allowing for custom serialization (eg. with | ||||||
|   `msgspec`_), communications protocols, and environment specific IPC |   `msgspec`_), communications protocols, and environment specific IPC | ||||||
|  | @ -156,7 +149,7 @@ it **is a bug**. | ||||||
| 
 | 
 | ||||||
| "Native" multi-process debugging | "Native" multi-process debugging | ||||||
| -------------------------------- | -------------------------------- | ||||||
| Using the magic of `pdbp`_ and our internal IPC, we've | Using the magic of `pdb++`_ and our internal IPC, we've | ||||||
| been able to create a native feeling debugging experience for | been able to create a native feeling debugging experience for | ||||||
| any (sub-)process in your ``tractor`` tree. | any (sub-)process in your ``tractor`` tree. | ||||||
| 
 | 
 | ||||||
|  | @ -604,7 +597,6 @@ channel`_! | ||||||
| .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s | .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s | ||||||
| .. _trio gitter channel: https://gitter.im/python-trio/general | .. _trio gitter channel: https://gitter.im/python-trio/general | ||||||
| .. _matrix channel: https://matrix.to/#/!tractor:matrix.org | .. _matrix channel: https://matrix.to/#/!tractor:matrix.org | ||||||
| .. _pdbp: https://github.com/mdmintz/pdbp |  | ||||||
| .. _pdb++: https://github.com/pdbpp/pdbpp | .. _pdb++: https://github.com/pdbpp/pdbpp | ||||||
| .. _guest mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops | .. _guest mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops | ||||||
| .. _messages: https://en.wikipedia.org/wiki/Message_passing | .. _messages: https://en.wikipedia.org/wiki/Message_passing | ||||||
|  |  | ||||||
|  | @ -1,151 +0,0 @@ | ||||||
| ''' |  | ||||||
| Complex edge case where during real-time streaming the IPC tranport |  | ||||||
| channels are wiped out (purposely in this example though it could have |  | ||||||
| been an outage) and we want to ensure that despite being in debug mode |  | ||||||
| (or not) the user can sent SIGINT once they notice the hang and the |  | ||||||
| actor tree will eventually be cancelled without leaving any zombies. |  | ||||||
| 
 |  | ||||||
| ''' |  | ||||||
| import trio |  | ||||||
| from tractor import ( |  | ||||||
|     open_nursery, |  | ||||||
|     context, |  | ||||||
|     Context, |  | ||||||
|     MsgStream, |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| async def break_channel_silently_then_error( |  | ||||||
|     stream: MsgStream, |  | ||||||
| ): |  | ||||||
|     async for msg in stream: |  | ||||||
|         await stream.send(msg) |  | ||||||
| 
 |  | ||||||
|         # XXX: close the channel right after an error is raised |  | ||||||
|         # purposely breaking the IPC transport to make sure the parent |  | ||||||
|         # doesn't get stuck in debug or hang on the connection join. |  | ||||||
|         # this more or less simulates an infinite msg-receive hang on |  | ||||||
|         # the other end. |  | ||||||
|         await stream._ctx.chan.send(None) |  | ||||||
|         assert 0 |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| async def close_stream_and_error( |  | ||||||
|     stream: MsgStream, |  | ||||||
| ): |  | ||||||
|     async for msg in stream: |  | ||||||
|         await stream.send(msg) |  | ||||||
| 
 |  | ||||||
|         # wipe out channel right before raising |  | ||||||
|         await stream._ctx.chan.send(None) |  | ||||||
|         await stream.aclose() |  | ||||||
|         assert 0 |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @context |  | ||||||
| async def recv_and_spawn_net_killers( |  | ||||||
| 
 |  | ||||||
|     ctx: Context, |  | ||||||
|     break_ipc_after: bool | int = False, |  | ||||||
| 
 |  | ||||||
| ) -> None: |  | ||||||
|     ''' |  | ||||||
|     Receive stream msgs and spawn some IPC killers mid-stream. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     await ctx.started() |  | ||||||
|     async with ( |  | ||||||
|         ctx.open_stream() as stream, |  | ||||||
|         trio.open_nursery() as n, |  | ||||||
|     ): |  | ||||||
|         async for i in stream: |  | ||||||
|             print(f'child echoing {i}') |  | ||||||
|             await stream.send(i) |  | ||||||
|             if ( |  | ||||||
|                 break_ipc_after |  | ||||||
|                 and i > break_ipc_after |  | ||||||
|             ): |  | ||||||
|                 '#################################\n' |  | ||||||
|                 'Simulating child-side IPC BREAK!\n' |  | ||||||
|                 '#################################' |  | ||||||
|                 n.start_soon(break_channel_silently_then_error, stream) |  | ||||||
|                 n.start_soon(close_stream_and_error, stream) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| async def main( |  | ||||||
|     debug_mode: bool = False, |  | ||||||
|     start_method: str = 'trio', |  | ||||||
| 
 |  | ||||||
|     # by default we break the parent IPC first (if configured to break |  | ||||||
|     # at all), but this can be changed so the child does first (even if |  | ||||||
|     # both are set to break). |  | ||||||
|     break_parent_ipc_after: int | bool = False, |  | ||||||
|     break_child_ipc_after: int | bool = False, |  | ||||||
| 
 |  | ||||||
| ) -> None: |  | ||||||
| 
 |  | ||||||
|     async with ( |  | ||||||
|         open_nursery( |  | ||||||
|             start_method=start_method, |  | ||||||
| 
 |  | ||||||
|             # NOTE: even debugger is used we shouldn't get |  | ||||||
|             # a hang since it never engages due to broken IPC |  | ||||||
|             debug_mode=debug_mode, |  | ||||||
|             loglevel='warning', |  | ||||||
| 
 |  | ||||||
|         ) as an, |  | ||||||
|     ): |  | ||||||
|         portal = await an.start_actor( |  | ||||||
|             'chitty_hijo', |  | ||||||
|             enable_modules=[__name__], |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
|         async with portal.open_context( |  | ||||||
|             recv_and_spawn_net_killers, |  | ||||||
|             break_ipc_after=break_child_ipc_after, |  | ||||||
| 
 |  | ||||||
|         ) as (ctx, sent): |  | ||||||
|             async with ctx.open_stream() as stream: |  | ||||||
|                 for i in range(1000): |  | ||||||
| 
 |  | ||||||
|                     if ( |  | ||||||
|                         break_parent_ipc_after |  | ||||||
|                         and i > break_parent_ipc_after |  | ||||||
|                     ): |  | ||||||
|                         print( |  | ||||||
|                             '#################################\n' |  | ||||||
|                             'Simulating parent-side IPC BREAK!\n' |  | ||||||
|                             '#################################' |  | ||||||
|                         ) |  | ||||||
|                         await stream._ctx.chan.send(None) |  | ||||||
| 
 |  | ||||||
|                     # it actually breaks right here in the |  | ||||||
|                     # mp_spawn/forkserver backends and thus the zombie |  | ||||||
|                     # reaper never even kicks in? |  | ||||||
|                     print(f'parent sending {i}') |  | ||||||
|                     await stream.send(i) |  | ||||||
| 
 |  | ||||||
|                     with trio.move_on_after(2) as cs: |  | ||||||
| 
 |  | ||||||
|                         # NOTE: in the parent side IPC failure case this |  | ||||||
|                         # will raise an ``EndOfChannel`` after the child |  | ||||||
|                         # is killed and sends a stop msg back to it's |  | ||||||
|                         # caller/this-parent. |  | ||||||
|                         rx = await stream.receive() |  | ||||||
| 
 |  | ||||||
|                         print(f"I'm a happy user and echoed to me is {rx}") |  | ||||||
| 
 |  | ||||||
|                     if cs.cancelled_caught: |  | ||||||
|                         # pretend to be a user seeing no streaming action |  | ||||||
|                         # thinking it's a hang, and then hitting ctl-c.. |  | ||||||
|                         print("YOO i'm a user anddd thingz hangin..") |  | ||||||
| 
 |  | ||||||
|                 print( |  | ||||||
|                     "YOO i'm mad send side dun but thingz hangin..\n" |  | ||||||
|                     'MASHING CTlR-C Ctl-c..' |  | ||||||
|                 ) |  | ||||||
|                 raise KeyboardInterrupt |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| if __name__ == '__main__': |  | ||||||
|     trio.run(main) |  | ||||||
|  | @ -1,24 +0,0 @@ | ||||||
| import os |  | ||||||
| import sys |  | ||||||
| 
 |  | ||||||
| import trio |  | ||||||
| import tractor |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| async def main() -> None: |  | ||||||
|     async with tractor.open_nursery(debug_mode=True) as an: |  | ||||||
| 
 |  | ||||||
|         assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace' |  | ||||||
| 
 |  | ||||||
|         # TODO: an assert that verifies the hook has indeed been, hooked |  | ||||||
|         # XD |  | ||||||
|         assert sys.breakpointhook is not tractor._debug._set_trace |  | ||||||
| 
 |  | ||||||
|         breakpoint() |  | ||||||
| 
 |  | ||||||
|     # TODO: an assert that verifies the hook is unhooked.. |  | ||||||
|     assert sys.breakpointhook |  | ||||||
|     breakpoint() |  | ||||||
| 
 |  | ||||||
| if __name__ == '__main__': |  | ||||||
|     trio.run(main) |  | ||||||
|  | @ -1,19 +0,0 @@ | ||||||
| Rework our ``.trionics.BroadcastReceiver`` internals to avoid method |  | ||||||
| recursion and approach a design and interface closer to ``trio``'s |  | ||||||
| ``MemoryReceiveChannel``. |  | ||||||
| 
 |  | ||||||
| The details of the internal changes include: |  | ||||||
| 
 |  | ||||||
| - implementing a ``BroadcastReceiver.receive_nowait()`` and using it |  | ||||||
|   within the async ``.receive()`` thus avoiding recursion from |  | ||||||
|   ``.receive()``. |  | ||||||
| - failing over to an internal ``._receive_from_underlying()`` when the |  | ||||||
|   ``_nowait()`` call raises ``trio.WouldBlock`` |  | ||||||
| - adding ``BroadcastState.statistics()`` for debugging and testing both |  | ||||||
|   internals and by users. |  | ||||||
| - add an internal ``BroadcastReceiver._raise_on_lag: bool`` which can be |  | ||||||
|   set to avoid ``Lagged`` raising for possible use cases where a user |  | ||||||
|   wants to choose between a [cheap or nasty |  | ||||||
|   pattern](https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern) |  | ||||||
|   the the particular stream (we use this in ``piker``'s dark clearing |  | ||||||
|   engine to avoid fast feeds breaking during HFT periods). |  | ||||||
|  | @ -1,15 +0,0 @@ | ||||||
| Fixes to ensure IPC (channel) breakage doesn't result in hung actor |  | ||||||
| trees; the zombie reaping and general supervision machinery will always |  | ||||||
| clean up and terminate. |  | ||||||
| 
 |  | ||||||
| This includes not only the (mostly minor) fixes to solve these cases but |  | ||||||
| also a new extensive test suite in `test_advanced_faults.py` with an |  | ||||||
| accompanying highly configurable example module-script in |  | ||||||
| `examples/advanced_faults/ipc_failure_during_stream.py`. Tests ensure we |  | ||||||
| never get hang or zombies despite operating in debug mode and attempt to |  | ||||||
| simulate all possible IPC transport failure cases for a local-host actor |  | ||||||
| tree. |  | ||||||
| 
 |  | ||||||
| Further we simplify `Context.open_stream.__aexit__()` to just call |  | ||||||
| `MsgStream.aclose()` directly more or less avoiding a pure duplicate |  | ||||||
| code path. |  | ||||||
|  | @ -1,7 +0,0 @@ | ||||||
| Drop `trio.Process.aclose()` usage, copy into our spawning code. |  | ||||||
| 
 |  | ||||||
| The details are laid out in https://github.com/goodboy/tractor/issues/330. |  | ||||||
| `trio` changed is process running quite some time ago, this just copies |  | ||||||
| out the small bit we needed (from the old `.aclose()`) for hard kills |  | ||||||
| where a soft runtime cancel request fails and our "zombie killer" |  | ||||||
| implementation kicks in. |  | ||||||
|  | @ -1,15 +0,0 @@ | ||||||
| Switch to using the fork & fix of `pdb++`, `pdbp`: |  | ||||||
| https://github.com/mdmintz/pdbp |  | ||||||
| 
 |  | ||||||
| Allows us to sidestep a variety of issues that aren't being maintained |  | ||||||
| in the upstream project thanks to the hard work of @mdmintz! |  | ||||||
| 
 |  | ||||||
| We also include some default settings adjustments as per recent |  | ||||||
| development on the fork: |  | ||||||
| 
 |  | ||||||
| - sticky mode is still turned on by default but now activates when |  | ||||||
|   a using the `ll` repl command. |  | ||||||
| - turn off line truncation by default to avoid inter-line gaps when |  | ||||||
|   resizing the terimnal during use. |  | ||||||
| - when using the backtrace cmd either by `w` or `bt`, the config |  | ||||||
|   automatically switches to non-sticky mode. |  | ||||||
|  | @ -1,7 +1,7 @@ | ||||||
| pytest | pytest | ||||||
| pytest-trio | pytest-trio | ||||||
| pytest-timeout | pytest-timeout | ||||||
| pdbp | pdbpp | ||||||
| mypy | mypy | ||||||
| trio_typing | trio_typing | ||||||
| pexpect | pexpect | ||||||
|  |  | ||||||
							
								
								
									
										13
									
								
								setup.py
								
								
								
								
							
							
						
						
									
										13
									
								
								setup.py
								
								
								
								
							|  | @ -26,12 +26,12 @@ with open('docs/README.rst', encoding='utf-8') as f: | ||||||
| setup( | setup( | ||||||
|     name="tractor", |     name="tractor", | ||||||
|     version='0.1.0a6dev0',  # alpha zone |     version='0.1.0a6dev0',  # alpha zone | ||||||
|     description='structured concurrrent `trio`-"actors"', |     description='structured concurrrent "actors"', | ||||||
|     long_description=readme, |     long_description=readme, | ||||||
|     license='AGPLv3', |     license='AGPLv3', | ||||||
|     author='Tyler Goodlet', |     author='Tyler Goodlet', | ||||||
|     maintainer='Tyler Goodlet', |     maintainer='Tyler Goodlet', | ||||||
|     maintainer_email='goodboy_foss@protonmail.com', |     maintainer_email='jgbt@protonmail.com', | ||||||
|     url='https://github.com/goodboy/tractor', |     url='https://github.com/goodboy/tractor', | ||||||
|     platforms=['linux', 'windows'], |     platforms=['linux', 'windows'], | ||||||
|     packages=[ |     packages=[ | ||||||
|  | @ -52,14 +52,16 @@ setup( | ||||||
|         # tooling |         # tooling | ||||||
|         'tricycle', |         'tricycle', | ||||||
|         'trio_typing', |         'trio_typing', | ||||||
|  | 
 | ||||||
|  |         # tooling | ||||||
|         'colorlog', |         'colorlog', | ||||||
|         'wrapt', |         'wrapt', | ||||||
| 
 | 
 | ||||||
|         # IPC serialization |         # serialization | ||||||
|         'msgspec', |         'msgspec', | ||||||
| 
 | 
 | ||||||
|         # debug mode REPL |         # debug mode REPL | ||||||
|         'pdbp', |         'pdbpp', | ||||||
| 
 | 
 | ||||||
|         # pip ref docs on these specs: |         # pip ref docs on these specs: | ||||||
|         # https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples |         # https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples | ||||||
|  | @ -71,9 +73,10 @@ setup( | ||||||
|         # https://github.com/pdbpp/fancycompleter/issues/37 |         # https://github.com/pdbpp/fancycompleter/issues/37 | ||||||
|         'pyreadline3 ; platform_system == "Windows"', |         'pyreadline3 ; platform_system == "Windows"', | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|     ], |     ], | ||||||
|     tests_require=['pytest'], |     tests_require=['pytest'], | ||||||
|     python_requires=">=3.10", |     python_requires=">=3.9", | ||||||
|     keywords=[ |     keywords=[ | ||||||
|         'trio', |         'trio', | ||||||
|         'async', |         'async', | ||||||
|  |  | ||||||
|  | @ -7,7 +7,6 @@ import os | ||||||
| import random | import random | ||||||
| import signal | import signal | ||||||
| import platform | import platform | ||||||
| import pathlib |  | ||||||
| import time | import time | ||||||
| import inspect | import inspect | ||||||
| from functools import partial, wraps | from functools import partial, wraps | ||||||
|  | @ -114,21 +113,14 @@ no_windows = pytest.mark.skipif( | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def repodir() -> pathlib.Path: | def repodir(): | ||||||
|     ''' |     """Return the abspath to the repo directory. | ||||||
|     Return the abspath to the repo directory. |     """ | ||||||
| 
 |     dirname = os.path.dirname | ||||||
|     ''' |     dirpath = os.path.abspath( | ||||||
|     # 2 parents up to step up through tests/<repo_dir> |         dirname(dirname(os.path.realpath(__file__))) | ||||||
|     return pathlib.Path(__file__).parent.parent.absolute() |         ) | ||||||
| 
 |     return dirpath | ||||||
| 
 |  | ||||||
| def examples_dir() -> pathlib.Path: |  | ||||||
|     ''' |  | ||||||
|     Return the abspath to the examples directory as `pathlib.Path`. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     return repodir() / 'examples' |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def pytest_addoption(parser): | def pytest_addoption(parser): | ||||||
|  | @ -159,7 +151,7 @@ def loglevel(request): | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @pytest.fixture(scope='session') | @pytest.fixture(scope='session') | ||||||
| def spawn_backend(request) -> str: | def spawn_backend(request): | ||||||
|     return request.config.option.spawn_backend |     return request.config.option.spawn_backend | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -1,193 +0,0 @@ | ||||||
| ''' |  | ||||||
| Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la |  | ||||||
| cancelacion?.. |  | ||||||
| 
 |  | ||||||
| ''' |  | ||||||
| from functools import partial |  | ||||||
| 
 |  | ||||||
| import pytest |  | ||||||
| from _pytest.pathlib import import_path |  | ||||||
| import trio |  | ||||||
| import tractor |  | ||||||
| 
 |  | ||||||
| from conftest import ( |  | ||||||
|     examples_dir, |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @pytest.mark.parametrize( |  | ||||||
|     'debug_mode', |  | ||||||
|     [False, True], |  | ||||||
|     ids=['no_debug_mode', 'debug_mode'], |  | ||||||
| ) |  | ||||||
| @pytest.mark.parametrize( |  | ||||||
|     'ipc_break', |  | ||||||
|     [ |  | ||||||
|         # no breaks |  | ||||||
|         { |  | ||||||
|             'break_parent_ipc_after': False, |  | ||||||
|             'break_child_ipc_after': False, |  | ||||||
|         }, |  | ||||||
| 
 |  | ||||||
|         # only parent breaks |  | ||||||
|         { |  | ||||||
|             'break_parent_ipc_after': 500, |  | ||||||
|             'break_child_ipc_after': False, |  | ||||||
|         }, |  | ||||||
| 
 |  | ||||||
|         # only child breaks |  | ||||||
|         { |  | ||||||
|             'break_parent_ipc_after': False, |  | ||||||
|             'break_child_ipc_after': 500, |  | ||||||
|         }, |  | ||||||
| 
 |  | ||||||
|         # both: break parent first |  | ||||||
|         { |  | ||||||
|             'break_parent_ipc_after': 500, |  | ||||||
|             'break_child_ipc_after': 800, |  | ||||||
|         }, |  | ||||||
|         # both: break child first |  | ||||||
|         { |  | ||||||
|             'break_parent_ipc_after': 800, |  | ||||||
|             'break_child_ipc_after': 500, |  | ||||||
|         }, |  | ||||||
| 
 |  | ||||||
|     ], |  | ||||||
|     ids=[ |  | ||||||
|         'no_break', |  | ||||||
|         'break_parent', |  | ||||||
|         'break_child', |  | ||||||
|         'break_both_parent_first', |  | ||||||
|         'break_both_child_first', |  | ||||||
|     ], |  | ||||||
| ) |  | ||||||
| def test_ipc_channel_break_during_stream( |  | ||||||
|     debug_mode: bool, |  | ||||||
|     spawn_backend: str, |  | ||||||
|     ipc_break: dict | None, |  | ||||||
| ): |  | ||||||
|     ''' |  | ||||||
|     Ensure we can have an IPC channel break its connection during |  | ||||||
|     streaming and it's still possible for the (simulated) user to kill |  | ||||||
|     the actor tree using SIGINT. |  | ||||||
| 
 |  | ||||||
|     We also verify the type of connection error expected in the parent |  | ||||||
|     depending on which side if the IPC breaks first. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     if spawn_backend != 'trio': |  | ||||||
|         if debug_mode: |  | ||||||
|             pytest.skip('`debug_mode` only supported on `trio` spawner') |  | ||||||
| 
 |  | ||||||
|         # non-`trio` spawners should never hit the hang condition that |  | ||||||
|         # requires the user to do ctl-c to cancel the actor tree. |  | ||||||
|         expect_final_exc = trio.ClosedResourceError |  | ||||||
| 
 |  | ||||||
|     mod = import_path( |  | ||||||
|         examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py', |  | ||||||
|         root=examples_dir(), |  | ||||||
|     ) |  | ||||||
| 
 |  | ||||||
|     expect_final_exc = KeyboardInterrupt |  | ||||||
| 
 |  | ||||||
|     # when ONLY the child breaks we expect the parent to get a closed |  | ||||||
|     # resource error on the next `MsgStream.receive()` and then fail out |  | ||||||
|     # and cancel the child from there. |  | ||||||
|     if ( |  | ||||||
| 
 |  | ||||||
|         # only child breaks |  | ||||||
|         ( |  | ||||||
|             ipc_break['break_child_ipc_after'] |  | ||||||
|             and ipc_break['break_parent_ipc_after'] is False |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
|         # both break but, parent breaks first |  | ||||||
|         or ( |  | ||||||
|             ipc_break['break_child_ipc_after'] is not False |  | ||||||
|             and ( |  | ||||||
|                 ipc_break['break_parent_ipc_after'] |  | ||||||
|                 > ipc_break['break_child_ipc_after'] |  | ||||||
|             ) |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
|     ): |  | ||||||
|         expect_final_exc = trio.ClosedResourceError |  | ||||||
| 
 |  | ||||||
|     # when the parent IPC side dies (even if the child's does as well |  | ||||||
|     # but the child fails BEFORE the parent) we expect the channel to be |  | ||||||
|     # sent a stop msg from the child at some point which will signal the |  | ||||||
|     # parent that the stream has been terminated. |  | ||||||
|     # NOTE: when the parent breaks "after" the child you get this same |  | ||||||
|     # case as well, the child breaks the IPC channel with a stop msg |  | ||||||
|     # before any closure takes place. |  | ||||||
|     elif ( |  | ||||||
|         # only parent breaks |  | ||||||
|         ( |  | ||||||
|             ipc_break['break_parent_ipc_after'] |  | ||||||
|             and ipc_break['break_child_ipc_after'] is False |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
|         # both break but, child breaks first |  | ||||||
|         or ( |  | ||||||
|             ipc_break['break_parent_ipc_after'] is not False |  | ||||||
|             and ( |  | ||||||
|                 ipc_break['break_child_ipc_after'] |  | ||||||
|                 > ipc_break['break_parent_ipc_after'] |  | ||||||
|             ) |  | ||||||
|         ) |  | ||||||
|     ): |  | ||||||
|         expect_final_exc = trio.EndOfChannel |  | ||||||
| 
 |  | ||||||
|     with pytest.raises(expect_final_exc): |  | ||||||
|         trio.run( |  | ||||||
|             partial( |  | ||||||
|                 mod.main, |  | ||||||
|                 debug_mode=debug_mode, |  | ||||||
|                 start_method=spawn_backend, |  | ||||||
|                 **ipc_break, |  | ||||||
|             ) |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @tractor.context |  | ||||||
| async def break_ipc_after_started( |  | ||||||
|     ctx: tractor.Context, |  | ||||||
| ) -> None: |  | ||||||
|     await ctx.started() |  | ||||||
|     async with ctx.open_stream() as stream: |  | ||||||
|         await stream.aclose() |  | ||||||
|         await trio.sleep(0.2) |  | ||||||
|         await ctx.chan.send(None) |  | ||||||
|         print('child broke IPC and terminating') |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages(): |  | ||||||
|     ''' |  | ||||||
|     Verify that is a subactor's IPC goes down just after bringing up a stream |  | ||||||
|     the parent can trigger a SIGINT and the child will be reaped out-of-IPC by |  | ||||||
|     the localhost process supervision machinery: aka "zombie lord". |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     async def main(): |  | ||||||
|         async with tractor.open_nursery() as n: |  | ||||||
|             portal = await n.start_actor( |  | ||||||
|                 'ipc_breaker', |  | ||||||
|                 enable_modules=[__name__], |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|             with trio.move_on_after(1): |  | ||||||
|                 async with ( |  | ||||||
|                     portal.open_context( |  | ||||||
|                         break_ipc_after_started |  | ||||||
|                     ) as (ctx, sent), |  | ||||||
|                 ): |  | ||||||
|                     async with ctx.open_stream(): |  | ||||||
|                         await trio.sleep(0.5) |  | ||||||
| 
 |  | ||||||
|                     print('parent waiting on context') |  | ||||||
| 
 |  | ||||||
|             print('parent exited context') |  | ||||||
|             raise KeyboardInterrupt |  | ||||||
| 
 |  | ||||||
|     with pytest.raises(KeyboardInterrupt): |  | ||||||
|         trio.run(main) |  | ||||||
|  | @ -14,7 +14,7 @@ def is_win(): | ||||||
|     return platform.system() == 'Windows' |     return platform.system() == 'Windows' | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| _registry: dict[str, set[tractor.MsgStream]] = { | _registry: dict[str, set[tractor.ReceiveMsgStream]] = { | ||||||
|     'even': set(), |     'even': set(), | ||||||
|     'odd': set(), |     'odd': set(), | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -14,7 +14,6 @@ import itertools | ||||||
| from os import path | from os import path | ||||||
| from typing import Optional | from typing import Optional | ||||||
| import platform | import platform | ||||||
| import pathlib |  | ||||||
| import sys | import sys | ||||||
| import time | import time | ||||||
| 
 | 
 | ||||||
|  | @ -25,10 +24,7 @@ from pexpect.exceptions import ( | ||||||
|     EOF, |     EOF, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| from conftest import ( | from conftest import repodir, _ci_env | ||||||
|     examples_dir, |  | ||||||
|     _ci_env, |  | ||||||
| ) |  | ||||||
| 
 | 
 | ||||||
| # TODO: The next great debugger audit could be done by you! | # TODO: The next great debugger audit could be done by you! | ||||||
| # - recurrent entry to breakpoint() from single actor *after* and an | # - recurrent entry to breakpoint() from single actor *after* and an | ||||||
|  | @ -47,13 +43,19 @@ if platform.system() == 'Windows': | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def mk_cmd(ex_name: str) -> str: | def examples_dir(): | ||||||
|     ''' |     """Return the abspath to the examples directory. | ||||||
|     Generate a command suitable to pass to ``pexpect.spawn()``. |     """ | ||||||
|  |     return path.join(repodir(), 'examples', 'debugging/') | ||||||
| 
 | 
 | ||||||
|     ''' | 
 | ||||||
|     script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py' | def mk_cmd(ex_name: str) -> str: | ||||||
|     return ' '.join(['python', str(script_path)]) |     """Generate a command suitable to pass to ``pexpect.spawn()``. | ||||||
|  |     """ | ||||||
|  |     return ' '.join( | ||||||
|  |         ['python', | ||||||
|  |          path.join(examples_dir(), f'{ex_name}.py')] | ||||||
|  |     ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: was trying to this xfail style but some weird bug i see in CI | # TODO: was trying to this xfail style but some weird bug i see in CI | ||||||
|  | @ -95,7 +97,7 @@ def spawn( | ||||||
|     return _spawn |     return _spawn | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| PROMPT = r"\(Pdb\+\)" | PROMPT = r"\(Pdb\+\+\)" | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def expect( | def expect( | ||||||
|  | @ -151,6 +153,18 @@ def ctlc( | ||||||
| 
 | 
 | ||||||
|     use_ctlc = request.param |     use_ctlc = request.param | ||||||
| 
 | 
 | ||||||
|  |     if ( | ||||||
|  |         sys.version_info <= (3, 10) | ||||||
|  |         and use_ctlc | ||||||
|  |     ): | ||||||
|  |         # on 3.9 it seems the REPL UX | ||||||
|  |         # is highly unreliable and frankly annoying | ||||||
|  |         # to test for. It does work from manual testing | ||||||
|  |         # but i just don't think it's wroth it to try | ||||||
|  |         # and get this working especially since we want to | ||||||
|  |         # be 3.10+ mega-asap. | ||||||
|  |         pytest.skip('Py3.9 and `pdbpp` son no bueno..') | ||||||
|  | 
 | ||||||
|     node = request.node |     node = request.node | ||||||
|     markers = node.own_markers |     markers = node.own_markers | ||||||
|     for mark in markers: |     for mark in markers: | ||||||
|  | @ -181,15 +195,13 @@ def ctlc( | ||||||
|     ids=lambda item: f'{item[0]} -> {item[1]}', |     ids=lambda item: f'{item[0]} -> {item[1]}', | ||||||
| ) | ) | ||||||
| def test_root_actor_error(spawn, user_in_out): | def test_root_actor_error(spawn, user_in_out): | ||||||
|     ''' |     """Demonstrate crash handler entering pdbpp from basic error in root actor. | ||||||
|     Demonstrate crash handler entering pdb from basic error in root actor. |     """ | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     user_input, expect_err_str = user_in_out |     user_input, expect_err_str = user_in_out | ||||||
| 
 | 
 | ||||||
|     child = spawn('root_actor_error') |     child = spawn('root_actor_error') | ||||||
| 
 | 
 | ||||||
|     # scan for the prompt |     # scan for the pdbpp prompt | ||||||
|     expect(child, PROMPT) |     expect(child, PROMPT) | ||||||
| 
 | 
 | ||||||
|     before = str(child.before.decode()) |     before = str(child.before.decode()) | ||||||
|  | @ -220,8 +232,8 @@ def test_root_actor_bp(spawn, user_in_out): | ||||||
|     user_input, expect_err_str = user_in_out |     user_input, expect_err_str = user_in_out | ||||||
|     child = spawn('root_actor_breakpoint') |     child = spawn('root_actor_breakpoint') | ||||||
| 
 | 
 | ||||||
|     # scan for the prompt |     # scan for the pdbpp prompt | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     assert 'Error' not in str(child.before) |     assert 'Error' not in str(child.before) | ||||||
| 
 | 
 | ||||||
|  | @ -262,7 +274,7 @@ def do_ctlc( | ||||||
|         if expect_prompt: |         if expect_prompt: | ||||||
|             before = str(child.before.decode()) |             before = str(child.before.decode()) | ||||||
|             time.sleep(delay) |             time.sleep(delay) | ||||||
|             child.expect(PROMPT) |             child.expect(r"\(Pdb\+\+\)") | ||||||
|             time.sleep(delay) |             time.sleep(delay) | ||||||
| 
 | 
 | ||||||
|             if patt: |             if patt: | ||||||
|  | @ -281,7 +293,7 @@ def test_root_actor_bp_forever( | ||||||
|     # entries |     # entries | ||||||
|     for _ in range(10): |     for _ in range(10): | ||||||
| 
 | 
 | ||||||
|         child.expect(PROMPT) |         child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|         if ctlc: |         if ctlc: | ||||||
|             do_ctlc(child) |             do_ctlc(child) | ||||||
|  | @ -291,7 +303,7 @@ def test_root_actor_bp_forever( | ||||||
|     # do one continue which should trigger a |     # do one continue which should trigger a | ||||||
|     # new task to lock the tty |     # new task to lock the tty | ||||||
|     child.sendline('continue') |     child.sendline('continue') | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     # seems that if we hit ctrl-c too fast the |     # seems that if we hit ctrl-c too fast the | ||||||
|     # sigint guard machinery might not kick in.. |     # sigint guard machinery might not kick in.. | ||||||
|  | @ -302,10 +314,10 @@ def test_root_actor_bp_forever( | ||||||
| 
 | 
 | ||||||
|     # XXX: this previously caused a bug! |     # XXX: this previously caused a bug! | ||||||
|     child.sendline('n') |     child.sendline('n') | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     child.sendline('n') |     child.sendline('n') | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     # quit out of the loop |     # quit out of the loop | ||||||
|     child.sendline('q') |     child.sendline('q') | ||||||
|  | @ -328,8 +340,8 @@ def test_subactor_error( | ||||||
|     ''' |     ''' | ||||||
|     child = spawn('subactor_error') |     child = spawn('subactor_error') | ||||||
| 
 | 
 | ||||||
|     # scan for the prompt |     # scan for the pdbpp prompt | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     before = str(child.before.decode()) |     before = str(child.before.decode()) | ||||||
|     assert "Attaching to pdb in crashed actor: ('name_error'" in before |     assert "Attaching to pdb in crashed actor: ('name_error'" in before | ||||||
|  | @ -349,7 +361,7 @@ def test_subactor_error( | ||||||
|         # creating actor |         # creating actor | ||||||
|         child.sendline('continue') |         child.sendline('continue') | ||||||
| 
 | 
 | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
|     before = str(child.before.decode()) |     before = str(child.before.decode()) | ||||||
| 
 | 
 | ||||||
|     # root actor gets debugger engaged |     # root actor gets debugger engaged | ||||||
|  | @ -376,8 +388,8 @@ def test_subactor_breakpoint( | ||||||
| 
 | 
 | ||||||
|     child = spawn('subactor_breakpoint') |     child = spawn('subactor_breakpoint') | ||||||
| 
 | 
 | ||||||
|     # scan for the prompt |     # scan for the pdbpp prompt | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     before = str(child.before.decode()) |     before = str(child.before.decode()) | ||||||
|     assert "Attaching pdb to actor: ('breakpoint_forever'" in before |     assert "Attaching pdb to actor: ('breakpoint_forever'" in before | ||||||
|  | @ -386,7 +398,7 @@ def test_subactor_breakpoint( | ||||||
|     # entries |     # entries | ||||||
|     for _ in range(10): |     for _ in range(10): | ||||||
|         child.sendline('next') |         child.sendline('next') | ||||||
|         child.expect(PROMPT) |         child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|         if ctlc: |         if ctlc: | ||||||
|             do_ctlc(child) |             do_ctlc(child) | ||||||
|  | @ -394,7 +406,7 @@ def test_subactor_breakpoint( | ||||||
|     # now run some "continues" to show re-entries |     # now run some "continues" to show re-entries | ||||||
|     for _ in range(5): |     for _ in range(5): | ||||||
|         child.sendline('continue') |         child.sendline('continue') | ||||||
|         child.expect(PROMPT) |         child.expect(r"\(Pdb\+\+\)") | ||||||
|         before = str(child.before.decode()) |         before = str(child.before.decode()) | ||||||
|         assert "Attaching pdb to actor: ('breakpoint_forever'" in before |         assert "Attaching pdb to actor: ('breakpoint_forever'" in before | ||||||
| 
 | 
 | ||||||
|  | @ -405,7 +417,7 @@ def test_subactor_breakpoint( | ||||||
|     child.sendline('q') |     child.sendline('q') | ||||||
| 
 | 
 | ||||||
|     # child process should exit but parent will capture pdb.BdbQuit |     # child process should exit but parent will capture pdb.BdbQuit | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     before = str(child.before.decode()) |     before = str(child.before.decode()) | ||||||
|     assert "RemoteActorError: ('breakpoint_forever'" in before |     assert "RemoteActorError: ('breakpoint_forever'" in before | ||||||
|  | @ -437,8 +449,8 @@ def test_multi_subactors( | ||||||
|     ''' |     ''' | ||||||
|     child = spawn(r'multi_subactors') |     child = spawn(r'multi_subactors') | ||||||
| 
 | 
 | ||||||
|     # scan for the prompt |     # scan for the pdbpp prompt | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     before = str(child.before.decode()) |     before = str(child.before.decode()) | ||||||
|     assert "Attaching pdb to actor: ('breakpoint_forever'" in before |     assert "Attaching pdb to actor: ('breakpoint_forever'" in before | ||||||
|  | @ -450,7 +462,7 @@ def test_multi_subactors( | ||||||
|     # entries |     # entries | ||||||
|     for _ in range(10): |     for _ in range(10): | ||||||
|         child.sendline('next') |         child.sendline('next') | ||||||
|         child.expect(PROMPT) |         child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|         if ctlc: |         if ctlc: | ||||||
|             do_ctlc(child) |             do_ctlc(child) | ||||||
|  | @ -459,7 +471,7 @@ def test_multi_subactors( | ||||||
|     child.sendline('c') |     child.sendline('c') | ||||||
| 
 | 
 | ||||||
|     # first name_error failure |     # first name_error failure | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
|     before = str(child.before.decode()) |     before = str(child.before.decode()) | ||||||
|     assert "Attaching to pdb in crashed actor: ('name_error'" in before |     assert "Attaching to pdb in crashed actor: ('name_error'" in before | ||||||
|     assert "NameError" in before |     assert "NameError" in before | ||||||
|  | @ -471,7 +483,7 @@ def test_multi_subactors( | ||||||
|     child.sendline('c') |     child.sendline('c') | ||||||
| 
 | 
 | ||||||
|     # 2nd name_error failure |     # 2nd name_error failure | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     # TODO: will we ever get the race where this crash will show up? |     # TODO: will we ever get the race where this crash will show up? | ||||||
|     # blocklist strat now prevents this crash |     # blocklist strat now prevents this crash | ||||||
|  | @ -485,7 +497,7 @@ def test_multi_subactors( | ||||||
| 
 | 
 | ||||||
|     # breakpoint loop should re-engage |     # breakpoint loop should re-engage | ||||||
|     child.sendline('c') |     child.sendline('c') | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
|     before = str(child.before.decode()) |     before = str(child.before.decode()) | ||||||
|     assert "Attaching pdb to actor: ('breakpoint_forever'" in before |     assert "Attaching pdb to actor: ('breakpoint_forever'" in before | ||||||
| 
 | 
 | ||||||
|  | @ -501,7 +513,7 @@ def test_multi_subactors( | ||||||
|     ): |     ): | ||||||
|         child.sendline('c') |         child.sendline('c') | ||||||
|         time.sleep(0.1) |         time.sleep(0.1) | ||||||
|         child.expect(PROMPT) |         child.expect(r"\(Pdb\+\+\)") | ||||||
|         before = str(child.before.decode()) |         before = str(child.before.decode()) | ||||||
| 
 | 
 | ||||||
|         if ctlc: |         if ctlc: | ||||||
|  | @ -520,11 +532,11 @@ def test_multi_subactors( | ||||||
|     # now run some "continues" to show re-entries |     # now run some "continues" to show re-entries | ||||||
|     for _ in range(5): |     for _ in range(5): | ||||||
|         child.sendline('c') |         child.sendline('c') | ||||||
|         child.expect(PROMPT) |         child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     # quit the loop and expect parent to attach |     # quit the loop and expect parent to attach | ||||||
|     child.sendline('q') |     child.sendline('q') | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
|     before = str(child.before.decode()) |     before = str(child.before.decode()) | ||||||
| 
 | 
 | ||||||
|     assert_before(child, [ |     assert_before(child, [ | ||||||
|  | @ -568,7 +580,7 @@ def test_multi_daemon_subactors( | ||||||
|     ''' |     ''' | ||||||
|     child = spawn('multi_daemon_subactors') |     child = spawn('multi_daemon_subactors') | ||||||
| 
 | 
 | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     # there can be a race for which subactor will acquire |     # there can be a race for which subactor will acquire | ||||||
|     # the root's tty lock first so anticipate either crash |     # the root's tty lock first so anticipate either crash | ||||||
|  | @ -598,7 +610,7 @@ def test_multi_daemon_subactors( | ||||||
|     # second entry by `bp_forever`. |     # second entry by `bp_forever`. | ||||||
| 
 | 
 | ||||||
|     child.sendline('c') |     child.sendline('c') | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
|     assert_before(child, [next_msg]) |     assert_before(child, [next_msg]) | ||||||
| 
 | 
 | ||||||
|     # XXX: hooray the root clobbering the child here was fixed! |     # XXX: hooray the root clobbering the child here was fixed! | ||||||
|  | @ -620,7 +632,7 @@ def test_multi_daemon_subactors( | ||||||
| 
 | 
 | ||||||
|     # expect another breakpoint actor entry |     # expect another breakpoint actor entry | ||||||
|     child.sendline('c') |     child.sendline('c') | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     try: |     try: | ||||||
|         assert_before(child, [bp_forever_msg]) |         assert_before(child, [bp_forever_msg]) | ||||||
|  | @ -636,7 +648,7 @@ def test_multi_daemon_subactors( | ||||||
|         # after 1 or more further bp actor entries. |         # after 1 or more further bp actor entries. | ||||||
| 
 | 
 | ||||||
|         child.sendline('c') |         child.sendline('c') | ||||||
|         child.expect(PROMPT) |         child.expect(r"\(Pdb\+\+\)") | ||||||
|         assert_before(child, [name_error_msg]) |         assert_before(child, [name_error_msg]) | ||||||
| 
 | 
 | ||||||
|     # wait for final error in root |     # wait for final error in root | ||||||
|  | @ -644,7 +656,7 @@ def test_multi_daemon_subactors( | ||||||
|     while True: |     while True: | ||||||
|         try: |         try: | ||||||
|             child.sendline('c') |             child.sendline('c') | ||||||
|             child.expect(PROMPT) |             child.expect(r"\(Pdb\+\+\)") | ||||||
|             assert_before( |             assert_before( | ||||||
|                 child, |                 child, | ||||||
|                 [bp_forever_msg] |                 [bp_forever_msg] | ||||||
|  | @ -677,8 +689,8 @@ def test_multi_subactors_root_errors( | ||||||
|     ''' |     ''' | ||||||
|     child = spawn('multi_subactor_root_errors') |     child = spawn('multi_subactor_root_errors') | ||||||
| 
 | 
 | ||||||
|     # scan for the prompt |     # scan for the pdbpp prompt | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     # at most one subactor should attach before the root is cancelled |     # at most one subactor should attach before the root is cancelled | ||||||
|     before = str(child.before.decode()) |     before = str(child.before.decode()) | ||||||
|  | @ -693,7 +705,7 @@ def test_multi_subactors_root_errors( | ||||||
| 
 | 
 | ||||||
|     # due to block list strat from #337, this will no longer |     # due to block list strat from #337, this will no longer | ||||||
|     # propagate before the root errors and cancels the spawner sub-tree. |     # propagate before the root errors and cancels the spawner sub-tree. | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     # only if the blocking condition doesn't kick in fast enough |     # only if the blocking condition doesn't kick in fast enough | ||||||
|     before = str(child.before.decode()) |     before = str(child.before.decode()) | ||||||
|  | @ -708,7 +720,7 @@ def test_multi_subactors_root_errors( | ||||||
|             do_ctlc(child) |             do_ctlc(child) | ||||||
| 
 | 
 | ||||||
|         child.sendline('c') |         child.sendline('c') | ||||||
|         child.expect(PROMPT) |         child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     # check if the spawner crashed or was blocked from debug |     # check if the spawner crashed or was blocked from debug | ||||||
|     # and if this intermediary attached check the boxed error |     # and if this intermediary attached check the boxed error | ||||||
|  | @ -725,7 +737,7 @@ def test_multi_subactors_root_errors( | ||||||
|             do_ctlc(child) |             do_ctlc(child) | ||||||
| 
 | 
 | ||||||
|         child.sendline('c') |         child.sendline('c') | ||||||
|         child.expect(PROMPT) |         child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     # expect a root actor crash |     # expect a root actor crash | ||||||
|     assert_before(child, [ |     assert_before(child, [ | ||||||
|  | @ -774,7 +786,7 @@ def test_multi_nested_subactors_error_through_nurseries( | ||||||
| 
 | 
 | ||||||
|     for send_char in itertools.cycle(['c', 'q']): |     for send_char in itertools.cycle(['c', 'q']): | ||||||
|         try: |         try: | ||||||
|             child.expect(PROMPT) |             child.expect(r"\(Pdb\+\+\)") | ||||||
|             child.sendline(send_char) |             child.sendline(send_char) | ||||||
|             time.sleep(0.01) |             time.sleep(0.01) | ||||||
| 
 | 
 | ||||||
|  | @ -816,7 +828,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock( | ||||||
| 
 | 
 | ||||||
|     child = spawn('root_cancelled_but_child_is_in_tty_lock') |     child = spawn('root_cancelled_but_child_is_in_tty_lock') | ||||||
| 
 | 
 | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     before = str(child.before.decode()) |     before = str(child.before.decode()) | ||||||
|     assert "NameError: name 'doggypants' is not defined" in before |     assert "NameError: name 'doggypants' is not defined" in before | ||||||
|  | @ -831,7 +843,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock( | ||||||
|     for i in range(4): |     for i in range(4): | ||||||
|         time.sleep(0.5) |         time.sleep(0.5) | ||||||
|         try: |         try: | ||||||
|             child.expect(PROMPT) |             child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|         except ( |         except ( | ||||||
|             EOF, |             EOF, | ||||||
|  | @ -888,7 +900,7 @@ def test_root_cancels_child_context_during_startup( | ||||||
|     ''' |     ''' | ||||||
|     child = spawn('fast_error_in_root_after_spawn') |     child = spawn('fast_error_in_root_after_spawn') | ||||||
| 
 | 
 | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     before = str(child.before.decode()) |     before = str(child.before.decode()) | ||||||
|     assert "AssertionError" in before |     assert "AssertionError" in before | ||||||
|  | @ -905,7 +917,7 @@ def test_different_debug_mode_per_actor( | ||||||
|     ctlc: bool, |     ctlc: bool, | ||||||
| ): | ): | ||||||
|     child = spawn('per_actor_debug') |     child = spawn('per_actor_debug') | ||||||
|     child.expect(PROMPT) |     child.expect(r"\(Pdb\+\+\)") | ||||||
| 
 | 
 | ||||||
|     # only one actor should enter the debugger |     # only one actor should enter the debugger | ||||||
|     before = str(child.before.decode()) |     before = str(child.before.decode()) | ||||||
|  |  | ||||||
|  | @ -12,17 +12,17 @@ import shutil | ||||||
| 
 | 
 | ||||||
| import pytest | import pytest | ||||||
| 
 | 
 | ||||||
| from conftest import ( | from conftest import repodir | ||||||
|     examples_dir, | 
 | ||||||
| ) | 
 | ||||||
|  | def examples_dir(): | ||||||
|  |     """Return the abspath to the examples directory. | ||||||
|  |     """ | ||||||
|  |     return os.path.join(repodir(), 'examples') | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @pytest.fixture | @pytest.fixture | ||||||
| def run_example_in_subproc( | def run_example_in_subproc(loglevel, testdir, arb_addr): | ||||||
|     loglevel: str, |  | ||||||
|     testdir, |  | ||||||
|     arb_addr: tuple[str, int], |  | ||||||
| ): |  | ||||||
| 
 | 
 | ||||||
|     @contextmanager |     @contextmanager | ||||||
|     def run(script_code): |     def run(script_code): | ||||||
|  | @ -32,8 +32,8 @@ def run_example_in_subproc( | ||||||
|             # on windows we need to create a special __main__.py which will |             # on windows we need to create a special __main__.py which will | ||||||
|             # be executed with ``python -m <modulename>`` on windows.. |             # be executed with ``python -m <modulename>`` on windows.. | ||||||
|             shutil.copyfile( |             shutil.copyfile( | ||||||
|                 examples_dir() / '__main__.py', |                 os.path.join(examples_dir(), '__main__.py'), | ||||||
|                 str(testdir / '__main__.py'), |                 os.path.join(str(testdir), '__main__.py') | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             # drop the ``if __name__ == '__main__'`` guard onwards from |             # drop the ``if __name__ == '__main__'`` guard onwards from | ||||||
|  | @ -88,7 +88,6 @@ def run_example_in_subproc( | ||||||
|         and f[0] != '_' |         and f[0] != '_' | ||||||
|         and 'debugging' not in p[0] |         and 'debugging' not in p[0] | ||||||
|         and 'integration' not in p[0] |         and 'integration' not in p[0] | ||||||
|         and 'advanced_faults' not in p[0] |  | ||||||
|     ], |     ], | ||||||
| 
 | 
 | ||||||
|     ids=lambda t: t[1], |     ids=lambda t: t[1], | ||||||
|  |  | ||||||
|  | @ -251,7 +251,7 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend): | ||||||
| 
 | 
 | ||||||
|     results, diff = time_quad_ex |     results, diff = time_quad_ex | ||||||
|     assert results |     assert results | ||||||
|     this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3 |     this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 2.666 | ||||||
|     assert diff < this_fast |     assert diff < this_fast | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -12,10 +12,7 @@ import pytest | ||||||
| import trio | import trio | ||||||
| from trio.lowlevel import current_task | from trio.lowlevel import current_task | ||||||
| import tractor | import tractor | ||||||
| from tractor.trionics import ( | from tractor.trionics import broadcast_receiver, Lagged | ||||||
|     broadcast_receiver, |  | ||||||
|     Lagged, |  | ||||||
| ) |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @tractor.context | @tractor.context | ||||||
|  | @ -40,7 +37,7 @@ async def echo_sequences( | ||||||
| 
 | 
 | ||||||
| async def ensure_sequence( | async def ensure_sequence( | ||||||
| 
 | 
 | ||||||
|     stream: tractor.MsgStream, |     stream: tractor.ReceiveMsgStream, | ||||||
|     sequence: list, |     sequence: list, | ||||||
|     delay: Optional[float] = None, |     delay: Optional[float] = None, | ||||||
| 
 | 
 | ||||||
|  | @ -214,8 +211,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower( | ||||||
|     arb_addr, |     arb_addr, | ||||||
|     start_method, |     start_method, | ||||||
| ): | ): | ||||||
|     ''' |     '''Ensure that if a faster task consuming from a stream is cancelled | ||||||
|     Ensure that if a faster task consuming from a stream is cancelled |  | ||||||
|     the slower task can continue to receive all expected values. |     the slower task can continue to receive all expected values. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|  | @ -464,51 +460,3 @@ def test_first_recver_is_cancelled(): | ||||||
|                     assert value == 1 |                     assert value == 1 | ||||||
| 
 | 
 | ||||||
|     trio.run(main) |     trio.run(main) | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def test_no_raise_on_lag(): |  | ||||||
|     ''' |  | ||||||
|     Run a simple 2-task broadcast where one task is slow but configured |  | ||||||
|     so that it does not raise `Lagged` on overruns using |  | ||||||
|     `raise_on_lasg=False` and verify that the task does not raise. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     size = 100 |  | ||||||
|     tx, rx = trio.open_memory_channel(size) |  | ||||||
|     brx = broadcast_receiver(rx, size) |  | ||||||
| 
 |  | ||||||
|     async def slow(): |  | ||||||
|         async with brx.subscribe( |  | ||||||
|             raise_on_lag=False, |  | ||||||
|         ) as br: |  | ||||||
|             async for msg in br: |  | ||||||
|                 print(f'slow task got: {msg}') |  | ||||||
|                 await trio.sleep(0.1) |  | ||||||
| 
 |  | ||||||
|     async def fast(): |  | ||||||
|         async with brx.subscribe() as br: |  | ||||||
|             async for msg in br: |  | ||||||
|                 print(f'fast task got: {msg}') |  | ||||||
| 
 |  | ||||||
|     async def main(): |  | ||||||
|         async with ( |  | ||||||
|             tractor.open_root_actor( |  | ||||||
|                 # NOTE: so we see the warning msg emitted by the bcaster |  | ||||||
|                 # internals when the no raise flag is set. |  | ||||||
|                 loglevel='warning', |  | ||||||
|             ), |  | ||||||
|             trio.open_nursery() as n, |  | ||||||
|         ): |  | ||||||
|             n.start_soon(slow) |  | ||||||
|             n.start_soon(fast) |  | ||||||
| 
 |  | ||||||
|             for i in range(1000): |  | ||||||
|                 await tx.send(i) |  | ||||||
| 
 |  | ||||||
|             # simulate user nailing ctl-c after realizing |  | ||||||
|             # there's a lag in the slow task. |  | ||||||
|             await trio.sleep(1) |  | ||||||
|             raise KeyboardInterrupt |  | ||||||
| 
 |  | ||||||
|     with pytest.raises(KeyboardInterrupt): |  | ||||||
|         trio.run(main) |  | ||||||
|  |  | ||||||
|  | @ -24,6 +24,7 @@ from ._clustering import open_actor_cluster | ||||||
| from ._ipc import Channel | from ._ipc import Channel | ||||||
| from ._streaming import ( | from ._streaming import ( | ||||||
|     Context, |     Context, | ||||||
|  |     ReceiveMsgStream, | ||||||
|     MsgStream, |     MsgStream, | ||||||
|     stream, |     stream, | ||||||
|     context, |     context, | ||||||
|  | @ -44,10 +45,7 @@ from ._exceptions import ( | ||||||
|     ModuleNotExposed, |     ModuleNotExposed, | ||||||
|     ContextCancelled, |     ContextCancelled, | ||||||
| ) | ) | ||||||
| from ._debug import ( | from ._debug import breakpoint, post_mortem | ||||||
|     breakpoint, |  | ||||||
|     post_mortem, |  | ||||||
| ) |  | ||||||
| from . import msg | from . import msg | ||||||
| from ._root import ( | from ._root import ( | ||||||
|     run_daemon, |     run_daemon, | ||||||
|  | @ -66,6 +64,7 @@ __all__ = [ | ||||||
|     'MsgStream', |     'MsgStream', | ||||||
|     'BaseExceptionGroup', |     'BaseExceptionGroup', | ||||||
|     'Portal', |     'Portal', | ||||||
|  |     'ReceiveMsgStream', | ||||||
|     'RemoteActorError', |     'RemoteActorError', | ||||||
|     'breakpoint', |     'breakpoint', | ||||||
|     'context', |     'context', | ||||||
|  |  | ||||||
|  | @ -37,7 +37,6 @@ from typing import ( | ||||||
| ) | ) | ||||||
| from types import FrameType | from types import FrameType | ||||||
| 
 | 
 | ||||||
| import pdbp |  | ||||||
| import tractor | import tractor | ||||||
| import trio | import trio | ||||||
| from trio_typing import TaskStatus | from trio_typing import TaskStatus | ||||||
|  | @ -54,6 +53,17 @@ from ._exceptions import ( | ||||||
| ) | ) | ||||||
| from ._ipc import Channel | from ._ipc import Channel | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|  | try: | ||||||
|  |     # wtf: only exported when installed in dev mode? | ||||||
|  |     import pdbpp | ||||||
|  | except ImportError: | ||||||
|  |     # pdbpp is installed in regular mode...it monkey patches stuff | ||||||
|  |     import pdb | ||||||
|  |     xpm = getattr(pdb, 'xpm', None) | ||||||
|  |     assert xpm, "pdbpp is not installed?"  # type: ignore | ||||||
|  |     pdbpp = pdb | ||||||
|  | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -144,26 +154,22 @@ class Lock: | ||||||
|             cls.repl = None |             cls.repl = None | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class TractorConfig(pdbp.DefaultConfig): | class TractorConfig(pdbpp.DefaultConfig): | ||||||
|     ''' |     ''' | ||||||
|     Custom ``pdbp`` goodness :surfer: |     Custom ``pdbpp`` goodness. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     use_pygments: bool = True |     # use_pygments = True | ||||||
|     sticky_by_default: bool = False |     # sticky_by_default = True | ||||||
|     enable_hidden_frames: bool = False |     enable_hidden_frames = False | ||||||
| 
 |  | ||||||
|     # much thanks @mdmintz for the hot tip! |  | ||||||
|     # fixes line spacing issue when resizing terminal B) |  | ||||||
|     truncate_long_lines: bool = False |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class MultiActorPdb(pdbp.Pdb): | class MultiActorPdb(pdbpp.Pdb): | ||||||
|     ''' |     ''' | ||||||
|     Add teardown hooks to the regular ``pdbp.Pdb``. |     Add teardown hooks to the regular ``pdbpp.Pdb``. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     # override the pdbp config with our coolio one |     # override the pdbpp config with our coolio one | ||||||
|     DefaultConfig = TractorConfig |     DefaultConfig = TractorConfig | ||||||
| 
 | 
 | ||||||
|     # def preloop(self): |     # def preloop(self): | ||||||
|  | @ -307,7 +313,7 @@ async def lock_tty_for_child( | ||||||
| ) -> str: | ) -> str: | ||||||
|     ''' |     ''' | ||||||
|     Lock the TTY in the root process of an actor tree in a new |     Lock the TTY in the root process of an actor tree in a new | ||||||
|     inter-actor-context-task such that the ``pdbp`` debugger console |     inter-actor-context-task such that the ``pdbpp`` debugger console | ||||||
|     can be mutex-allocated to the calling sub-actor for REPL control |     can be mutex-allocated to the calling sub-actor for REPL control | ||||||
|     without interference by other processes / threads. |     without interference by other processes / threads. | ||||||
| 
 | 
 | ||||||
|  | @ -427,7 +433,7 @@ async def wait_for_parent_stdin_hijack( | ||||||
| def mk_mpdb() -> tuple[MultiActorPdb, Callable]: | def mk_mpdb() -> tuple[MultiActorPdb, Callable]: | ||||||
| 
 | 
 | ||||||
|     pdb = MultiActorPdb() |     pdb = MultiActorPdb() | ||||||
|     # signal.signal = pdbp.hideframe(signal.signal) |     # signal.signal = pdbpp.hideframe(signal.signal) | ||||||
| 
 | 
 | ||||||
|     Lock.shield_sigint() |     Lock.shield_sigint() | ||||||
| 
 | 
 | ||||||
|  | @ -577,7 +583,7 @@ async def _breakpoint( | ||||||
|     #     # frame = sys._getframe() |     #     # frame = sys._getframe() | ||||||
|     #     # last_f = frame.f_back |     #     # last_f = frame.f_back | ||||||
|     #     # last_f.f_globals['__tracebackhide__'] = True |     #     # last_f.f_globals['__tracebackhide__'] = True | ||||||
|     #     # signal.signal = pdbp.hideframe(signal.signal) |     #     # signal.signal = pdbpp.hideframe(signal.signal) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def shield_sigint_handler( | def shield_sigint_handler( | ||||||
|  | @ -737,13 +743,13 @@ def shield_sigint_handler( | ||||||
|         # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040 |         # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040 | ||||||
|         # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py |         # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py | ||||||
| 
 | 
 | ||||||
|         # XXX LEGACY: lol, see ``pdbpp`` issue: |         # XXX: lol, see ``pdbpp`` issue: | ||||||
|         # https://github.com/pdbpp/pdbpp/issues/496 |         # https://github.com/pdbpp/pdbpp/issues/496 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def _set_trace( | def _set_trace( | ||||||
|     actor: tractor.Actor | None = None, |     actor: Optional[tractor.Actor] = None, | ||||||
|     pdb: MultiActorPdb | None = None, |     pdb: Optional[MultiActorPdb] = None, | ||||||
| ): | ): | ||||||
|     __tracebackhide__ = True |     __tracebackhide__ = True | ||||||
|     actor = actor or tractor.current_actor() |     actor = actor or tractor.current_actor() | ||||||
|  | @ -753,11 +759,7 @@ def _set_trace( | ||||||
|     if frame: |     if frame: | ||||||
|         frame = frame.f_back  # type: ignore |         frame = frame.f_back  # type: ignore | ||||||
| 
 | 
 | ||||||
|     if ( |     if frame and pdb and actor is not None: | ||||||
|         frame |  | ||||||
|         and pdb |  | ||||||
|         and actor is not None |  | ||||||
|     ): |  | ||||||
|         log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") |         log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") | ||||||
|         # no f!#$&* idea, but when we're in async land |         # no f!#$&* idea, but when we're in async land | ||||||
|         # we need 2x frames up? |         # we need 2x frames up? | ||||||
|  | @ -766,8 +768,7 @@ def _set_trace( | ||||||
|     else: |     else: | ||||||
|         pdb, undo_sigint = mk_mpdb() |         pdb, undo_sigint = mk_mpdb() | ||||||
| 
 | 
 | ||||||
|         # we entered the global ``breakpoint()`` built-in from sync |         # we entered the global ``breakpoint()`` built-in from sync code? | ||||||
|         # code? |  | ||||||
|         Lock.local_task_in_debug = 'sync' |         Lock.local_task_in_debug = 'sync' | ||||||
| 
 | 
 | ||||||
|     pdb.set_trace(frame=frame) |     pdb.set_trace(frame=frame) | ||||||
|  | @ -797,7 +798,7 @@ def _post_mortem( | ||||||
|     # https://github.com/pdbpp/pdbpp/issues/480 |     # https://github.com/pdbpp/pdbpp/issues/480 | ||||||
|     # TODO: help with a 3.10+ major release if/when it arrives. |     # TODO: help with a 3.10+ major release if/when it arrives. | ||||||
| 
 | 
 | ||||||
|     pdbp.xpm(Pdb=lambda: pdb) |     pdbpp.xpm(Pdb=lambda: pdb) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| post_mortem = partial( | post_mortem = partial( | ||||||
|  |  | ||||||
|  | @ -45,10 +45,7 @@ from ._exceptions import ( | ||||||
|     NoResult, |     NoResult, | ||||||
|     ContextCancelled, |     ContextCancelled, | ||||||
| ) | ) | ||||||
| from ._streaming import ( | from ._streaming import Context, ReceiveMsgStream | ||||||
|     Context, |  | ||||||
|     MsgStream, |  | ||||||
| ) |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
|  | @ -104,7 +101,7 @@ class Portal: | ||||||
|         # it is expected that ``result()`` will be awaited at some |         # it is expected that ``result()`` will be awaited at some | ||||||
|         # point. |         # point. | ||||||
|         self._expect_result: Optional[Context] = None |         self._expect_result: Optional[Context] = None | ||||||
|         self._streams: set[MsgStream] = set() |         self._streams: set[ReceiveMsgStream] = set() | ||||||
|         self.actor = current_actor() |         self.actor = current_actor() | ||||||
| 
 | 
 | ||||||
|     async def _submit_for_result( |     async def _submit_for_result( | ||||||
|  | @ -319,7 +316,7 @@ class Portal: | ||||||
|         async_gen_func: Callable,  # typing: ignore |         async_gen_func: Callable,  # typing: ignore | ||||||
|         **kwargs, |         **kwargs, | ||||||
| 
 | 
 | ||||||
|     ) -> AsyncGenerator[MsgStream, None]: |     ) -> AsyncGenerator[ReceiveMsgStream, None]: | ||||||
| 
 | 
 | ||||||
|         if not inspect.isasyncgenfunction(async_gen_func): |         if not inspect.isasyncgenfunction(async_gen_func): | ||||||
|             if not ( |             if not ( | ||||||
|  | @ -344,7 +341,7 @@ class Portal: | ||||||
| 
 | 
 | ||||||
|         try: |         try: | ||||||
|             # deliver receive only stream |             # deliver receive only stream | ||||||
|             async with MsgStream( |             async with ReceiveMsgStream( | ||||||
|                 ctx, ctx._recv_chan, |                 ctx, ctx._recv_chan, | ||||||
|             ) as rchan: |             ) as rchan: | ||||||
|                 self._streams.add(rchan) |                 self._streams.add(rchan) | ||||||
|  | @ -500,10 +497,6 @@ class Portal: | ||||||
|                     f'actor: {uid}' |                     f'actor: {uid}' | ||||||
|                 ) |                 ) | ||||||
|                 result = await ctx.result() |                 result = await ctx.result() | ||||||
|                 log.runtime( |  | ||||||
|                     f'Context {fn_name} returned ' |  | ||||||
|                     f'value from callee `{result}`' |  | ||||||
|                 ) |  | ||||||
| 
 | 
 | ||||||
|             # though it should be impossible for any tasks |             # though it should be impossible for any tasks | ||||||
|             # operating *in* this scope to have survived |             # operating *in* this scope to have survived | ||||||
|  | @ -525,6 +518,12 @@ class Portal: | ||||||
|                         f'task:{cid}\n' |                         f'task:{cid}\n' | ||||||
|                         f'actor:{uid}' |                         f'actor:{uid}' | ||||||
|                     ) |                     ) | ||||||
|  |             else: | ||||||
|  |                 log.runtime( | ||||||
|  |                     f'Context {fn_name} returned ' | ||||||
|  |                     f'value from callee `{result}`' | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|             # XXX: (MEGA IMPORTANT) if this is a root opened process we |             # XXX: (MEGA IMPORTANT) if this is a root opened process we | ||||||
|             # wait for any immediate child in debug before popping the |             # wait for any immediate child in debug before popping the | ||||||
|             # context from the runtime msg loop otherwise inside |             # context from the runtime msg loop otherwise inside | ||||||
|  |  | ||||||
|  | @ -22,9 +22,8 @@ from contextlib import asynccontextmanager | ||||||
| from functools import partial | from functools import partial | ||||||
| import importlib | import importlib | ||||||
| import logging | import logging | ||||||
| import signal |  | ||||||
| import sys |  | ||||||
| import os | import os | ||||||
|  | import signal | ||||||
| import typing | import typing | ||||||
| import warnings | import warnings | ||||||
| 
 | 
 | ||||||
|  | @ -85,10 +84,8 @@ async def open_root_actor( | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     # Override the global debugger hook to make it play nice with |     # Override the global debugger hook to make it play nice with | ||||||
|     # ``trio``, see much discussion in: |     # ``trio``, see: | ||||||
|     # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018 |     # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018 | ||||||
|     builtin_bp_handler = sys.breakpointhook |  | ||||||
|     orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None) |  | ||||||
|     os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace' |     os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace' | ||||||
| 
 | 
 | ||||||
|     # attempt to retreive ``trio``'s sigint handler and stash it |     # attempt to retreive ``trio``'s sigint handler and stash it | ||||||
|  | @ -257,15 +254,6 @@ async def open_root_actor( | ||||||
|                 await actor.cancel() |                 await actor.cancel() | ||||||
|     finally: |     finally: | ||||||
|         _state._current_actor = None |         _state._current_actor = None | ||||||
| 
 |  | ||||||
|         # restore breakpoint hook state |  | ||||||
|         sys.breakpointhook = builtin_bp_handler |  | ||||||
|         if orig_bp_path is not None: |  | ||||||
|             os.environ['PYTHONBREAKPOINT'] = orig_bp_path |  | ||||||
|         else: |  | ||||||
|             # clear env back to having no entry |  | ||||||
|             os.environ.pop('PYTHONBREAKPOINT') |  | ||||||
| 
 |  | ||||||
|         logger.runtime("Root actor terminated") |         logger.runtime("Root actor terminated") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -301,7 +289,7 @@ def run_daemon( | ||||||
|     async def _main(): |     async def _main(): | ||||||
| 
 | 
 | ||||||
|         async with open_root_actor( |         async with open_root_actor( | ||||||
|             registry_addr=registry_addr, |             arbiter_addr=registry_addr, | ||||||
|             name=name, |             name=name, | ||||||
|             start_method=start_method, |             start_method=start_method, | ||||||
|             debug_mode=debug_mode, |             debug_mode=debug_mode, | ||||||
|  |  | ||||||
|  | @ -228,11 +228,11 @@ async def _invoke( | ||||||
| 
 | 
 | ||||||
|                 fname = func.__name__ |                 fname = func.__name__ | ||||||
|                 if ctx._cancel_called: |                 if ctx._cancel_called: | ||||||
|                     msg = f'`{fname}()` cancelled itself' |                     msg = f'{fname} cancelled itself' | ||||||
| 
 | 
 | ||||||
|                 elif cs.cancel_called: |                 elif cs.cancel_called: | ||||||
|                     msg = ( |                     msg = ( | ||||||
|                         f'`{fname}()` was remotely cancelled by its caller ' |                         f'{fname} was remotely cancelled by its caller ' | ||||||
|                         f'{ctx.chan.uid}' |                         f'{ctx.chan.uid}' | ||||||
|                     ) |                     ) | ||||||
| 
 | 
 | ||||||
|  | @ -319,7 +319,7 @@ async def _invoke( | ||||||
|             BrokenPipeError, |             BrokenPipeError, | ||||||
|         ): |         ): | ||||||
|             # if we can't propagate the error that's a big boo boo |             # if we can't propagate the error that's a big boo boo | ||||||
|             log.exception( |             log.error( | ||||||
|                 f"Failed to ship error to caller @ {chan.uid} !?" |                 f"Failed to ship error to caller @ {chan.uid} !?" | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|  | @ -455,7 +455,7 @@ class Actor: | ||||||
|         self._mods: dict[str, ModuleType] = {} |         self._mods: dict[str, ModuleType] = {} | ||||||
|         self.loglevel = loglevel |         self.loglevel = loglevel | ||||||
| 
 | 
 | ||||||
|         self._arb_addr: tuple[str, int] | None = ( |         self._arb_addr = ( | ||||||
|             str(arbiter_addr[0]), |             str(arbiter_addr[0]), | ||||||
|             int(arbiter_addr[1]) |             int(arbiter_addr[1]) | ||||||
|         ) if arbiter_addr else None |         ) if arbiter_addr else None | ||||||
|  | @ -488,10 +488,7 @@ class Actor: | ||||||
|         self._parent_chan: Optional[Channel] = None |         self._parent_chan: Optional[Channel] = None | ||||||
|         self._forkserver_info: Optional[ |         self._forkserver_info: Optional[ | ||||||
|             tuple[Any, Any, Any, Any, Any]] = None |             tuple[Any, Any, Any, Any, Any]] = None | ||||||
|         self._actoruid2nursery: dict[ |         self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {}  # type: ignore  # noqa | ||||||
|             tuple[str, str], |  | ||||||
|             ActorNursery | None, |  | ||||||
|         ] = {}  # type: ignore  # noqa |  | ||||||
| 
 | 
 | ||||||
|     async def wait_for_peer( |     async def wait_for_peer( | ||||||
|         self, uid: tuple[str, str] |         self, uid: tuple[str, str] | ||||||
|  | @ -829,12 +826,7 @@ class Actor: | ||||||
| 
 | 
 | ||||||
|             if ctx._backpressure: |             if ctx._backpressure: | ||||||
|                 log.warning(text) |                 log.warning(text) | ||||||
|                 try: |  | ||||||
|                 await send_chan.send(msg) |                 await send_chan.send(msg) | ||||||
|                 except trio.BrokenResourceError: |  | ||||||
|                     # XXX: local consumer has closed their side |  | ||||||
|                     # so cancel the far end streaming task |  | ||||||
|                     log.warning(f"{chan} is already closed") |  | ||||||
|             else: |             else: | ||||||
|                 try: |                 try: | ||||||
|                     raise StreamOverrun(text) from None |                     raise StreamOverrun(text) from None | ||||||
|  | @ -1379,12 +1371,10 @@ async def async_main( | ||||||
|         actor.lifetime_stack.close() |         actor.lifetime_stack.close() | ||||||
| 
 | 
 | ||||||
|         # Unregister actor from the arbiter |         # Unregister actor from the arbiter | ||||||
|         if ( |         if registered_with_arbiter and ( | ||||||
|             registered_with_arbiter |                 actor._arb_addr is not None | ||||||
|             and not actor.is_arbiter |  | ||||||
|         ): |         ): | ||||||
|             failed = False |             failed = False | ||||||
|             assert isinstance(actor._arb_addr, tuple) |  | ||||||
|             with trio.move_on_after(0.5) as cs: |             with trio.move_on_after(0.5) as cs: | ||||||
|                 cs.shield = True |                 cs.shield = True | ||||||
|                 try: |                 try: | ||||||
|  |  | ||||||
|  | @ -23,12 +23,13 @@ import sys | ||||||
| import platform | import platform | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     Awaitable, |  | ||||||
|     Literal, |     Literal, | ||||||
|  |     Optional, | ||||||
|     Callable, |     Callable, | ||||||
|     TypeVar, |     TypeVar, | ||||||
|     TYPE_CHECKING, |     TYPE_CHECKING, | ||||||
| ) | ) | ||||||
|  | from collections.abc import Awaitable | ||||||
| 
 | 
 | ||||||
| from exceptiongroup import BaseExceptionGroup | from exceptiongroup import BaseExceptionGroup | ||||||
| import trio | import trio | ||||||
|  | @ -59,7 +60,7 @@ if TYPE_CHECKING: | ||||||
| log = get_logger('tractor') | log = get_logger('tractor') | ||||||
| 
 | 
 | ||||||
| # placeholder for an mp start context if so using that backend | # placeholder for an mp start context if so using that backend | ||||||
| _ctx: mp.context.BaseContext | None = None | _ctx: Optional[mp.context.BaseContext] = None | ||||||
| SpawnMethodKey = Literal[ | SpawnMethodKey = Literal[ | ||||||
|     'trio',  # supported on all platforms |     'trio',  # supported on all platforms | ||||||
|     'mp_spawn', |     'mp_spawn', | ||||||
|  | @ -85,7 +86,7 @@ else: | ||||||
| def try_set_start_method( | def try_set_start_method( | ||||||
|     key: SpawnMethodKey |     key: SpawnMethodKey | ||||||
| 
 | 
 | ||||||
| ) -> mp.context.BaseContext | None: | ) -> Optional[mp.context.BaseContext]: | ||||||
|     ''' |     ''' | ||||||
|     Attempt to set the method for process starting, aka the "actor |     Attempt to set the method for process starting, aka the "actor | ||||||
|     spawning backend". |     spawning backend". | ||||||
|  | @ -199,37 +200,16 @@ async def cancel_on_completion( | ||||||
| async def do_hard_kill( | async def do_hard_kill( | ||||||
|     proc: trio.Process, |     proc: trio.Process, | ||||||
|     terminate_after: int = 3, |     terminate_after: int = 3, | ||||||
| 
 |  | ||||||
| ) -> None: | ) -> None: | ||||||
|     # NOTE: this timeout used to do nothing since we were shielding |     # NOTE: this timeout used to do nothing since we were shielding | ||||||
|     # the ``.wait()`` inside ``new_proc()`` which will pretty much |     # the ``.wait()`` inside ``new_proc()`` which will pretty much | ||||||
|     # never release until the process exits, now it acts as |     # never release until the process exits, now it acts as | ||||||
|     # a hard-kill time ultimatum. |     # a hard-kill time ultimatum. | ||||||
|     log.debug(f"Terminating {proc}") |  | ||||||
|     with trio.move_on_after(terminate_after) as cs: |     with trio.move_on_after(terminate_after) as cs: | ||||||
| 
 | 
 | ||||||
|         # NOTE: code below was copied verbatim from the now deprecated |         # NOTE: This ``__aexit__()`` shields internally. | ||||||
|         # (in 0.20.0) ``trio._subrocess.Process.aclose()``, orig doc |         async with proc:  # calls ``trio.Process.aclose()`` | ||||||
|         # string: |             log.debug(f"Terminating {proc}") | ||||||
|         # |  | ||||||
|         # Close any pipes we have to the process (both input and output) |  | ||||||
|         # and wait for it to exit. If cancelled, kills the process and |  | ||||||
|         # waits for it to finish exiting before propagating the |  | ||||||
|         # cancellation. |  | ||||||
|         with trio.CancelScope(shield=True): |  | ||||||
|             if proc.stdin is not None: |  | ||||||
|                 await proc.stdin.aclose() |  | ||||||
|             if proc.stdout is not None: |  | ||||||
|                 await proc.stdout.aclose() |  | ||||||
|             if proc.stderr is not None: |  | ||||||
|                 await proc.stderr.aclose() |  | ||||||
|         try: |  | ||||||
|             await proc.wait() |  | ||||||
|         finally: |  | ||||||
|             if proc.returncode is None: |  | ||||||
|                 proc.kill() |  | ||||||
|                 with trio.CancelScope(shield=True): |  | ||||||
|                     await proc.wait() |  | ||||||
| 
 | 
 | ||||||
|     if cs.cancelled_caught: |     if cs.cancelled_caught: | ||||||
|         # XXX: should pretty much never get here unless we have |         # XXX: should pretty much never get here unless we have | ||||||
|  | @ -280,9 +260,7 @@ async def soft_wait( | ||||||
| 
 | 
 | ||||||
|             if proc.poll() is None:  # type: ignore |             if proc.poll() is None:  # type: ignore | ||||||
|                 log.warning( |                 log.warning( | ||||||
|                     'Actor still alive after cancel request:\n' |                     f'Process still alive after cancel request:\n{uid}') | ||||||
|                     f'{uid}' |  | ||||||
|                 ) |  | ||||||
| 
 | 
 | ||||||
|                 n.cancel_scope.cancel() |                 n.cancel_scope.cancel() | ||||||
|         raise |         raise | ||||||
|  | @ -375,11 +353,12 @@ async def trio_proc( | ||||||
|         spawn_cmd.append("--asyncio") |         spawn_cmd.append("--asyncio") | ||||||
| 
 | 
 | ||||||
|     cancelled_during_spawn: bool = False |     cancelled_during_spawn: bool = False | ||||||
|     proc: trio.Process | None = None |     proc: Optional[trio.Process] = None | ||||||
|     try: |     try: | ||||||
|         try: |         try: | ||||||
|             # TODO: needs ``trio_typing`` patch? |             # TODO: needs ``trio_typing`` patch? | ||||||
|             proc = await trio.lowlevel.open_process(spawn_cmd) |             proc = await trio.lowlevel.open_process(    # type: ignore | ||||||
|  |                 spawn_cmd) | ||||||
| 
 | 
 | ||||||
|             log.runtime(f"Started {proc}") |             log.runtime(f"Started {proc}") | ||||||
| 
 | 
 | ||||||
|  | @ -463,8 +442,8 @@ async def trio_proc( | ||||||
|             nursery.cancel_scope.cancel() |             nursery.cancel_scope.cancel() | ||||||
| 
 | 
 | ||||||
|     finally: |     finally: | ||||||
|         # XXX NOTE XXX: The "hard" reap since no actor zombies are |         # The "hard" reap since no actor zombies are allowed! | ||||||
|         # allowed! Do this **after** cancellation/teardown to avoid |         # XXX: do this **after** cancellation/tearfown to avoid | ||||||
|         # killing the process too early. |         # killing the process too early. | ||||||
|         if proc: |         if proc: | ||||||
|             log.cancel(f'Hard reap sequence starting for {subactor.uid}') |             log.cancel(f'Hard reap sequence starting for {subactor.uid}') | ||||||
|  |  | ||||||
|  | @ -50,13 +50,12 @@ log = get_logger(__name__) | ||||||
| # - use __slots__ on ``Context``? | # - use __slots__ on ``Context``? | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class MsgStream(trio.abc.Channel): | class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
|     ''' |     ''' | ||||||
|     A bidirectional message stream for receiving logically sequenced |     A IPC message stream for receiving logically sequenced values over | ||||||
|     values over an inter-actor IPC ``Channel``. |     an inter-actor ``Channel``. This is the type returned to a local | ||||||
| 
 |     task which entered either ``Portal.open_stream_from()`` or | ||||||
|     This is the type returned to a local task which entered either |     ``Context.open_stream()``. | ||||||
|     ``Portal.open_stream_from()`` or ``Context.open_stream()``. |  | ||||||
| 
 | 
 | ||||||
|     Termination rules: |     Termination rules: | ||||||
| 
 | 
 | ||||||
|  | @ -98,9 +97,6 @@ class MsgStream(trio.abc.Channel): | ||||||
|         if self._eoc: |         if self._eoc: | ||||||
|             raise trio.EndOfChannel |             raise trio.EndOfChannel | ||||||
| 
 | 
 | ||||||
|         if self._closed: |  | ||||||
|             raise trio.ClosedResourceError('This stream was closed') |  | ||||||
| 
 |  | ||||||
|         try: |         try: | ||||||
|             msg = await self._rx_chan.receive() |             msg = await self._rx_chan.receive() | ||||||
|             return msg['yield'] |             return msg['yield'] | ||||||
|  | @ -114,9 +110,6 @@ class MsgStream(trio.abc.Channel): | ||||||
|             # - 'error' |             # - 'error' | ||||||
|             # possibly just handle msg['stop'] here! |             # possibly just handle msg['stop'] here! | ||||||
| 
 | 
 | ||||||
|             if self._closed: |  | ||||||
|                 raise trio.ClosedResourceError('This stream was closed') |  | ||||||
| 
 |  | ||||||
|             if msg.get('stop') or self._eoc: |             if msg.get('stop') or self._eoc: | ||||||
|                 log.debug(f"{self} was stopped at remote end") |                 log.debug(f"{self} was stopped at remote end") | ||||||
| 
 | 
 | ||||||
|  | @ -196,6 +189,7 @@ class MsgStream(trio.abc.Channel): | ||||||
|             return |             return | ||||||
| 
 | 
 | ||||||
|         self._eoc = True |         self._eoc = True | ||||||
|  |         self._closed = True | ||||||
| 
 | 
 | ||||||
|         # NOTE: this is super subtle IPC messaging stuff: |         # NOTE: this is super subtle IPC messaging stuff: | ||||||
|         # Relay stop iteration to far end **iff** we're |         # Relay stop iteration to far end **iff** we're | ||||||
|  | @ -212,8 +206,12 @@ class MsgStream(trio.abc.Channel): | ||||||
| 
 | 
 | ||||||
|         # In the bidirectional case, `Context.open_stream()` will create |         # In the bidirectional case, `Context.open_stream()` will create | ||||||
|         # the `Actor._cids2qs` entry from a call to |         # the `Actor._cids2qs` entry from a call to | ||||||
|         # `Actor.get_context()` and will call us here to send the stop |         # `Actor.get_context()` and will send the stop message in | ||||||
|         # msg in ``__aexit__()`` on teardown. |         # ``__aexit__()`` on teardown so it **does not** need to be | ||||||
|  |         # called here. | ||||||
|  |         if not self._ctx._portal: | ||||||
|  |             # Only for 2 way streams can we can send stop from the | ||||||
|  |             # caller side. | ||||||
|             try: |             try: | ||||||
|                 # NOTE: if this call is cancelled we expect this end to |                 # NOTE: if this call is cancelled we expect this end to | ||||||
|                 # handle as though the stop was never sent (though if it |                 # handle as though the stop was never sent (though if it | ||||||
|  | @ -230,14 +228,7 @@ class MsgStream(trio.abc.Channel): | ||||||
|                 # the underlying channel may already have been pulled |                 # the underlying channel may already have been pulled | ||||||
|                 # in which case our stop message is meaningless since |                 # in which case our stop message is meaningless since | ||||||
|                 # it can't traverse the transport. |                 # it can't traverse the transport. | ||||||
|             ctx = self._ctx |                 log.debug(f'Channel for {self} was already closed') | ||||||
|             log.warning( |  | ||||||
|                 f'Stream was already destroyed?\n' |  | ||||||
|                 f'actor: {ctx.chan.uid}\n' |  | ||||||
|                 f'ctx id: {ctx.cid}' |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|         self._closed = True |  | ||||||
| 
 | 
 | ||||||
|         # Do we close the local mem chan ``self._rx_chan`` ??!? |         # Do we close the local mem chan ``self._rx_chan`` ??!? | ||||||
| 
 | 
 | ||||||
|  | @ -280,8 +271,7 @@ class MsgStream(trio.abc.Channel): | ||||||
|         self, |         self, | ||||||
| 
 | 
 | ||||||
|     ) -> AsyncIterator[BroadcastReceiver]: |     ) -> AsyncIterator[BroadcastReceiver]: | ||||||
|         ''' |         '''Allocate and return a ``BroadcastReceiver`` which delegates | ||||||
|         Allocate and return a ``BroadcastReceiver`` which delegates |  | ||||||
|         to this message stream. |         to this message stream. | ||||||
| 
 | 
 | ||||||
|         This allows multiple local tasks to receive each their own copy |         This allows multiple local tasks to receive each their own copy | ||||||
|  | @ -318,15 +308,15 @@ class MsgStream(trio.abc.Channel): | ||||||
|         async with self._broadcaster.subscribe() as bstream: |         async with self._broadcaster.subscribe() as bstream: | ||||||
|             assert bstream.key != self._broadcaster.key |             assert bstream.key != self._broadcaster.key | ||||||
|             assert bstream._recv == self._broadcaster._recv |             assert bstream._recv == self._broadcaster._recv | ||||||
| 
 |  | ||||||
|             # NOTE: we patch on a `.send()` to the bcaster so that the |  | ||||||
|             # caller can still conduct 2-way streaming using this |  | ||||||
|             # ``bstream`` handle transparently as though it was the msg |  | ||||||
|             # stream instance. |  | ||||||
|             bstream.send = self.send  # type: ignore |  | ||||||
| 
 |  | ||||||
|             yield bstream |             yield bstream | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|  | class MsgStream(ReceiveMsgStream, trio.abc.Channel): | ||||||
|  |     ''' | ||||||
|  |     Bidirectional message stream for use within an inter-actor actor | ||||||
|  |     ``Context```. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|     async def send( |     async def send( | ||||||
|         self, |         self, | ||||||
|         data: Any |         data: Any | ||||||
|  | @ -603,23 +593,23 @@ class Context: | ||||||
|         async with MsgStream( |         async with MsgStream( | ||||||
|             ctx=self, |             ctx=self, | ||||||
|             rx_chan=ctx._recv_chan, |             rx_chan=ctx._recv_chan, | ||||||
|         ) as stream: |         ) as rchan: | ||||||
| 
 | 
 | ||||||
|             if self._portal: |             if self._portal: | ||||||
|                 self._portal._streams.add(stream) |                 self._portal._streams.add(rchan) | ||||||
| 
 | 
 | ||||||
|             try: |             try: | ||||||
|                 self._stream_opened = True |                 self._stream_opened = True | ||||||
| 
 | 
 | ||||||
|                 # XXX: do we need this? |                 # ensure we aren't cancelled before delivering | ||||||
|                 # ensure we aren't cancelled before yielding the stream |                 # the stream | ||||||
|                 # await trio.lowlevel.checkpoint() |                 # await trio.lowlevel.checkpoint() | ||||||
|                 yield stream |                 yield rchan | ||||||
| 
 | 
 | ||||||
|                 # NOTE: Make the stream "one-shot use".  On exit, signal |                 # XXX: Make the stream "one-shot use".  On exit, signal | ||||||
|                 # ``trio.EndOfChannel``/``StopAsyncIteration`` to the |                 # ``trio.EndOfChannel``/``StopAsyncIteration`` to the | ||||||
|                 # far end. |                 # far end. | ||||||
|                 await stream.aclose() |                 await self.send_stop() | ||||||
| 
 | 
 | ||||||
|             finally: |             finally: | ||||||
|                 if self._portal: |                 if self._portal: | ||||||
|  |  | ||||||
|  | @ -302,7 +302,7 @@ async def _open_and_supervise_one_cancels_all_nursery( | ||||||
| ) -> typing.AsyncGenerator[ActorNursery, None]: | ) -> typing.AsyncGenerator[ActorNursery, None]: | ||||||
| 
 | 
 | ||||||
|     # TODO: yay or nay? |     # TODO: yay or nay? | ||||||
|     __tracebackhide__ = True |     # __tracebackhide__ = True | ||||||
| 
 | 
 | ||||||
|     # the collection of errors retreived from spawned sub-actors |     # the collection of errors retreived from spawned sub-actors | ||||||
|     errors: dict[tuple[str, str], BaseException] = {} |     errors: dict[tuple[str, str], BaseException] = {} | ||||||
|  |  | ||||||
|  | @ -23,6 +23,7 @@ from __future__ import annotations | ||||||
| from abc import abstractmethod | from abc import abstractmethod | ||||||
| from collections import deque | from collections import deque | ||||||
| from contextlib import asynccontextmanager | from contextlib import asynccontextmanager | ||||||
|  | from dataclasses import dataclass | ||||||
| from functools import partial | from functools import partial | ||||||
| from operator import ne | from operator import ne | ||||||
| from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol | from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol | ||||||
|  | @ -32,10 +33,7 @@ import trio | ||||||
| from trio._core._run import Task | from trio._core._run import Task | ||||||
| from trio.abc import ReceiveChannel | from trio.abc import ReceiveChannel | ||||||
| from trio.lowlevel import current_task | from trio.lowlevel import current_task | ||||||
| from msgspec import Struct |  | ||||||
| from tractor.log import get_logger |  | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) |  | ||||||
| 
 | 
 | ||||||
| # A regular invariant generic type | # A regular invariant generic type | ||||||
| T = TypeVar("T") | T = TypeVar("T") | ||||||
|  | @ -88,7 +86,8 @@ class Lagged(trio.TooSlowError): | ||||||
|     ''' |     ''' | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class BroadcastState(Struct): | @dataclass | ||||||
|  | class BroadcastState: | ||||||
|     ''' |     ''' | ||||||
|     Common state to all receivers of a broadcast. |     Common state to all receivers of a broadcast. | ||||||
| 
 | 
 | ||||||
|  | @ -111,35 +110,7 @@ class BroadcastState(Struct): | ||||||
|     eoc: bool = False |     eoc: bool = False | ||||||
| 
 | 
 | ||||||
|     # If the broadcaster was cancelled, we might as well track it |     # If the broadcaster was cancelled, we might as well track it | ||||||
|     cancelled: dict[int, Task] = {} |     cancelled: bool = False | ||||||
| 
 |  | ||||||
|     def statistics(self) -> dict[str, Any]: |  | ||||||
|         ''' |  | ||||||
|         Return broadcast receiver group "statistics" like many of |  | ||||||
|         ``trio``'s internal task-sync primitives. |  | ||||||
| 
 |  | ||||||
|         ''' |  | ||||||
|         key: int | None |  | ||||||
|         ev: trio.Event | None |  | ||||||
| 
 |  | ||||||
|         subs = self.subs |  | ||||||
|         if self.recv_ready is not None: |  | ||||||
|             key, ev = self.recv_ready |  | ||||||
|         else: |  | ||||||
|             key = ev = None |  | ||||||
| 
 |  | ||||||
|         qlens: dict[int, int] = {} |  | ||||||
|         for tid, sz in subs.items(): |  | ||||||
|             qlens[tid] = sz if sz != -1 else 0 |  | ||||||
| 
 |  | ||||||
|         return { |  | ||||||
|             'open_consumers': len(subs), |  | ||||||
|             'queued_len_by_task': qlens, |  | ||||||
|             'max_buffer_size': self.maxlen, |  | ||||||
|             'tasks_waiting': ev.statistics().tasks_waiting if ev else 0, |  | ||||||
|             'tasks_cancelled': self.cancelled, |  | ||||||
|             'next_value_receiver_id': key, |  | ||||||
|         } |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class BroadcastReceiver(ReceiveChannel): | class BroadcastReceiver(ReceiveChannel): | ||||||
|  | @ -157,40 +128,23 @@ class BroadcastReceiver(ReceiveChannel): | ||||||
|         rx_chan: AsyncReceiver, |         rx_chan: AsyncReceiver, | ||||||
|         state: BroadcastState, |         state: BroadcastState, | ||||||
|         receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, |         receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, | ||||||
|         raise_on_lag: bool = True, |  | ||||||
| 
 | 
 | ||||||
|     ) -> None: |     ) -> None: | ||||||
| 
 | 
 | ||||||
|         # register the original underlying (clone) |         # register the original underlying (clone) | ||||||
|         self.key = id(self) |         self.key = id(self) | ||||||
|         self._state = state |         self._state = state | ||||||
| 
 |  | ||||||
|         # each consumer has an int count which indicates |  | ||||||
|         # which index contains the next value that the task has not yet |  | ||||||
|         # consumed and thus should read. In the "up-to-date" case the |  | ||||||
|         # consumer task must wait for a new value from the underlying |  | ||||||
|         # receiver and we use ``-1`` as the sentinel for this state. |  | ||||||
|         state.subs[self.key] = -1 |         state.subs[self.key] = -1 | ||||||
| 
 | 
 | ||||||
|         # underlying for this receiver |         # underlying for this receiver | ||||||
|         self._rx = rx_chan |         self._rx = rx_chan | ||||||
|         self._recv = receive_afunc or rx_chan.receive |         self._recv = receive_afunc or rx_chan.receive | ||||||
|         self._closed: bool = False |         self._closed: bool = False | ||||||
|         self._raise_on_lag = raise_on_lag |  | ||||||
| 
 | 
 | ||||||
|     def receive_nowait( |     async def receive(self) -> ReceiveType: | ||||||
|         self, |  | ||||||
|         _key: int | None = None, |  | ||||||
|         _state: BroadcastState | None = None, |  | ||||||
| 
 | 
 | ||||||
|     ) -> Any: |         key = self.key | ||||||
|         ''' |         state = self._state | ||||||
|         Sync version of `.receive()` which does all the low level work |  | ||||||
|         of receiving from the underlying/wrapped receive channel. |  | ||||||
| 
 |  | ||||||
|         ''' |  | ||||||
|         key = _key or self.key |  | ||||||
|         state = _state or self._state |  | ||||||
| 
 | 
 | ||||||
|         # TODO: ideally we can make some way to "lock out" the |         # TODO: ideally we can make some way to "lock out" the | ||||||
|         # underlying receive channel in some way such that if some task |         # underlying receive channel in some way such that if some task | ||||||
|  | @ -223,47 +177,32 @@ class BroadcastReceiver(ReceiveChannel): | ||||||
|                 # return this value." |                 # return this value." | ||||||
|                 # https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html#lagging |                 # https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html#lagging | ||||||
| 
 | 
 | ||||||
|                 mxln = state.maxlen |  | ||||||
|                 lost = seq - mxln |  | ||||||
| 
 |  | ||||||
|                 # decrement to the last value and expect |                 # decrement to the last value and expect | ||||||
|                 # consumer to either handle the ``Lagged`` and come back |                 # consumer to either handle the ``Lagged`` and come back | ||||||
|                 # or bail out on its own (thus un-subscribing) |                 # or bail out on its own (thus un-subscribing) | ||||||
|                 state.subs[key] = mxln - 1 |                 state.subs[key] = state.maxlen - 1 | ||||||
| 
 | 
 | ||||||
|                 # this task was overrun by the producer side |                 # this task was overrun by the producer side | ||||||
|                 task: Task = current_task() |                 task: Task = current_task() | ||||||
|                 msg = f'Task `{task.name}` overrun and dropped `{lost}` values' |                 raise Lagged(f'Task {task.name} was overrun') | ||||||
| 
 |  | ||||||
|                 if self._raise_on_lag: |  | ||||||
|                     raise Lagged(msg) |  | ||||||
|                 else: |  | ||||||
|                     log.warning(msg) |  | ||||||
|                     return self.receive_nowait(_key, _state) |  | ||||||
| 
 | 
 | ||||||
|             state.subs[key] -= 1 |             state.subs[key] -= 1 | ||||||
|             return value |             return value | ||||||
| 
 | 
 | ||||||
|         raise trio.WouldBlock |         # current task already has the latest value **and** is the | ||||||
| 
 |         # first task to begin waiting for a new one | ||||||
|     async def _receive_from_underlying( |         if state.recv_ready is None: | ||||||
|         self, |  | ||||||
|         key: int, |  | ||||||
|         state: BroadcastState, |  | ||||||
| 
 |  | ||||||
|     ) -> ReceiveType: |  | ||||||
| 
 | 
 | ||||||
|             if self._closed: |             if self._closed: | ||||||
|                 raise trio.ClosedResourceError |                 raise trio.ClosedResourceError | ||||||
| 
 | 
 | ||||||
|             event = trio.Event() |             event = trio.Event() | ||||||
|         assert state.recv_ready is None |  | ||||||
|             state.recv_ready = key, event |             state.recv_ready = key, event | ||||||
| 
 | 
 | ||||||
|         try: |  | ||||||
|             # if we're cancelled here it should be |             # if we're cancelled here it should be | ||||||
|             # fine to bail without affecting any other consumers |             # fine to bail without affecting any other consumers | ||||||
|             # right? |             # right? | ||||||
|  |             try: | ||||||
|                 value = await self._recv() |                 value = await self._recv() | ||||||
| 
 | 
 | ||||||
|                 # items with lower indices are "newer" |                 # items with lower indices are "newer" | ||||||
|  | @ -281,6 +220,7 @@ class BroadcastReceiver(ReceiveChannel): | ||||||
|                 # already retreived the last value |                 # already retreived the last value | ||||||
| 
 | 
 | ||||||
|                 # XXX: which of these impls is fastest? |                 # XXX: which of these impls is fastest? | ||||||
|  | 
 | ||||||
|                 # subs = state.subs.copy() |                 # subs = state.subs.copy() | ||||||
|                 # subs.pop(key) |                 # subs.pop(key) | ||||||
| 
 | 
 | ||||||
|  | @ -311,85 +251,54 @@ class BroadcastReceiver(ReceiveChannel): | ||||||
|                 # consumers will be awoken with a sequence of -1 |                 # consumers will be awoken with a sequence of -1 | ||||||
|                 # and will potentially try to rewait the underlying |                 # and will potentially try to rewait the underlying | ||||||
|                 # receiver instead of just cancelling immediately. |                 # receiver instead of just cancelling immediately. | ||||||
|             self._state.cancelled[key] = current_task() |                 self._state.cancelled = True | ||||||
|                 if event.statistics().tasks_waiting: |                 if event.statistics().tasks_waiting: | ||||||
|                     event.set() |                     event.set() | ||||||
|                 raise |                 raise | ||||||
| 
 | 
 | ||||||
|             finally: |             finally: | ||||||
|  | 
 | ||||||
|                 # Reset receiver waiter task event for next blocking condition. |                 # Reset receiver waiter task event for next blocking condition. | ||||||
|                 # this MUST be reset even if the above ``.recv()`` call |                 # this MUST be reset even if the above ``.recv()`` call | ||||||
|                 # was cancelled to avoid the next consumer from blocking on |                 # was cancelled to avoid the next consumer from blocking on | ||||||
|                 # an event that won't be set! |                 # an event that won't be set! | ||||||
|                 state.recv_ready = None |                 state.recv_ready = None | ||||||
| 
 | 
 | ||||||
|     async def receive(self) -> ReceiveType: |  | ||||||
|         key = self.key |  | ||||||
|         state = self._state |  | ||||||
| 
 |  | ||||||
|         try: |  | ||||||
|             return self.receive_nowait( |  | ||||||
|                 _key=key, |  | ||||||
|                 _state=state, |  | ||||||
|             ) |  | ||||||
|         except trio.WouldBlock: |  | ||||||
|             pass |  | ||||||
| 
 |  | ||||||
|         # current task already has the latest value **and** is the |  | ||||||
|         # first task to begin waiting for a new one so we begin blocking |  | ||||||
|         # until rescheduled with the a new value from the underlying. |  | ||||||
|         if state.recv_ready is None: |  | ||||||
|             return await self._receive_from_underlying(key, state) |  | ||||||
| 
 |  | ||||||
|         # This task is all caught up and ready to receive the latest |         # This task is all caught up and ready to receive the latest | ||||||
|         # value, so queue/schedule it to be woken on the next internal |         # value, so queue sched it on the internal event. | ||||||
|         # event. |  | ||||||
|         else: |         else: | ||||||
|             while state.recv_ready is not None: |             seq = state.subs[key] | ||||||
|                 # seq = state.subs[key] |             assert seq == -1  # sanity | ||||||
|                 # assert seq == -1  # sanity |  | ||||||
|             _, ev = state.recv_ready |             _, ev = state.recv_ready | ||||||
|             await ev.wait() |             await ev.wait() | ||||||
|                 try: |  | ||||||
|                     return self.receive_nowait( |  | ||||||
|                         _key=key, |  | ||||||
|                         _state=state, |  | ||||||
|                     ) |  | ||||||
|                 except trio.WouldBlock: |  | ||||||
|                     if self._closed: |  | ||||||
|                         raise trio.ClosedResourceError |  | ||||||
| 
 | 
 | ||||||
|                     subs = state.subs |             # NOTE: if we ever would like the behaviour where if the | ||||||
|                     if ( |             # first task to recv on the underlying is cancelled but it | ||||||
|                         len(subs) == 1 |             # still DOES trigger the ``.recv_ready``, event we'll likely need | ||||||
|                         and key in subs |             # this logic: | ||||||
|                         # or cancelled |  | ||||||
|                     ): |  | ||||||
|                         # XXX: we are the last and only user of this BR so |  | ||||||
|                         # likely it makes sense to unwind back to the |  | ||||||
|                         # underlying? |  | ||||||
|                         # import tractor |  | ||||||
|                         # await tractor.breakpoint() |  | ||||||
|                         log.warning( |  | ||||||
|                             f'Only one sub left for {self}?\n' |  | ||||||
|                             'We can probably unwind from breceiver?' |  | ||||||
|                         ) |  | ||||||
| 
 | 
 | ||||||
|  |             if seq > -1: | ||||||
|  |                 # stuff from above.. | ||||||
|  |                 seq = state.subs[key] | ||||||
|  | 
 | ||||||
|  |                 value = state.queue[seq] | ||||||
|  |                 state.subs[key] -= 1 | ||||||
|  |                 return value | ||||||
|  | 
 | ||||||
|  |             elif seq == -1: | ||||||
|                 # XXX: In the case where the first task to allocate the |                 # XXX: In the case where the first task to allocate the | ||||||
|                     # ``.recv_ready`` event is cancelled we will be woken |                 # ``.recv_ready`` event is cancelled we will be woken with | ||||||
|                     # with a non-incremented sequence number (the ``-1`` |                 # a non-incremented sequence number and thus will read the | ||||||
|                     # sentinel) and thus will read the oldest value if we |                 # oldest value if we use that. Instead we need to detect if | ||||||
|                     # use that. Instead we need to detect if we have not |                 # we have not been incremented and then receive again. | ||||||
|                     # been incremented and then receive again. |                 return await self.receive() | ||||||
|                     # return await self.receive() |  | ||||||
| 
 | 
 | ||||||
|             return await self._receive_from_underlying(key, state) |             else: | ||||||
|  |                 raise ValueError(f'Invalid sequence {seq}!?') | ||||||
| 
 | 
 | ||||||
|     @asynccontextmanager |     @asynccontextmanager | ||||||
|     async def subscribe( |     async def subscribe( | ||||||
|         self, |         self, | ||||||
|         raise_on_lag: bool = True, |  | ||||||
| 
 |  | ||||||
|     ) -> AsyncIterator[BroadcastReceiver]: |     ) -> AsyncIterator[BroadcastReceiver]: | ||||||
|         ''' |         ''' | ||||||
|         Subscribe for values from this broadcast receiver. |         Subscribe for values from this broadcast receiver. | ||||||
|  | @ -407,7 +316,6 @@ class BroadcastReceiver(ReceiveChannel): | ||||||
|             rx_chan=self._rx, |             rx_chan=self._rx, | ||||||
|             state=state, |             state=state, | ||||||
|             receive_afunc=self._recv, |             receive_afunc=self._recv, | ||||||
|             raise_on_lag=raise_on_lag, |  | ||||||
|         ) |         ) | ||||||
|         # assert clone in state.subs |         # assert clone in state.subs | ||||||
|         assert br.key in state.subs |         assert br.key in state.subs | ||||||
|  | @ -444,8 +352,7 @@ def broadcast_receiver( | ||||||
| 
 | 
 | ||||||
|     recv_chan: AsyncReceiver, |     recv_chan: AsyncReceiver, | ||||||
|     max_buffer_size: int, |     max_buffer_size: int, | ||||||
|     receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, |     **kwargs, | ||||||
|     raise_on_lag: bool = True, |  | ||||||
| 
 | 
 | ||||||
| ) -> BroadcastReceiver: | ) -> BroadcastReceiver: | ||||||
| 
 | 
 | ||||||
|  | @ -456,6 +363,5 @@ def broadcast_receiver( | ||||||
|             maxlen=max_buffer_size, |             maxlen=max_buffer_size, | ||||||
|             subs={}, |             subs={}, | ||||||
|         ), |         ), | ||||||
|         receive_afunc=receive_afunc, |         **kwargs, | ||||||
|         raise_on_lag=raise_on_lag, |  | ||||||
|     ) |     ) | ||||||
|  |  | ||||||
|  | @ -133,12 +133,12 @@ async def gather_contexts( | ||||||
|         # deliver control once all managers have started up |         # deliver control once all managers have started up | ||||||
|         await all_entered.wait() |         await all_entered.wait() | ||||||
| 
 | 
 | ||||||
|         try: |         # NOTE: order *should* be preserved in the output values | ||||||
|  |         # since ``dict``s are now implicitly ordered. | ||||||
|         yield tuple(unwrapped.values()) |         yield tuple(unwrapped.values()) | ||||||
|         finally: | 
 | ||||||
|             # NOTE: this is ABSOLUTELY REQUIRED to avoid |         # we don't need a try/finally since cancellation will be triggered | ||||||
|             # the following wacky bug: |         # by the surrounding nursery on error. | ||||||
|             # <tractorbugurlhere> |  | ||||||
|         parent_exit.set() |         parent_exit.set() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue