forked from goodboy/tractor
				
			Compare commits
	
		
			63 Commits 
		
	
	
		
			prompt_on_
			...
			master
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | e5ee2e3de8 | |
|  | 41aa91c8eb | |
|  | 6758e4487c | |
|  | 1c3893a383 | |
|  | 73befac9bc | |
|  | 79622bbeea | |
|  | 95535b2226 | |
|  | 87c6e09d6b | |
|  | 9ccd3a74b6 | |
|  | ae4ff5dc8d | |
|  | 705538398f | |
|  | 86aef5238d | |
|  | cc82447db6 | |
|  | 23cffbd940 | |
|  | 3d202272c4 | |
|  | 63cdb0891f | |
|  | 0f7db27b68 | |
|  | c53d62d2f7 | |
|  | f667d16d66 | |
|  | 24a062341e | |
|  | e714bec8db | |
|  | 009cd6552e | |
|  | 649c5e7504 | |
|  | 203f95615c | |
|  | efb8bec828 | |
|  | 8637778739 | |
|  | 47166e45f0 | |
|  | 4ce2dcd12b | |
|  | 80f983818f | |
|  | 6ba29f8d56 | |
|  | 2707a0e971 | |
|  | c8efcdd0d3 | |
|  | 9f9907271b | |
|  | c2367c1c5e | |
|  | a777217674 | |
|  | 13c9eadc8f | |
|  | af6c325072 | |
|  | 195d2f0ed4 | |
|  | aa4871b13d | |
|  | 556f4626db | |
|  | 3967c0ed9e | |
|  | e34823aab4 | |
|  | 6c35ba2cb6 | |
|  | 3a0817ff55 | |
|  | 7fddb4416b | |
|  | 1d92f2552a | |
|  | 4f8586a928 | |
|  | fb9ff45745 | |
|  | 36a83cb306 | |
|  | 7394a187e0 | |
|  | df01294bb2 | |
|  | ddf3d0d1b3 | |
|  | 158569adae | |
|  | 97d5f7233b | |
|  | d27c081a15 | |
|  | a4874a3227 | |
|  | de04bbb2bb | |
|  | 4f977189c0 | |
|  | 9fd62cf71f | |
|  | 606efa5bb7 | |
|  | 121a8cc891 | |
|  | c54b8ca4ba | |
|  | de93c8257c | 
|  | @ -6,8 +6,14 @@ | |||
| ``tractor`` is a `structured concurrent`_, multi-processing_ runtime | ||||
| built on trio_. | ||||
| 
 | ||||
| Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*": | ||||
| our nurseries_ let you spawn new Python processes which each run a ``trio`` | ||||
| Fundamentally, ``tractor`` gives you parallelism via | ||||
| ``trio``-"*actors*": independent Python processes (aka | ||||
| non-shared-memory threads) which maintain structured | ||||
| concurrency (SC) *end-to-end* inside a *supervision tree*. | ||||
| 
 | ||||
| Cross-process (and thus cross-host) SC is accomplished through the | ||||
| combined use of our "actor nurseries_" and an "SC-transitive IPC | ||||
| protocol" constructed on top of multiple Pythons each running a ``trio`` | ||||
| scheduled runtime - a call to ``trio.run()``. | ||||
| 
 | ||||
| We believe the system adheres to the `3 axioms`_ of an "`actor model`_" | ||||
|  | @ -23,7 +29,8 @@ Features | |||
| - **It's just** a ``trio`` API | ||||
| - *Infinitely nesteable* process trees | ||||
| - Builtin IPC streaming APIs with task fan-out broadcasting | ||||
| - A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_ | ||||
| - A "native" multi-core debugger REPL using `pdbp`_ (a fork & fix of | ||||
|   `pdb++`_ thanks to @mdmintz!) | ||||
| - Support for a swappable, OS specific, process spawning layer | ||||
| - A modular transport stack, allowing for custom serialization (eg. with | ||||
|   `msgspec`_), communications protocols, and environment specific IPC | ||||
|  | @ -118,7 +125,7 @@ Zombie safe: self-destruct a process tree | |||
|             f"running in pid {os.getpid()}" | ||||
|         ) | ||||
| 
 | ||||
|        await trio.sleep_forever() | ||||
|         await trio.sleep_forever() | ||||
| 
 | ||||
| 
 | ||||
|     async def main(): | ||||
|  | @ -149,7 +156,7 @@ it **is a bug**. | |||
| 
 | ||||
| "Native" multi-process debugging | ||||
| -------------------------------- | ||||
| Using the magic of `pdb++`_ and our internal IPC, we've | ||||
| Using the magic of `pdbp`_ and our internal IPC, we've | ||||
| been able to create a native feeling debugging experience for | ||||
| any (sub-)process in your ``tractor`` tree. | ||||
| 
 | ||||
|  | @ -597,6 +604,7 @@ channel`_! | |||
| .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s | ||||
| .. _trio gitter channel: https://gitter.im/python-trio/general | ||||
| .. _matrix channel: https://matrix.to/#/!tractor:matrix.org | ||||
| .. _pdbp: https://github.com/mdmintz/pdbp | ||||
| .. _pdb++: https://github.com/pdbpp/pdbpp | ||||
| .. _guest mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops | ||||
| .. _messages: https://en.wikipedia.org/wiki/Message_passing | ||||
|  |  | |||
|  | @ -0,0 +1,151 @@ | |||
| ''' | ||||
| Complex edge case where during real-time streaming the IPC tranport | ||||
| channels are wiped out (purposely in this example though it could have | ||||
| been an outage) and we want to ensure that despite being in debug mode | ||||
| (or not) the user can sent SIGINT once they notice the hang and the | ||||
| actor tree will eventually be cancelled without leaving any zombies. | ||||
| 
 | ||||
| ''' | ||||
| import trio | ||||
| from tractor import ( | ||||
|     open_nursery, | ||||
|     context, | ||||
|     Context, | ||||
|     MsgStream, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| async def break_channel_silently_then_error( | ||||
|     stream: MsgStream, | ||||
| ): | ||||
|     async for msg in stream: | ||||
|         await stream.send(msg) | ||||
| 
 | ||||
|         # XXX: close the channel right after an error is raised | ||||
|         # purposely breaking the IPC transport to make sure the parent | ||||
|         # doesn't get stuck in debug or hang on the connection join. | ||||
|         # this more or less simulates an infinite msg-receive hang on | ||||
|         # the other end. | ||||
|         await stream._ctx.chan.send(None) | ||||
|         assert 0 | ||||
| 
 | ||||
| 
 | ||||
| async def close_stream_and_error( | ||||
|     stream: MsgStream, | ||||
| ): | ||||
|     async for msg in stream: | ||||
|         await stream.send(msg) | ||||
| 
 | ||||
|         # wipe out channel right before raising | ||||
|         await stream._ctx.chan.send(None) | ||||
|         await stream.aclose() | ||||
|         assert 0 | ||||
| 
 | ||||
| 
 | ||||
| @context | ||||
| async def recv_and_spawn_net_killers( | ||||
| 
 | ||||
|     ctx: Context, | ||||
|     break_ipc_after: bool | int = False, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     Receive stream msgs and spawn some IPC killers mid-stream. | ||||
| 
 | ||||
|     ''' | ||||
|     await ctx.started() | ||||
|     async with ( | ||||
|         ctx.open_stream() as stream, | ||||
|         trio.open_nursery() as n, | ||||
|     ): | ||||
|         async for i in stream: | ||||
|             print(f'child echoing {i}') | ||||
|             await stream.send(i) | ||||
|             if ( | ||||
|                 break_ipc_after | ||||
|                 and i > break_ipc_after | ||||
|             ): | ||||
|                 '#################################\n' | ||||
|                 'Simulating child-side IPC BREAK!\n' | ||||
|                 '#################################' | ||||
|                 n.start_soon(break_channel_silently_then_error, stream) | ||||
|                 n.start_soon(close_stream_and_error, stream) | ||||
| 
 | ||||
| 
 | ||||
| async def main( | ||||
|     debug_mode: bool = False, | ||||
|     start_method: str = 'trio', | ||||
| 
 | ||||
|     # by default we break the parent IPC first (if configured to break | ||||
|     # at all), but this can be changed so the child does first (even if | ||||
|     # both are set to break). | ||||
|     break_parent_ipc_after: int | bool = False, | ||||
|     break_child_ipc_after: int | bool = False, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     async with ( | ||||
|         open_nursery( | ||||
|             start_method=start_method, | ||||
| 
 | ||||
|             # NOTE: even debugger is used we shouldn't get | ||||
|             # a hang since it never engages due to broken IPC | ||||
|             debug_mode=debug_mode, | ||||
|             loglevel='warning', | ||||
| 
 | ||||
|         ) as an, | ||||
|     ): | ||||
|         portal = await an.start_actor( | ||||
|             'chitty_hijo', | ||||
|             enable_modules=[__name__], | ||||
|         ) | ||||
| 
 | ||||
|         async with portal.open_context( | ||||
|             recv_and_spawn_net_killers, | ||||
|             break_ipc_after=break_child_ipc_after, | ||||
| 
 | ||||
|         ) as (ctx, sent): | ||||
|             async with ctx.open_stream() as stream: | ||||
|                 for i in range(1000): | ||||
| 
 | ||||
|                     if ( | ||||
|                         break_parent_ipc_after | ||||
|                         and i > break_parent_ipc_after | ||||
|                     ): | ||||
|                         print( | ||||
|                             '#################################\n' | ||||
|                             'Simulating parent-side IPC BREAK!\n' | ||||
|                             '#################################' | ||||
|                         ) | ||||
|                         await stream._ctx.chan.send(None) | ||||
| 
 | ||||
|                     # it actually breaks right here in the | ||||
|                     # mp_spawn/forkserver backends and thus the zombie | ||||
|                     # reaper never even kicks in? | ||||
|                     print(f'parent sending {i}') | ||||
|                     await stream.send(i) | ||||
| 
 | ||||
|                     with trio.move_on_after(2) as cs: | ||||
| 
 | ||||
|                         # NOTE: in the parent side IPC failure case this | ||||
|                         # will raise an ``EndOfChannel`` after the child | ||||
|                         # is killed and sends a stop msg back to it's | ||||
|                         # caller/this-parent. | ||||
|                         rx = await stream.receive() | ||||
| 
 | ||||
|                         print(f"I'm a happy user and echoed to me is {rx}") | ||||
| 
 | ||||
|                     if cs.cancelled_caught: | ||||
|                         # pretend to be a user seeing no streaming action | ||||
|                         # thinking it's a hang, and then hitting ctl-c.. | ||||
|                         print("YOO i'm a user anddd thingz hangin..") | ||||
| 
 | ||||
|                 print( | ||||
|                     "YOO i'm mad send side dun but thingz hangin..\n" | ||||
|                     'MASHING CTlR-C Ctl-c..' | ||||
|                 ) | ||||
|                 raise KeyboardInterrupt | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     trio.run(main) | ||||
|  | @ -0,0 +1,24 @@ | |||
| import os | ||||
| import sys | ||||
| 
 | ||||
| import trio | ||||
| import tractor | ||||
| 
 | ||||
| 
 | ||||
| async def main() -> None: | ||||
|     async with tractor.open_nursery(debug_mode=True) as an: | ||||
| 
 | ||||
|         assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace' | ||||
| 
 | ||||
|         # TODO: an assert that verifies the hook has indeed been, hooked | ||||
|         # XD | ||||
|         assert sys.breakpointhook is not tractor._debug._set_trace | ||||
| 
 | ||||
|         breakpoint() | ||||
| 
 | ||||
|     # TODO: an assert that verifies the hook is unhooked.. | ||||
|     assert sys.breakpointhook | ||||
|     breakpoint() | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     trio.run(main) | ||||
|  | @ -0,0 +1,19 @@ | |||
| Rework our ``.trionics.BroadcastReceiver`` internals to avoid method | ||||
| recursion and approach a design and interface closer to ``trio``'s | ||||
| ``MemoryReceiveChannel``. | ||||
| 
 | ||||
| The details of the internal changes include: | ||||
| 
 | ||||
| - implementing a ``BroadcastReceiver.receive_nowait()`` and using it | ||||
|   within the async ``.receive()`` thus avoiding recursion from | ||||
|   ``.receive()``. | ||||
| - failing over to an internal ``._receive_from_underlying()`` when the | ||||
|   ``_nowait()`` call raises ``trio.WouldBlock`` | ||||
| - adding ``BroadcastState.statistics()`` for debugging and testing both | ||||
|   internals and by users. | ||||
| - add an internal ``BroadcastReceiver._raise_on_lag: bool`` which can be | ||||
|   set to avoid ``Lagged`` raising for possible use cases where a user | ||||
|   wants to choose between a [cheap or nasty | ||||
|   pattern](https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern) | ||||
|   the the particular stream (we use this in ``piker``'s dark clearing | ||||
|   engine to avoid fast feeds breaking during HFT periods). | ||||
|  | @ -0,0 +1,15 @@ | |||
| Fixes to ensure IPC (channel) breakage doesn't result in hung actor | ||||
| trees; the zombie reaping and general supervision machinery will always | ||||
| clean up and terminate. | ||||
| 
 | ||||
| This includes not only the (mostly minor) fixes to solve these cases but | ||||
| also a new extensive test suite in `test_advanced_faults.py` with an | ||||
| accompanying highly configurable example module-script in | ||||
| `examples/advanced_faults/ipc_failure_during_stream.py`. Tests ensure we | ||||
| never get hang or zombies despite operating in debug mode and attempt to | ||||
| simulate all possible IPC transport failure cases for a local-host actor | ||||
| tree. | ||||
| 
 | ||||
| Further we simplify `Context.open_stream.__aexit__()` to just call | ||||
| `MsgStream.aclose()` directly more or less avoiding a pure duplicate | ||||
| code path. | ||||
|  | @ -0,0 +1,7 @@ | |||
| Drop `trio.Process.aclose()` usage, copy into our spawning code. | ||||
| 
 | ||||
| The details are laid out in https://github.com/goodboy/tractor/issues/330. | ||||
| `trio` changed is process running quite some time ago, this just copies | ||||
| out the small bit we needed (from the old `.aclose()`) for hard kills | ||||
| where a soft runtime cancel request fails and our "zombie killer" | ||||
| implementation kicks in. | ||||
|  | @ -0,0 +1,15 @@ | |||
| Switch to using the fork & fix of `pdb++`, `pdbp`: | ||||
| https://github.com/mdmintz/pdbp | ||||
| 
 | ||||
| Allows us to sidestep a variety of issues that aren't being maintained | ||||
| in the upstream project thanks to the hard work of @mdmintz! | ||||
| 
 | ||||
| We also include some default settings adjustments as per recent | ||||
| development on the fork: | ||||
| 
 | ||||
| - sticky mode is still turned on by default but now activates when | ||||
|   a using the `ll` repl command. | ||||
| - turn off line truncation by default to avoid inter-line gaps when | ||||
|   resizing the terimnal during use. | ||||
| - when using the backtrace cmd either by `w` or `bt`, the config | ||||
|   automatically switches to non-sticky mode. | ||||
|  | @ -1,7 +1,7 @@ | |||
| pytest | ||||
| pytest-trio | ||||
| pytest-timeout | ||||
| pdbpp | ||||
| pdbp | ||||
| mypy | ||||
| trio_typing | ||||
| pexpect | ||||
|  |  | |||
							
								
								
									
										13
									
								
								setup.py
								
								
								
								
							
							
						
						
									
										13
									
								
								setup.py
								
								
								
								
							|  | @ -26,12 +26,12 @@ with open('docs/README.rst', encoding='utf-8') as f: | |||
| setup( | ||||
|     name="tractor", | ||||
|     version='0.1.0a6dev0',  # alpha zone | ||||
|     description='structured concurrrent "actors"', | ||||
|     description='structured concurrrent `trio`-"actors"', | ||||
|     long_description=readme, | ||||
|     license='AGPLv3', | ||||
|     author='Tyler Goodlet', | ||||
|     maintainer='Tyler Goodlet', | ||||
|     maintainer_email='jgbt@protonmail.com', | ||||
|     maintainer_email='goodboy_foss@protonmail.com', | ||||
|     url='https://github.com/goodboy/tractor', | ||||
|     platforms=['linux', 'windows'], | ||||
|     packages=[ | ||||
|  | @ -52,16 +52,14 @@ setup( | |||
|         # tooling | ||||
|         'tricycle', | ||||
|         'trio_typing', | ||||
| 
 | ||||
|         # tooling | ||||
|         'colorlog', | ||||
|         'wrapt', | ||||
| 
 | ||||
|         # serialization | ||||
|         # IPC serialization | ||||
|         'msgspec', | ||||
| 
 | ||||
|         # debug mode REPL | ||||
|         'pdbpp', | ||||
|         'pdbp', | ||||
| 
 | ||||
|         # pip ref docs on these specs: | ||||
|         # https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples | ||||
|  | @ -73,10 +71,9 @@ setup( | |||
|         # https://github.com/pdbpp/fancycompleter/issues/37 | ||||
|         'pyreadline3 ; platform_system == "Windows"', | ||||
| 
 | ||||
| 
 | ||||
|     ], | ||||
|     tests_require=['pytest'], | ||||
|     python_requires=">=3.9", | ||||
|     python_requires=">=3.10", | ||||
|     keywords=[ | ||||
|         'trio', | ||||
|         'async', | ||||
|  |  | |||
|  | @ -7,6 +7,7 @@ import os | |||
| import random | ||||
| import signal | ||||
| import platform | ||||
| import pathlib | ||||
| import time | ||||
| import inspect | ||||
| from functools import partial, wraps | ||||
|  | @ -113,14 +114,21 @@ no_windows = pytest.mark.skipif( | |||
| ) | ||||
| 
 | ||||
| 
 | ||||
| def repodir(): | ||||
|     """Return the abspath to the repo directory. | ||||
|     """ | ||||
|     dirname = os.path.dirname | ||||
|     dirpath = os.path.abspath( | ||||
|         dirname(dirname(os.path.realpath(__file__))) | ||||
|         ) | ||||
|     return dirpath | ||||
| def repodir() -> pathlib.Path: | ||||
|     ''' | ||||
|     Return the abspath to the repo directory. | ||||
| 
 | ||||
|     ''' | ||||
|     # 2 parents up to step up through tests/<repo_dir> | ||||
|     return pathlib.Path(__file__).parent.parent.absolute() | ||||
| 
 | ||||
| 
 | ||||
| def examples_dir() -> pathlib.Path: | ||||
|     ''' | ||||
|     Return the abspath to the examples directory as `pathlib.Path`. | ||||
| 
 | ||||
|     ''' | ||||
|     return repodir() / 'examples' | ||||
| 
 | ||||
| 
 | ||||
| def pytest_addoption(parser): | ||||
|  | @ -151,7 +159,7 @@ def loglevel(request): | |||
| 
 | ||||
| 
 | ||||
| @pytest.fixture(scope='session') | ||||
| def spawn_backend(request): | ||||
| def spawn_backend(request) -> str: | ||||
|     return request.config.option.spawn_backend | ||||
| 
 | ||||
| 
 | ||||
|  | @ -205,16 +213,22 @@ def sig_prog(proc, sig): | |||
| 
 | ||||
| 
 | ||||
| @pytest.fixture | ||||
| def daemon(loglevel, testdir, arb_addr): | ||||
|     """Run a daemon actor as a "remote arbiter". | ||||
|     """ | ||||
| def daemon( | ||||
|     loglevel: str, | ||||
|     testdir, | ||||
|     arb_addr: tuple[str, int], | ||||
| ): | ||||
|     ''' | ||||
|     Run a daemon actor as a "remote arbiter". | ||||
| 
 | ||||
|     ''' | ||||
|     if loglevel in ('trace', 'debug'): | ||||
|         # too much logging will lock up the subproc (smh) | ||||
|         loglevel = 'info' | ||||
| 
 | ||||
|     cmdargs = [ | ||||
|         sys.executable, '-c', | ||||
|         "import tractor; tractor.run_daemon([], arbiter_addr={}, loglevel={})" | ||||
|         "import tractor; tractor.run_daemon([], registry_addr={}, loglevel={})" | ||||
|         .format( | ||||
|             arb_addr, | ||||
|             "'{}'".format(loglevel) if loglevel else None) | ||||
|  |  | |||
|  | @ -0,0 +1,193 @@ | |||
| ''' | ||||
| Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la | ||||
| cancelacion?.. | ||||
| 
 | ||||
| ''' | ||||
| from functools import partial | ||||
| 
 | ||||
| import pytest | ||||
| from _pytest.pathlib import import_path | ||||
| import trio | ||||
| import tractor | ||||
| 
 | ||||
| from conftest import ( | ||||
|     examples_dir, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'debug_mode', | ||||
|     [False, True], | ||||
|     ids=['no_debug_mode', 'debug_mode'], | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'ipc_break', | ||||
|     [ | ||||
|         # no breaks | ||||
|         { | ||||
|             'break_parent_ipc_after': False, | ||||
|             'break_child_ipc_after': False, | ||||
|         }, | ||||
| 
 | ||||
|         # only parent breaks | ||||
|         { | ||||
|             'break_parent_ipc_after': 500, | ||||
|             'break_child_ipc_after': False, | ||||
|         }, | ||||
| 
 | ||||
|         # only child breaks | ||||
|         { | ||||
|             'break_parent_ipc_after': False, | ||||
|             'break_child_ipc_after': 500, | ||||
|         }, | ||||
| 
 | ||||
|         # both: break parent first | ||||
|         { | ||||
|             'break_parent_ipc_after': 500, | ||||
|             'break_child_ipc_after': 800, | ||||
|         }, | ||||
|         # both: break child first | ||||
|         { | ||||
|             'break_parent_ipc_after': 800, | ||||
|             'break_child_ipc_after': 500, | ||||
|         }, | ||||
| 
 | ||||
|     ], | ||||
|     ids=[ | ||||
|         'no_break', | ||||
|         'break_parent', | ||||
|         'break_child', | ||||
|         'break_both_parent_first', | ||||
|         'break_both_child_first', | ||||
|     ], | ||||
| ) | ||||
| def test_ipc_channel_break_during_stream( | ||||
|     debug_mode: bool, | ||||
|     spawn_backend: str, | ||||
|     ipc_break: dict | None, | ||||
| ): | ||||
|     ''' | ||||
|     Ensure we can have an IPC channel break its connection during | ||||
|     streaming and it's still possible for the (simulated) user to kill | ||||
|     the actor tree using SIGINT. | ||||
| 
 | ||||
|     We also verify the type of connection error expected in the parent | ||||
|     depending on which side if the IPC breaks first. | ||||
| 
 | ||||
|     ''' | ||||
|     if spawn_backend != 'trio': | ||||
|         if debug_mode: | ||||
|             pytest.skip('`debug_mode` only supported on `trio` spawner') | ||||
| 
 | ||||
|         # non-`trio` spawners should never hit the hang condition that | ||||
|         # requires the user to do ctl-c to cancel the actor tree. | ||||
|         expect_final_exc = trio.ClosedResourceError | ||||
| 
 | ||||
|     mod = import_path( | ||||
|         examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py', | ||||
|         root=examples_dir(), | ||||
|     ) | ||||
| 
 | ||||
|     expect_final_exc = KeyboardInterrupt | ||||
| 
 | ||||
|     # when ONLY the child breaks we expect the parent to get a closed | ||||
|     # resource error on the next `MsgStream.receive()` and then fail out | ||||
|     # and cancel the child from there. | ||||
|     if ( | ||||
| 
 | ||||
|         # only child breaks | ||||
|         ( | ||||
|             ipc_break['break_child_ipc_after'] | ||||
|             and ipc_break['break_parent_ipc_after'] is False | ||||
|         ) | ||||
| 
 | ||||
|         # both break but, parent breaks first | ||||
|         or ( | ||||
|             ipc_break['break_child_ipc_after'] is not False | ||||
|             and ( | ||||
|                 ipc_break['break_parent_ipc_after'] | ||||
|                 > ipc_break['break_child_ipc_after'] | ||||
|             ) | ||||
|         ) | ||||
| 
 | ||||
|     ): | ||||
|         expect_final_exc = trio.ClosedResourceError | ||||
| 
 | ||||
|     # when the parent IPC side dies (even if the child's does as well | ||||
|     # but the child fails BEFORE the parent) we expect the channel to be | ||||
|     # sent a stop msg from the child at some point which will signal the | ||||
|     # parent that the stream has been terminated. | ||||
|     # NOTE: when the parent breaks "after" the child you get this same | ||||
|     # case as well, the child breaks the IPC channel with a stop msg | ||||
|     # before any closure takes place. | ||||
|     elif ( | ||||
|         # only parent breaks | ||||
|         ( | ||||
|             ipc_break['break_parent_ipc_after'] | ||||
|             and ipc_break['break_child_ipc_after'] is False | ||||
|         ) | ||||
| 
 | ||||
|         # both break but, child breaks first | ||||
|         or ( | ||||
|             ipc_break['break_parent_ipc_after'] is not False | ||||
|             and ( | ||||
|                 ipc_break['break_child_ipc_after'] | ||||
|                 > ipc_break['break_parent_ipc_after'] | ||||
|             ) | ||||
|         ) | ||||
|     ): | ||||
|         expect_final_exc = trio.EndOfChannel | ||||
| 
 | ||||
|     with pytest.raises(expect_final_exc): | ||||
|         trio.run( | ||||
|             partial( | ||||
|                 mod.main, | ||||
|                 debug_mode=debug_mode, | ||||
|                 start_method=spawn_backend, | ||||
|                 **ipc_break, | ||||
|             ) | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def break_ipc_after_started( | ||||
|     ctx: tractor.Context, | ||||
| ) -> None: | ||||
|     await ctx.started() | ||||
|     async with ctx.open_stream() as stream: | ||||
|         await stream.aclose() | ||||
|         await trio.sleep(0.2) | ||||
|         await ctx.chan.send(None) | ||||
|         print('child broke IPC and terminating') | ||||
| 
 | ||||
| 
 | ||||
| def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages(): | ||||
|     ''' | ||||
|     Verify that is a subactor's IPC goes down just after bringing up a stream | ||||
|     the parent can trigger a SIGINT and the child will be reaped out-of-IPC by | ||||
|     the localhost process supervision machinery: aka "zombie lord". | ||||
| 
 | ||||
|     ''' | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery() as n: | ||||
|             portal = await n.start_actor( | ||||
|                 'ipc_breaker', | ||||
|                 enable_modules=[__name__], | ||||
|             ) | ||||
| 
 | ||||
|             with trio.move_on_after(1): | ||||
|                 async with ( | ||||
|                     portal.open_context( | ||||
|                         break_ipc_after_started | ||||
|                     ) as (ctx, sent), | ||||
|                 ): | ||||
|                     async with ctx.open_stream(): | ||||
|                         await trio.sleep(0.5) | ||||
| 
 | ||||
|                     print('parent waiting on context') | ||||
| 
 | ||||
|             print('parent exited context') | ||||
|             raise KeyboardInterrupt | ||||
| 
 | ||||
|     with pytest.raises(KeyboardInterrupt): | ||||
|         trio.run(main) | ||||
|  | @ -14,7 +14,7 @@ def is_win(): | |||
|     return platform.system() == 'Windows' | ||||
| 
 | ||||
| 
 | ||||
| _registry: dict[str, set[tractor.ReceiveMsgStream]] = { | ||||
| _registry: dict[str, set[tractor.MsgStream]] = { | ||||
|     'even': set(), | ||||
|     'odd': set(), | ||||
| } | ||||
|  |  | |||
|  | @ -14,6 +14,7 @@ import itertools | |||
| from os import path | ||||
| from typing import Optional | ||||
| import platform | ||||
| import pathlib | ||||
| import sys | ||||
| import time | ||||
| 
 | ||||
|  | @ -24,7 +25,10 @@ from pexpect.exceptions import ( | |||
|     EOF, | ||||
| ) | ||||
| 
 | ||||
| from conftest import repodir, _ci_env | ||||
| from conftest import ( | ||||
|     examples_dir, | ||||
|     _ci_env, | ||||
| ) | ||||
| 
 | ||||
| # TODO: The next great debugger audit could be done by you! | ||||
| # - recurrent entry to breakpoint() from single actor *after* and an | ||||
|  | @ -43,19 +47,13 @@ if platform.system() == 'Windows': | |||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def examples_dir(): | ||||
|     """Return the abspath to the examples directory. | ||||
|     """ | ||||
|     return path.join(repodir(), 'examples', 'debugging/') | ||||
| 
 | ||||
| 
 | ||||
| def mk_cmd(ex_name: str) -> str: | ||||
|     """Generate a command suitable to pass to ``pexpect.spawn()``. | ||||
|     """ | ||||
|     return ' '.join( | ||||
|         ['python', | ||||
|          path.join(examples_dir(), f'{ex_name}.py')] | ||||
|     ) | ||||
|     ''' | ||||
|     Generate a command suitable to pass to ``pexpect.spawn()``. | ||||
| 
 | ||||
|     ''' | ||||
|     script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py' | ||||
|     return ' '.join(['python', str(script_path)]) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: was trying to this xfail style but some weird bug i see in CI | ||||
|  | @ -97,7 +95,7 @@ def spawn( | |||
|     return _spawn | ||||
| 
 | ||||
| 
 | ||||
| PROMPT = r"\(Pdb\+\+\)" | ||||
| PROMPT = r"\(Pdb\+\)" | ||||
| 
 | ||||
| 
 | ||||
| def expect( | ||||
|  | @ -153,18 +151,6 @@ def ctlc( | |||
| 
 | ||||
|     use_ctlc = request.param | ||||
| 
 | ||||
|     if ( | ||||
|         sys.version_info <= (3, 10) | ||||
|         and use_ctlc | ||||
|     ): | ||||
|         # on 3.9 it seems the REPL UX | ||||
|         # is highly unreliable and frankly annoying | ||||
|         # to test for. It does work from manual testing | ||||
|         # but i just don't think it's wroth it to try | ||||
|         # and get this working especially since we want to | ||||
|         # be 3.10+ mega-asap. | ||||
|         pytest.skip('Py3.9 and `pdbpp` son no bueno..') | ||||
| 
 | ||||
|     node = request.node | ||||
|     markers = node.own_markers | ||||
|     for mark in markers: | ||||
|  | @ -195,13 +181,15 @@ def ctlc( | |||
|     ids=lambda item: f'{item[0]} -> {item[1]}', | ||||
| ) | ||||
| def test_root_actor_error(spawn, user_in_out): | ||||
|     """Demonstrate crash handler entering pdbpp from basic error in root actor. | ||||
|     """ | ||||
|     ''' | ||||
|     Demonstrate crash handler entering pdb from basic error in root actor. | ||||
| 
 | ||||
|     ''' | ||||
|     user_input, expect_err_str = user_in_out | ||||
| 
 | ||||
|     child = spawn('root_actor_error') | ||||
| 
 | ||||
|     # scan for the pdbpp prompt | ||||
|     # scan for the prompt | ||||
|     expect(child, PROMPT) | ||||
| 
 | ||||
|     before = str(child.before.decode()) | ||||
|  | @ -232,8 +220,8 @@ def test_root_actor_bp(spawn, user_in_out): | |||
|     user_input, expect_err_str = user_in_out | ||||
|     child = spawn('root_actor_breakpoint') | ||||
| 
 | ||||
|     # scan for the pdbpp prompt | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     # scan for the prompt | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     assert 'Error' not in str(child.before) | ||||
| 
 | ||||
|  | @ -274,7 +262,7 @@ def do_ctlc( | |||
|         if expect_prompt: | ||||
|             before = str(child.before.decode()) | ||||
|             time.sleep(delay) | ||||
|             child.expect(r"\(Pdb\+\+\)") | ||||
|             child.expect(PROMPT) | ||||
|             time.sleep(delay) | ||||
| 
 | ||||
|             if patt: | ||||
|  | @ -293,7 +281,7 @@ def test_root_actor_bp_forever( | |||
|     # entries | ||||
|     for _ in range(10): | ||||
| 
 | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
|         child.expect(PROMPT) | ||||
| 
 | ||||
|         if ctlc: | ||||
|             do_ctlc(child) | ||||
|  | @ -303,7 +291,7 @@ def test_root_actor_bp_forever( | |||
|     # do one continue which should trigger a | ||||
|     # new task to lock the tty | ||||
|     child.sendline('continue') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     # seems that if we hit ctrl-c too fast the | ||||
|     # sigint guard machinery might not kick in.. | ||||
|  | @ -314,10 +302,10 @@ def test_root_actor_bp_forever( | |||
| 
 | ||||
|     # XXX: this previously caused a bug! | ||||
|     child.sendline('n') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     child.sendline('n') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     # quit out of the loop | ||||
|     child.sendline('q') | ||||
|  | @ -340,8 +328,8 @@ def test_subactor_error( | |||
|     ''' | ||||
|     child = spawn('subactor_error') | ||||
| 
 | ||||
|     # scan for the pdbpp prompt | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     # scan for the prompt | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     before = str(child.before.decode()) | ||||
|     assert "Attaching to pdb in crashed actor: ('name_error'" in before | ||||
|  | @ -361,7 +349,7 @@ def test_subactor_error( | |||
|         # creating actor | ||||
|         child.sendline('continue') | ||||
| 
 | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
|     before = str(child.before.decode()) | ||||
| 
 | ||||
|     # root actor gets debugger engaged | ||||
|  | @ -388,8 +376,8 @@ def test_subactor_breakpoint( | |||
| 
 | ||||
|     child = spawn('subactor_breakpoint') | ||||
| 
 | ||||
|     # scan for the pdbpp prompt | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     # scan for the prompt | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     before = str(child.before.decode()) | ||||
|     assert "Attaching pdb to actor: ('breakpoint_forever'" in before | ||||
|  | @ -398,7 +386,7 @@ def test_subactor_breakpoint( | |||
|     # entries | ||||
|     for _ in range(10): | ||||
|         child.sendline('next') | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
|         child.expect(PROMPT) | ||||
| 
 | ||||
|         if ctlc: | ||||
|             do_ctlc(child) | ||||
|  | @ -406,7 +394,7 @@ def test_subactor_breakpoint( | |||
|     # now run some "continues" to show re-entries | ||||
|     for _ in range(5): | ||||
|         child.sendline('continue') | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
|         child.expect(PROMPT) | ||||
|         before = str(child.before.decode()) | ||||
|         assert "Attaching pdb to actor: ('breakpoint_forever'" in before | ||||
| 
 | ||||
|  | @ -417,7 +405,7 @@ def test_subactor_breakpoint( | |||
|     child.sendline('q') | ||||
| 
 | ||||
|     # child process should exit but parent will capture pdb.BdbQuit | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     before = str(child.before.decode()) | ||||
|     assert "RemoteActorError: ('breakpoint_forever'" in before | ||||
|  | @ -449,8 +437,8 @@ def test_multi_subactors( | |||
|     ''' | ||||
|     child = spawn(r'multi_subactors') | ||||
| 
 | ||||
|     # scan for the pdbpp prompt | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     # scan for the prompt | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     before = str(child.before.decode()) | ||||
|     assert "Attaching pdb to actor: ('breakpoint_forever'" in before | ||||
|  | @ -462,7 +450,7 @@ def test_multi_subactors( | |||
|     # entries | ||||
|     for _ in range(10): | ||||
|         child.sendline('next') | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
|         child.expect(PROMPT) | ||||
| 
 | ||||
|         if ctlc: | ||||
|             do_ctlc(child) | ||||
|  | @ -471,7 +459,7 @@ def test_multi_subactors( | |||
|     child.sendline('c') | ||||
| 
 | ||||
|     # first name_error failure | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
|     before = str(child.before.decode()) | ||||
|     assert "Attaching to pdb in crashed actor: ('name_error'" in before | ||||
|     assert "NameError" in before | ||||
|  | @ -483,7 +471,7 @@ def test_multi_subactors( | |||
|     child.sendline('c') | ||||
| 
 | ||||
|     # 2nd name_error failure | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     # TODO: will we ever get the race where this crash will show up? | ||||
|     # blocklist strat now prevents this crash | ||||
|  | @ -497,7 +485,7 @@ def test_multi_subactors( | |||
| 
 | ||||
|     # breakpoint loop should re-engage | ||||
|     child.sendline('c') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
|     before = str(child.before.decode()) | ||||
|     assert "Attaching pdb to actor: ('breakpoint_forever'" in before | ||||
| 
 | ||||
|  | @ -513,7 +501,7 @@ def test_multi_subactors( | |||
|     ): | ||||
|         child.sendline('c') | ||||
|         time.sleep(0.1) | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
|         child.expect(PROMPT) | ||||
|         before = str(child.before.decode()) | ||||
| 
 | ||||
|         if ctlc: | ||||
|  | @ -532,11 +520,11 @@ def test_multi_subactors( | |||
|     # now run some "continues" to show re-entries | ||||
|     for _ in range(5): | ||||
|         child.sendline('c') | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
|         child.expect(PROMPT) | ||||
| 
 | ||||
|     # quit the loop and expect parent to attach | ||||
|     child.sendline('q') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
|     before = str(child.before.decode()) | ||||
| 
 | ||||
|     assert_before(child, [ | ||||
|  | @ -580,7 +568,7 @@ def test_multi_daemon_subactors( | |||
|     ''' | ||||
|     child = spawn('multi_daemon_subactors') | ||||
| 
 | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     # there can be a race for which subactor will acquire | ||||
|     # the root's tty lock first so anticipate either crash | ||||
|  | @ -610,7 +598,7 @@ def test_multi_daemon_subactors( | |||
|     # second entry by `bp_forever`. | ||||
| 
 | ||||
|     child.sendline('c') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
|     assert_before(child, [next_msg]) | ||||
| 
 | ||||
|     # XXX: hooray the root clobbering the child here was fixed! | ||||
|  | @ -632,7 +620,7 @@ def test_multi_daemon_subactors( | |||
| 
 | ||||
|     # expect another breakpoint actor entry | ||||
|     child.sendline('c') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     try: | ||||
|         assert_before(child, [bp_forever_msg]) | ||||
|  | @ -648,7 +636,7 @@ def test_multi_daemon_subactors( | |||
|         # after 1 or more further bp actor entries. | ||||
| 
 | ||||
|         child.sendline('c') | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
|         child.expect(PROMPT) | ||||
|         assert_before(child, [name_error_msg]) | ||||
| 
 | ||||
|     # wait for final error in root | ||||
|  | @ -656,7 +644,7 @@ def test_multi_daemon_subactors( | |||
|     while True: | ||||
|         try: | ||||
|             child.sendline('c') | ||||
|             child.expect(r"\(Pdb\+\+\)") | ||||
|             child.expect(PROMPT) | ||||
|             assert_before( | ||||
|                 child, | ||||
|                 [bp_forever_msg] | ||||
|  | @ -689,8 +677,8 @@ def test_multi_subactors_root_errors( | |||
|     ''' | ||||
|     child = spawn('multi_subactor_root_errors') | ||||
| 
 | ||||
|     # scan for the pdbpp prompt | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     # scan for the prompt | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     # at most one subactor should attach before the root is cancelled | ||||
|     before = str(child.before.decode()) | ||||
|  | @ -705,7 +693,7 @@ def test_multi_subactors_root_errors( | |||
| 
 | ||||
|     # due to block list strat from #337, this will no longer | ||||
|     # propagate before the root errors and cancels the spawner sub-tree. | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     # only if the blocking condition doesn't kick in fast enough | ||||
|     before = str(child.before.decode()) | ||||
|  | @ -720,7 +708,7 @@ def test_multi_subactors_root_errors( | |||
|             do_ctlc(child) | ||||
| 
 | ||||
|         child.sendline('c') | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
|         child.expect(PROMPT) | ||||
| 
 | ||||
|     # check if the spawner crashed or was blocked from debug | ||||
|     # and if this intermediary attached check the boxed error | ||||
|  | @ -737,7 +725,7 @@ def test_multi_subactors_root_errors( | |||
|             do_ctlc(child) | ||||
| 
 | ||||
|         child.sendline('c') | ||||
|         child.expect(r"\(Pdb\+\+\)") | ||||
|         child.expect(PROMPT) | ||||
| 
 | ||||
|     # expect a root actor crash | ||||
|     assert_before(child, [ | ||||
|  | @ -786,7 +774,7 @@ def test_multi_nested_subactors_error_through_nurseries( | |||
| 
 | ||||
|     for send_char in itertools.cycle(['c', 'q']): | ||||
|         try: | ||||
|             child.expect(r"\(Pdb\+\+\)") | ||||
|             child.expect(PROMPT) | ||||
|             child.sendline(send_char) | ||||
|             time.sleep(0.01) | ||||
| 
 | ||||
|  | @ -828,7 +816,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock( | |||
| 
 | ||||
|     child = spawn('root_cancelled_but_child_is_in_tty_lock') | ||||
| 
 | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     before = str(child.before.decode()) | ||||
|     assert "NameError: name 'doggypants' is not defined" in before | ||||
|  | @ -843,7 +831,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock( | |||
|     for i in range(4): | ||||
|         time.sleep(0.5) | ||||
|         try: | ||||
|             child.expect(r"\(Pdb\+\+\)") | ||||
|             child.expect(PROMPT) | ||||
| 
 | ||||
|         except ( | ||||
|             EOF, | ||||
|  | @ -900,7 +888,7 @@ def test_root_cancels_child_context_during_startup( | |||
|     ''' | ||||
|     child = spawn('fast_error_in_root_after_spawn') | ||||
| 
 | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     before = str(child.before.decode()) | ||||
|     assert "AssertionError" in before | ||||
|  | @ -917,7 +905,7 @@ def test_different_debug_mode_per_actor( | |||
|     ctlc: bool, | ||||
| ): | ||||
|     child = spawn('per_actor_debug') | ||||
|     child.expect(r"\(Pdb\+\+\)") | ||||
|     child.expect(PROMPT) | ||||
| 
 | ||||
|     # only one actor should enter the debugger | ||||
|     before = str(child.before.decode()) | ||||
|  |  | |||
|  | @ -12,17 +12,17 @@ import shutil | |||
| 
 | ||||
| import pytest | ||||
| 
 | ||||
| from conftest import repodir | ||||
| 
 | ||||
| 
 | ||||
| def examples_dir(): | ||||
|     """Return the abspath to the examples directory. | ||||
|     """ | ||||
|     return os.path.join(repodir(), 'examples') | ||||
| from conftest import ( | ||||
|     examples_dir, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture | ||||
| def run_example_in_subproc(loglevel, testdir, arb_addr): | ||||
| def run_example_in_subproc( | ||||
|     loglevel: str, | ||||
|     testdir, | ||||
|     arb_addr: tuple[str, int], | ||||
| ): | ||||
| 
 | ||||
|     @contextmanager | ||||
|     def run(script_code): | ||||
|  | @ -32,8 +32,8 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): | |||
|             # on windows we need to create a special __main__.py which will | ||||
|             # be executed with ``python -m <modulename>`` on windows.. | ||||
|             shutil.copyfile( | ||||
|                 os.path.join(examples_dir(), '__main__.py'), | ||||
|                 os.path.join(str(testdir), '__main__.py') | ||||
|                 examples_dir() / '__main__.py', | ||||
|                 str(testdir / '__main__.py'), | ||||
|             ) | ||||
| 
 | ||||
|             # drop the ``if __name__ == '__main__'`` guard onwards from | ||||
|  | @ -88,6 +88,7 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): | |||
|         and f[0] != '_' | ||||
|         and 'debugging' not in p[0] | ||||
|         and 'integration' not in p[0] | ||||
|         and 'advanced_faults' not in p[0] | ||||
|     ], | ||||
| 
 | ||||
|     ids=lambda t: t[1], | ||||
|  |  | |||
|  | @ -251,7 +251,7 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend): | |||
| 
 | ||||
|     results, diff = time_quad_ex | ||||
|     assert results | ||||
|     this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 2.666 | ||||
|     this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3 | ||||
|     assert diff < this_fast | ||||
| 
 | ||||
| 
 | ||||
|  | @ -12,7 +12,10 @@ import pytest | |||
| import trio | ||||
| from trio.lowlevel import current_task | ||||
| import tractor | ||||
| from tractor.trionics import broadcast_receiver, Lagged | ||||
| from tractor.trionics import ( | ||||
|     broadcast_receiver, | ||||
|     Lagged, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
|  | @ -37,7 +40,7 @@ async def echo_sequences( | |||
| 
 | ||||
| async def ensure_sequence( | ||||
| 
 | ||||
|     stream: tractor.ReceiveMsgStream, | ||||
|     stream: tractor.MsgStream, | ||||
|     sequence: list, | ||||
|     delay: Optional[float] = None, | ||||
| 
 | ||||
|  | @ -211,7 +214,8 @@ def test_faster_task_to_recv_is_cancelled_by_slower( | |||
|     arb_addr, | ||||
|     start_method, | ||||
| ): | ||||
|     '''Ensure that if a faster task consuming from a stream is cancelled | ||||
|     ''' | ||||
|     Ensure that if a faster task consuming from a stream is cancelled | ||||
|     the slower task can continue to receive all expected values. | ||||
| 
 | ||||
|     ''' | ||||
|  | @ -460,3 +464,51 @@ def test_first_recver_is_cancelled(): | |||
|                     assert value == 1 | ||||
| 
 | ||||
|     trio.run(main) | ||||
| 
 | ||||
| 
 | ||||
| def test_no_raise_on_lag(): | ||||
|     ''' | ||||
|     Run a simple 2-task broadcast where one task is slow but configured | ||||
|     so that it does not raise `Lagged` on overruns using | ||||
|     `raise_on_lasg=False` and verify that the task does not raise. | ||||
| 
 | ||||
|     ''' | ||||
|     size = 100 | ||||
|     tx, rx = trio.open_memory_channel(size) | ||||
|     brx = broadcast_receiver(rx, size) | ||||
| 
 | ||||
|     async def slow(): | ||||
|         async with brx.subscribe( | ||||
|             raise_on_lag=False, | ||||
|         ) as br: | ||||
|             async for msg in br: | ||||
|                 print(f'slow task got: {msg}') | ||||
|                 await trio.sleep(0.1) | ||||
| 
 | ||||
|     async def fast(): | ||||
|         async with brx.subscribe() as br: | ||||
|             async for msg in br: | ||||
|                 print(f'fast task got: {msg}') | ||||
| 
 | ||||
|     async def main(): | ||||
|         async with ( | ||||
|             tractor.open_root_actor( | ||||
|                 # NOTE: so we see the warning msg emitted by the bcaster | ||||
|                 # internals when the no raise flag is set. | ||||
|                 loglevel='warning', | ||||
|             ), | ||||
|             trio.open_nursery() as n, | ||||
|         ): | ||||
|             n.start_soon(slow) | ||||
|             n.start_soon(fast) | ||||
| 
 | ||||
|             for i in range(1000): | ||||
|                 await tx.send(i) | ||||
| 
 | ||||
|             # simulate user nailing ctl-c after realizing | ||||
|             # there's a lag in the slow task. | ||||
|             await trio.sleep(1) | ||||
|             raise KeyboardInterrupt | ||||
| 
 | ||||
|     with pytest.raises(KeyboardInterrupt): | ||||
|         trio.run(main) | ||||
|  |  | |||
|  | @ -24,7 +24,6 @@ from ._clustering import open_actor_cluster | |||
| from ._ipc import Channel | ||||
| from ._streaming import ( | ||||
|     Context, | ||||
|     ReceiveMsgStream, | ||||
|     MsgStream, | ||||
|     stream, | ||||
|     context, | ||||
|  | @ -45,7 +44,10 @@ from ._exceptions import ( | |||
|     ModuleNotExposed, | ||||
|     ContextCancelled, | ||||
| ) | ||||
| from ._debug import breakpoint, post_mortem | ||||
| from ._debug import ( | ||||
|     breakpoint, | ||||
|     post_mortem, | ||||
| ) | ||||
| from . import msg | ||||
| from ._root import ( | ||||
|     run_daemon, | ||||
|  | @ -64,7 +66,6 @@ __all__ = [ | |||
|     'MsgStream', | ||||
|     'BaseExceptionGroup', | ||||
|     'Portal', | ||||
|     'ReceiveMsgStream', | ||||
|     'RemoteActorError', | ||||
|     'breakpoint', | ||||
|     'context', | ||||
|  |  | |||
|  | @ -37,6 +37,7 @@ from typing import ( | |||
| ) | ||||
| from types import FrameType | ||||
| 
 | ||||
| import pdbp | ||||
| import tractor | ||||
| import trio | ||||
| from trio_typing import TaskStatus | ||||
|  | @ -53,17 +54,6 @@ from ._exceptions import ( | |||
| ) | ||||
| from ._ipc import Channel | ||||
| 
 | ||||
| 
 | ||||
| try: | ||||
|     # wtf: only exported when installed in dev mode? | ||||
|     import pdbpp | ||||
| except ImportError: | ||||
|     # pdbpp is installed in regular mode...it monkey patches stuff | ||||
|     import pdb | ||||
|     xpm = getattr(pdb, 'xpm', None) | ||||
|     assert xpm, "pdbpp is not installed?"  # type: ignore | ||||
|     pdbpp = pdb | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -154,22 +144,26 @@ class Lock: | |||
|             cls.repl = None | ||||
| 
 | ||||
| 
 | ||||
| class TractorConfig(pdbpp.DefaultConfig): | ||||
| class TractorConfig(pdbp.DefaultConfig): | ||||
|     ''' | ||||
|     Custom ``pdbpp`` goodness. | ||||
|     Custom ``pdbp`` goodness :surfer: | ||||
| 
 | ||||
|     ''' | ||||
|     # use_pygments = True | ||||
|     # sticky_by_default = True | ||||
|     enable_hidden_frames = False | ||||
|     use_pygments: bool = True | ||||
|     sticky_by_default: bool = False | ||||
|     enable_hidden_frames: bool = False | ||||
| 
 | ||||
|     # much thanks @mdmintz for the hot tip! | ||||
|     # fixes line spacing issue when resizing terminal B) | ||||
|     truncate_long_lines: bool = False | ||||
| 
 | ||||
| 
 | ||||
| class MultiActorPdb(pdbpp.Pdb): | ||||
| class MultiActorPdb(pdbp.Pdb): | ||||
|     ''' | ||||
|     Add teardown hooks to the regular ``pdbpp.Pdb``. | ||||
|     Add teardown hooks to the regular ``pdbp.Pdb``. | ||||
| 
 | ||||
|     ''' | ||||
|     # override the pdbpp config with our coolio one | ||||
|     # override the pdbp config with our coolio one | ||||
|     DefaultConfig = TractorConfig | ||||
| 
 | ||||
|     # def preloop(self): | ||||
|  | @ -313,7 +307,7 @@ async def lock_tty_for_child( | |||
| ) -> str: | ||||
|     ''' | ||||
|     Lock the TTY in the root process of an actor tree in a new | ||||
|     inter-actor-context-task such that the ``pdbpp`` debugger console | ||||
|     inter-actor-context-task such that the ``pdbp`` debugger console | ||||
|     can be mutex-allocated to the calling sub-actor for REPL control | ||||
|     without interference by other processes / threads. | ||||
| 
 | ||||
|  | @ -433,7 +427,7 @@ async def wait_for_parent_stdin_hijack( | |||
| def mk_mpdb() -> tuple[MultiActorPdb, Callable]: | ||||
| 
 | ||||
|     pdb = MultiActorPdb() | ||||
|     # signal.signal = pdbpp.hideframe(signal.signal) | ||||
|     # signal.signal = pdbp.hideframe(signal.signal) | ||||
| 
 | ||||
|     Lock.shield_sigint() | ||||
| 
 | ||||
|  | @ -583,7 +577,7 @@ async def _breakpoint( | |||
|     #     # frame = sys._getframe() | ||||
|     #     # last_f = frame.f_back | ||||
|     #     # last_f.f_globals['__tracebackhide__'] = True | ||||
|     #     # signal.signal = pdbpp.hideframe(signal.signal) | ||||
|     #     # signal.signal = pdbp.hideframe(signal.signal) | ||||
| 
 | ||||
| 
 | ||||
| def shield_sigint_handler( | ||||
|  | @ -743,13 +737,13 @@ def shield_sigint_handler( | |||
|         # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040 | ||||
|         # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py | ||||
| 
 | ||||
|         # XXX: lol, see ``pdbpp`` issue: | ||||
|         # XXX LEGACY: lol, see ``pdbpp`` issue: | ||||
|         # https://github.com/pdbpp/pdbpp/issues/496 | ||||
| 
 | ||||
| 
 | ||||
| def _set_trace( | ||||
|     actor: Optional[tractor.Actor] = None, | ||||
|     pdb: Optional[MultiActorPdb] = None, | ||||
|     actor: tractor.Actor | None = None, | ||||
|     pdb: MultiActorPdb | None = None, | ||||
| ): | ||||
|     __tracebackhide__ = True | ||||
|     actor = actor or tractor.current_actor() | ||||
|  | @ -759,7 +753,11 @@ def _set_trace( | |||
|     if frame: | ||||
|         frame = frame.f_back  # type: ignore | ||||
| 
 | ||||
|     if frame and pdb and actor is not None: | ||||
|     if ( | ||||
|         frame | ||||
|         and pdb | ||||
|         and actor is not None | ||||
|     ): | ||||
|         log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") | ||||
|         # no f!#$&* idea, but when we're in async land | ||||
|         # we need 2x frames up? | ||||
|  | @ -768,7 +766,8 @@ def _set_trace( | |||
|     else: | ||||
|         pdb, undo_sigint = mk_mpdb() | ||||
| 
 | ||||
|         # we entered the global ``breakpoint()`` built-in from sync code? | ||||
|         # we entered the global ``breakpoint()`` built-in from sync | ||||
|         # code? | ||||
|         Lock.local_task_in_debug = 'sync' | ||||
| 
 | ||||
|     pdb.set_trace(frame=frame) | ||||
|  | @ -798,7 +797,7 @@ def _post_mortem( | |||
|     # https://github.com/pdbpp/pdbpp/issues/480 | ||||
|     # TODO: help with a 3.10+ major release if/when it arrives. | ||||
| 
 | ||||
|     pdbpp.xpm(Pdb=lambda: pdb) | ||||
|     pdbp.xpm(Pdb=lambda: pdb) | ||||
| 
 | ||||
| 
 | ||||
| post_mortem = partial( | ||||
|  |  | |||
|  | @ -45,7 +45,10 @@ from ._exceptions import ( | |||
|     NoResult, | ||||
|     ContextCancelled, | ||||
| ) | ||||
| from ._streaming import Context, ReceiveMsgStream | ||||
| from ._streaming import ( | ||||
|     Context, | ||||
|     MsgStream, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
|  | @ -101,7 +104,7 @@ class Portal: | |||
|         # it is expected that ``result()`` will be awaited at some | ||||
|         # point. | ||||
|         self._expect_result: Optional[Context] = None | ||||
|         self._streams: set[ReceiveMsgStream] = set() | ||||
|         self._streams: set[MsgStream] = set() | ||||
|         self.actor = current_actor() | ||||
| 
 | ||||
|     async def _submit_for_result( | ||||
|  | @ -316,7 +319,7 @@ class Portal: | |||
|         async_gen_func: Callable,  # typing: ignore | ||||
|         **kwargs, | ||||
| 
 | ||||
|     ) -> AsyncGenerator[ReceiveMsgStream, None]: | ||||
|     ) -> AsyncGenerator[MsgStream, None]: | ||||
| 
 | ||||
|         if not inspect.isasyncgenfunction(async_gen_func): | ||||
|             if not ( | ||||
|  | @ -341,7 +344,7 @@ class Portal: | |||
| 
 | ||||
|         try: | ||||
|             # deliver receive only stream | ||||
|             async with ReceiveMsgStream( | ||||
|             async with MsgStream( | ||||
|                 ctx, ctx._recv_chan, | ||||
|             ) as rchan: | ||||
|                 self._streams.add(rchan) | ||||
|  | @ -497,6 +500,10 @@ class Portal: | |||
|                     f'actor: {uid}' | ||||
|                 ) | ||||
|                 result = await ctx.result() | ||||
|                 log.runtime( | ||||
|                     f'Context {fn_name} returned ' | ||||
|                     f'value from callee `{result}`' | ||||
|                 ) | ||||
| 
 | ||||
|             # though it should be impossible for any tasks | ||||
|             # operating *in* this scope to have survived | ||||
|  | @ -518,12 +525,6 @@ class Portal: | |||
|                         f'task:{cid}\n' | ||||
|                         f'actor:{uid}' | ||||
|                     ) | ||||
|             else: | ||||
|                 log.runtime( | ||||
|                     f'Context {fn_name} returned ' | ||||
|                     f'value from callee `{result}`' | ||||
|                 ) | ||||
| 
 | ||||
|             # XXX: (MEGA IMPORTANT) if this is a root opened process we | ||||
|             # wait for any immediate child in debug before popping the | ||||
|             # context from the runtime msg loop otherwise inside | ||||
|  |  | |||
|  | @ -22,11 +22,9 @@ from contextlib import asynccontextmanager | |||
| from functools import partial | ||||
| import importlib | ||||
| import logging | ||||
| import os | ||||
| import signal | ||||
| from typing import ( | ||||
|     Optional, | ||||
| ) | ||||
| import sys | ||||
| import os | ||||
| import typing | ||||
| import warnings | ||||
| 
 | ||||
|  | @ -58,27 +56,28 @@ logger = log.get_logger('tractor') | |||
| @asynccontextmanager | ||||
| async def open_root_actor( | ||||
| 
 | ||||
|     *, | ||||
|     # defaults are above | ||||
|     arbiter_addr: Optional[tuple[str, int]] = ( | ||||
|         _default_arbiter_host, | ||||
|         _default_arbiter_port, | ||||
|     ), | ||||
|     arbiter_addr: tuple[str, int] | None = None, | ||||
| 
 | ||||
|     name: Optional[str] = 'root', | ||||
|     # defaults are above | ||||
|     registry_addr: tuple[str, int] | None = None, | ||||
| 
 | ||||
|     name: str | None = 'root', | ||||
| 
 | ||||
|     # either the `multiprocessing` start method: | ||||
|     # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods | ||||
|     # OR `trio` (the new default). | ||||
|     start_method: Optional[_spawn.SpawnMethodKey] = None, | ||||
|     start_method: _spawn.SpawnMethodKey | None = None, | ||||
| 
 | ||||
|     # enables the multi-process debugger support | ||||
|     debug_mode: bool = False, | ||||
| 
 | ||||
|     # internal logging | ||||
|     loglevel: Optional[str] = None, | ||||
|     loglevel: str | None = None, | ||||
| 
 | ||||
|     enable_modules: Optional[list] = None, | ||||
|     rpc_module_paths: Optional[list] = None, | ||||
|     enable_modules: list | None = None, | ||||
|     rpc_module_paths: list | None = None, | ||||
| 
 | ||||
| ) -> typing.Any: | ||||
|     ''' | ||||
|  | @ -86,13 +85,15 @@ async def open_root_actor( | |||
| 
 | ||||
|     ''' | ||||
|     # Override the global debugger hook to make it play nice with | ||||
|     # ``trio``, see: | ||||
|     # ``trio``, see much discussion in: | ||||
|     # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018 | ||||
|     builtin_bp_handler = sys.breakpointhook | ||||
|     orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None) | ||||
|     os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace' | ||||
| 
 | ||||
|     # attempt to retreive ``trio``'s sigint handler and stash it | ||||
|     # on our debugger lock state. | ||||
|     _debug.Lock._trio_handler =  signal.getsignal(signal.SIGINT) | ||||
|     _debug.Lock._trio_handler = signal.getsignal(signal.SIGINT) | ||||
| 
 | ||||
|     # mark top most level process as root actor | ||||
|     _state._runtime_vars['_is_root'] = True | ||||
|  | @ -112,9 +113,21 @@ async def open_root_actor( | |||
|     if start_method is not None: | ||||
|         _spawn.try_set_start_method(start_method) | ||||
| 
 | ||||
|     arbiter_addr = (host, port) = arbiter_addr or ( | ||||
|         _default_arbiter_host, | ||||
|         _default_arbiter_port, | ||||
|     if arbiter_addr is not None: | ||||
|         warnings.warn( | ||||
|             '`arbiter_addr` is now deprecated and has been renamed to' | ||||
|             '`registry_addr`.\nUse that instead..', | ||||
|             DeprecationWarning, | ||||
|             stacklevel=2, | ||||
|         ) | ||||
| 
 | ||||
|     registry_addr = (host, port) = ( | ||||
|         registry_addr | ||||
|         or arbiter_addr | ||||
|         or ( | ||||
|             _default_arbiter_host, | ||||
|             _default_arbiter_port, | ||||
|         ) | ||||
|     ) | ||||
| 
 | ||||
|     loglevel = (loglevel or log._default_loglevel).upper() | ||||
|  | @ -160,7 +173,7 @@ async def open_root_actor( | |||
| 
 | ||||
|     except OSError: | ||||
|         # TODO: make this a "discovery" log level? | ||||
|         logger.warning(f"No actor could be found @ {host}:{port}") | ||||
|         logger.warning(f"No actor registry found @ {host}:{port}") | ||||
| 
 | ||||
|     # create a local actor and start up its main routine/task | ||||
|     if arbiter_found: | ||||
|  | @ -170,7 +183,7 @@ async def open_root_actor( | |||
| 
 | ||||
|         actor = Actor( | ||||
|             name or 'anonymous', | ||||
|             arbiter_addr=arbiter_addr, | ||||
|             arbiter_addr=registry_addr, | ||||
|             loglevel=loglevel, | ||||
|             enable_modules=enable_modules, | ||||
|         ) | ||||
|  | @ -186,7 +199,7 @@ async def open_root_actor( | |||
| 
 | ||||
|         actor = Arbiter( | ||||
|             name or 'arbiter', | ||||
|             arbiter_addr=arbiter_addr, | ||||
|             arbiter_addr=registry_addr, | ||||
|             loglevel=loglevel, | ||||
|             enable_modules=enable_modules, | ||||
|         ) | ||||
|  | @ -244,6 +257,15 @@ async def open_root_actor( | |||
|                 await actor.cancel() | ||||
|     finally: | ||||
|         _state._current_actor = None | ||||
| 
 | ||||
|         # restore breakpoint hook state | ||||
|         sys.breakpointhook = builtin_bp_handler | ||||
|         if orig_bp_path is not None: | ||||
|             os.environ['PYTHONBREAKPOINT'] = orig_bp_path | ||||
|         else: | ||||
|             # clear env back to having no entry | ||||
|             os.environ.pop('PYTHONBREAKPOINT') | ||||
| 
 | ||||
|         logger.runtime("Root actor terminated") | ||||
| 
 | ||||
| 
 | ||||
|  | @ -251,13 +273,13 @@ def run_daemon( | |||
|     enable_modules: list[str], | ||||
| 
 | ||||
|     # runtime kwargs | ||||
|     name: Optional[str] = 'root', | ||||
|     arbiter_addr: tuple[str, int] = ( | ||||
|     name: str | None = 'root', | ||||
|     registry_addr: tuple[str, int] = ( | ||||
|         _default_arbiter_host, | ||||
|         _default_arbiter_port, | ||||
|     ), | ||||
| 
 | ||||
|     start_method: Optional[str] = None, | ||||
|     start_method: str | None = None, | ||||
|     debug_mode: bool = False, | ||||
|     **kwargs | ||||
| 
 | ||||
|  | @ -279,7 +301,7 @@ def run_daemon( | |||
|     async def _main(): | ||||
| 
 | ||||
|         async with open_root_actor( | ||||
|             arbiter_addr=arbiter_addr, | ||||
|             registry_addr=registry_addr, | ||||
|             name=name, | ||||
|             start_method=start_method, | ||||
|             debug_mode=debug_mode, | ||||
|  |  | |||
|  | @ -228,11 +228,11 @@ async def _invoke( | |||
| 
 | ||||
|                 fname = func.__name__ | ||||
|                 if ctx._cancel_called: | ||||
|                     msg = f'{fname} cancelled itself' | ||||
|                     msg = f'`{fname}()` cancelled itself' | ||||
| 
 | ||||
|                 elif cs.cancel_called: | ||||
|                     msg = ( | ||||
|                         f'{fname} was remotely cancelled by its caller ' | ||||
|                         f'`{fname}()` was remotely cancelled by its caller ' | ||||
|                         f'{ctx.chan.uid}' | ||||
|                     ) | ||||
| 
 | ||||
|  | @ -319,7 +319,7 @@ async def _invoke( | |||
|             BrokenPipeError, | ||||
|         ): | ||||
|             # if we can't propagate the error that's a big boo boo | ||||
|             log.error( | ||||
|             log.exception( | ||||
|                 f"Failed to ship error to caller @ {chan.uid} !?" | ||||
|             ) | ||||
| 
 | ||||
|  | @ -455,7 +455,7 @@ class Actor: | |||
|         self._mods: dict[str, ModuleType] = {} | ||||
|         self.loglevel = loglevel | ||||
| 
 | ||||
|         self._arb_addr = ( | ||||
|         self._arb_addr: tuple[str, int] | None = ( | ||||
|             str(arbiter_addr[0]), | ||||
|             int(arbiter_addr[1]) | ||||
|         ) if arbiter_addr else None | ||||
|  | @ -488,7 +488,10 @@ class Actor: | |||
|         self._parent_chan: Optional[Channel] = None | ||||
|         self._forkserver_info: Optional[ | ||||
|             tuple[Any, Any, Any, Any, Any]] = None | ||||
|         self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {}  # type: ignore  # noqa | ||||
|         self._actoruid2nursery: dict[ | ||||
|             tuple[str, str], | ||||
|             ActorNursery | None, | ||||
|         ] = {}  # type: ignore  # noqa | ||||
| 
 | ||||
|     async def wait_for_peer( | ||||
|         self, uid: tuple[str, str] | ||||
|  | @ -826,7 +829,12 @@ class Actor: | |||
| 
 | ||||
|             if ctx._backpressure: | ||||
|                 log.warning(text) | ||||
|                 await send_chan.send(msg) | ||||
|                 try: | ||||
|                     await send_chan.send(msg) | ||||
|                 except trio.BrokenResourceError: | ||||
|                     # XXX: local consumer has closed their side | ||||
|                     # so cancel the far end streaming task | ||||
|                     log.warning(f"{chan} is already closed") | ||||
|             else: | ||||
|                 try: | ||||
|                     raise StreamOverrun(text) from None | ||||
|  | @ -1371,10 +1379,12 @@ async def async_main( | |||
|         actor.lifetime_stack.close() | ||||
| 
 | ||||
|         # Unregister actor from the arbiter | ||||
|         if registered_with_arbiter and ( | ||||
|                 actor._arb_addr is not None | ||||
|         if ( | ||||
|             registered_with_arbiter | ||||
|             and not actor.is_arbiter | ||||
|         ): | ||||
|             failed = False | ||||
|             assert isinstance(actor._arb_addr, tuple) | ||||
|             with trio.move_on_after(0.5) as cs: | ||||
|                 cs.shield = True | ||||
|                 try: | ||||
|  |  | |||
|  | @ -23,13 +23,12 @@ import sys | |||
| import platform | ||||
| from typing import ( | ||||
|     Any, | ||||
|     Awaitable, | ||||
|     Literal, | ||||
|     Optional, | ||||
|     Callable, | ||||
|     TypeVar, | ||||
|     TYPE_CHECKING, | ||||
| ) | ||||
| from collections.abc import Awaitable | ||||
| 
 | ||||
| from exceptiongroup import BaseExceptionGroup | ||||
| import trio | ||||
|  | @ -60,7 +59,7 @@ if TYPE_CHECKING: | |||
| log = get_logger('tractor') | ||||
| 
 | ||||
| # placeholder for an mp start context if so using that backend | ||||
| _ctx: Optional[mp.context.BaseContext] = None | ||||
| _ctx: mp.context.BaseContext | None = None | ||||
| SpawnMethodKey = Literal[ | ||||
|     'trio',  # supported on all platforms | ||||
|     'mp_spawn', | ||||
|  | @ -86,7 +85,7 @@ else: | |||
| def try_set_start_method( | ||||
|     key: SpawnMethodKey | ||||
| 
 | ||||
| ) -> Optional[mp.context.BaseContext]: | ||||
| ) -> mp.context.BaseContext | None: | ||||
|     ''' | ||||
|     Attempt to set the method for process starting, aka the "actor | ||||
|     spawning backend". | ||||
|  | @ -200,16 +199,37 @@ async def cancel_on_completion( | |||
| async def do_hard_kill( | ||||
|     proc: trio.Process, | ||||
|     terminate_after: int = 3, | ||||
| 
 | ||||
| ) -> None: | ||||
|     # NOTE: this timeout used to do nothing since we were shielding | ||||
|     # the ``.wait()`` inside ``new_proc()`` which will pretty much | ||||
|     # never release until the process exits, now it acts as | ||||
|     # a hard-kill time ultimatum. | ||||
|     log.debug(f"Terminating {proc}") | ||||
|     with trio.move_on_after(terminate_after) as cs: | ||||
| 
 | ||||
|         # NOTE: This ``__aexit__()`` shields internally. | ||||
|         async with proc:  # calls ``trio.Process.aclose()`` | ||||
|             log.debug(f"Terminating {proc}") | ||||
|         # NOTE: code below was copied verbatim from the now deprecated | ||||
|         # (in 0.20.0) ``trio._subrocess.Process.aclose()``, orig doc | ||||
|         # string: | ||||
|         # | ||||
|         # Close any pipes we have to the process (both input and output) | ||||
|         # and wait for it to exit. If cancelled, kills the process and | ||||
|         # waits for it to finish exiting before propagating the | ||||
|         # cancellation. | ||||
|         with trio.CancelScope(shield=True): | ||||
|             if proc.stdin is not None: | ||||
|                 await proc.stdin.aclose() | ||||
|             if proc.stdout is not None: | ||||
|                 await proc.stdout.aclose() | ||||
|             if proc.stderr is not None: | ||||
|                 await proc.stderr.aclose() | ||||
|         try: | ||||
|             await proc.wait() | ||||
|         finally: | ||||
|             if proc.returncode is None: | ||||
|                 proc.kill() | ||||
|                 with trio.CancelScope(shield=True): | ||||
|                     await proc.wait() | ||||
| 
 | ||||
|     if cs.cancelled_caught: | ||||
|         # XXX: should pretty much never get here unless we have | ||||
|  | @ -260,7 +280,9 @@ async def soft_wait( | |||
| 
 | ||||
|             if proc.poll() is None:  # type: ignore | ||||
|                 log.warning( | ||||
|                     f'Process still alive after cancel request:\n{uid}') | ||||
|                     'Actor still alive after cancel request:\n' | ||||
|                     f'{uid}' | ||||
|                 ) | ||||
| 
 | ||||
|                 n.cancel_scope.cancel() | ||||
|         raise | ||||
|  | @ -353,12 +375,11 @@ async def trio_proc( | |||
|         spawn_cmd.append("--asyncio") | ||||
| 
 | ||||
|     cancelled_during_spawn: bool = False | ||||
|     proc: Optional[trio.Process] = None | ||||
|     proc: trio.Process | None = None | ||||
|     try: | ||||
|         try: | ||||
|             # TODO: needs ``trio_typing`` patch? | ||||
|             proc = await trio.lowlevel.open_process(    # type: ignore | ||||
|                 spawn_cmd) | ||||
|             proc = await trio.lowlevel.open_process(spawn_cmd) | ||||
| 
 | ||||
|             log.runtime(f"Started {proc}") | ||||
| 
 | ||||
|  | @ -442,8 +463,8 @@ async def trio_proc( | |||
|             nursery.cancel_scope.cancel() | ||||
| 
 | ||||
|     finally: | ||||
|         # The "hard" reap since no actor zombies are allowed! | ||||
|         # XXX: do this **after** cancellation/tearfown to avoid | ||||
|         # XXX NOTE XXX: The "hard" reap since no actor zombies are | ||||
|         # allowed! Do this **after** cancellation/teardown to avoid | ||||
|         # killing the process too early. | ||||
|         if proc: | ||||
|             log.cancel(f'Hard reap sequence starting for {subactor.uid}') | ||||
|  |  | |||
|  | @ -50,12 +50,13 @@ log = get_logger(__name__) | |||
| # - use __slots__ on ``Context``? | ||||
| 
 | ||||
| 
 | ||||
| class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||
| class MsgStream(trio.abc.Channel): | ||||
|     ''' | ||||
|     A IPC message stream for receiving logically sequenced values over | ||||
|     an inter-actor ``Channel``. This is the type returned to a local | ||||
|     task which entered either ``Portal.open_stream_from()`` or | ||||
|     ``Context.open_stream()``. | ||||
|     A bidirectional message stream for receiving logically sequenced | ||||
|     values over an inter-actor IPC ``Channel``. | ||||
| 
 | ||||
|     This is the type returned to a local task which entered either | ||||
|     ``Portal.open_stream_from()`` or ``Context.open_stream()``. | ||||
| 
 | ||||
|     Termination rules: | ||||
| 
 | ||||
|  | @ -97,6 +98,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|         if self._eoc: | ||||
|             raise trio.EndOfChannel | ||||
| 
 | ||||
|         if self._closed: | ||||
|             raise trio.ClosedResourceError('This stream was closed') | ||||
| 
 | ||||
|         try: | ||||
|             msg = await self._rx_chan.receive() | ||||
|             return msg['yield'] | ||||
|  | @ -110,6 +114,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|             # - 'error' | ||||
|             # possibly just handle msg['stop'] here! | ||||
| 
 | ||||
|             if self._closed: | ||||
|                 raise trio.ClosedResourceError('This stream was closed') | ||||
| 
 | ||||
|             if msg.get('stop') or self._eoc: | ||||
|                 log.debug(f"{self} was stopped at remote end") | ||||
| 
 | ||||
|  | @ -189,7 +196,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|             return | ||||
| 
 | ||||
|         self._eoc = True | ||||
|         self._closed = True | ||||
| 
 | ||||
|         # NOTE: this is super subtle IPC messaging stuff: | ||||
|         # Relay stop iteration to far end **iff** we're | ||||
|  | @ -206,29 +212,32 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
| 
 | ||||
|         # In the bidirectional case, `Context.open_stream()` will create | ||||
|         # the `Actor._cids2qs` entry from a call to | ||||
|         # `Actor.get_context()` and will send the stop message in | ||||
|         # ``__aexit__()`` on teardown so it **does not** need to be | ||||
|         # called here. | ||||
|         if not self._ctx._portal: | ||||
|             # Only for 2 way streams can we can send stop from the | ||||
|             # caller side. | ||||
|             try: | ||||
|                 # NOTE: if this call is cancelled we expect this end to | ||||
|                 # handle as though the stop was never sent (though if it | ||||
|                 # was it shouldn't matter since it's unlikely a user | ||||
|                 # will try to re-use a stream after attemping to close | ||||
|                 # it). | ||||
|                 with trio.CancelScope(shield=True): | ||||
|                     await self._ctx.send_stop() | ||||
|         # `Actor.get_context()` and will call us here to send the stop | ||||
|         # msg in ``__aexit__()`` on teardown. | ||||
|         try: | ||||
|             # NOTE: if this call is cancelled we expect this end to | ||||
|             # handle as though the stop was never sent (though if it | ||||
|             # was it shouldn't matter since it's unlikely a user | ||||
|             # will try to re-use a stream after attemping to close | ||||
|             # it). | ||||
|             with trio.CancelScope(shield=True): | ||||
|                 await self._ctx.send_stop() | ||||
| 
 | ||||
|             except ( | ||||
|                 trio.BrokenResourceError, | ||||
|                 trio.ClosedResourceError | ||||
|             ): | ||||
|                 # the underlying channel may already have been pulled | ||||
|                 # in which case our stop message is meaningless since | ||||
|                 # it can't traverse the transport. | ||||
|                 log.debug(f'Channel for {self} was already closed') | ||||
|         except ( | ||||
|             trio.BrokenResourceError, | ||||
|             trio.ClosedResourceError | ||||
|         ): | ||||
|             # the underlying channel may already have been pulled | ||||
|             # in which case our stop message is meaningless since | ||||
|             # it can't traverse the transport. | ||||
|             ctx = self._ctx | ||||
|             log.warning( | ||||
|                 f'Stream was already destroyed?\n' | ||||
|                 f'actor: {ctx.chan.uid}\n' | ||||
|                 f'ctx id: {ctx.cid}' | ||||
|             ) | ||||
| 
 | ||||
|         self._closed = True | ||||
| 
 | ||||
|         # Do we close the local mem chan ``self._rx_chan`` ??!? | ||||
| 
 | ||||
|  | @ -271,7 +280,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|         self, | ||||
| 
 | ||||
|     ) -> AsyncIterator[BroadcastReceiver]: | ||||
|         '''Allocate and return a ``BroadcastReceiver`` which delegates | ||||
|         ''' | ||||
|         Allocate and return a ``BroadcastReceiver`` which delegates | ||||
|         to this message stream. | ||||
| 
 | ||||
|         This allows multiple local tasks to receive each their own copy | ||||
|  | @ -308,15 +318,15 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|         async with self._broadcaster.subscribe() as bstream: | ||||
|             assert bstream.key != self._broadcaster.key | ||||
|             assert bstream._recv == self._broadcaster._recv | ||||
| 
 | ||||
|             # NOTE: we patch on a `.send()` to the bcaster so that the | ||||
|             # caller can still conduct 2-way streaming using this | ||||
|             # ``bstream`` handle transparently as though it was the msg | ||||
|             # stream instance. | ||||
|             bstream.send = self.send  # type: ignore | ||||
| 
 | ||||
|             yield bstream | ||||
| 
 | ||||
| 
 | ||||
| class MsgStream(ReceiveMsgStream, trio.abc.Channel): | ||||
|     ''' | ||||
|     Bidirectional message stream for use within an inter-actor actor | ||||
|     ``Context```. | ||||
| 
 | ||||
|     ''' | ||||
|     async def send( | ||||
|         self, | ||||
|         data: Any | ||||
|  | @ -593,23 +603,23 @@ class Context: | |||
|         async with MsgStream( | ||||
|             ctx=self, | ||||
|             rx_chan=ctx._recv_chan, | ||||
|         ) as rchan: | ||||
|         ) as stream: | ||||
| 
 | ||||
|             if self._portal: | ||||
|                 self._portal._streams.add(rchan) | ||||
|                 self._portal._streams.add(stream) | ||||
| 
 | ||||
|             try: | ||||
|                 self._stream_opened = True | ||||
| 
 | ||||
|                 # ensure we aren't cancelled before delivering | ||||
|                 # the stream | ||||
|                 # XXX: do we need this? | ||||
|                 # ensure we aren't cancelled before yielding the stream | ||||
|                 # await trio.lowlevel.checkpoint() | ||||
|                 yield rchan | ||||
|                 yield stream | ||||
| 
 | ||||
|                 # XXX: Make the stream "one-shot use".  On exit, signal | ||||
|                 # NOTE: Make the stream "one-shot use".  On exit, signal | ||||
|                 # ``trio.EndOfChannel``/``StopAsyncIteration`` to the | ||||
|                 # far end. | ||||
|                 await self.send_stop() | ||||
|                 await stream.aclose() | ||||
| 
 | ||||
|             finally: | ||||
|                 if self._portal: | ||||
|  |  | |||
|  | @ -302,7 +302,7 @@ async def _open_and_supervise_one_cancels_all_nursery( | |||
| ) -> typing.AsyncGenerator[ActorNursery, None]: | ||||
| 
 | ||||
|     # TODO: yay or nay? | ||||
|     # __tracebackhide__ = True | ||||
|     __tracebackhide__ = True | ||||
| 
 | ||||
|     # the collection of errors retreived from spawned sub-actors | ||||
|     errors: dict[tuple[str, str], BaseException] = {} | ||||
|  |  | |||
|  | @ -23,7 +23,6 @@ from __future__ import annotations | |||
| from abc import abstractmethod | ||||
| from collections import deque | ||||
| from contextlib import asynccontextmanager | ||||
| from dataclasses import dataclass | ||||
| from functools import partial | ||||
| from operator import ne | ||||
| from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol | ||||
|  | @ -33,7 +32,10 @@ import trio | |||
| from trio._core._run import Task | ||||
| from trio.abc import ReceiveChannel | ||||
| from trio.lowlevel import current_task | ||||
| from msgspec import Struct | ||||
| from tractor.log import get_logger | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| # A regular invariant generic type | ||||
| T = TypeVar("T") | ||||
|  | @ -86,8 +88,7 @@ class Lagged(trio.TooSlowError): | |||
|     ''' | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
| class BroadcastState: | ||||
| class BroadcastState(Struct): | ||||
|     ''' | ||||
|     Common state to all receivers of a broadcast. | ||||
| 
 | ||||
|  | @ -110,7 +111,35 @@ class BroadcastState: | |||
|     eoc: bool = False | ||||
| 
 | ||||
|     # If the broadcaster was cancelled, we might as well track it | ||||
|     cancelled: bool = False | ||||
|     cancelled: dict[int, Task] = {} | ||||
| 
 | ||||
|     def statistics(self) -> dict[str, Any]: | ||||
|         ''' | ||||
|         Return broadcast receiver group "statistics" like many of | ||||
|         ``trio``'s internal task-sync primitives. | ||||
| 
 | ||||
|         ''' | ||||
|         key: int | None | ||||
|         ev: trio.Event | None | ||||
| 
 | ||||
|         subs = self.subs | ||||
|         if self.recv_ready is not None: | ||||
|             key, ev = self.recv_ready | ||||
|         else: | ||||
|             key = ev = None | ||||
| 
 | ||||
|         qlens: dict[int, int] = {} | ||||
|         for tid, sz in subs.items(): | ||||
|             qlens[tid] = sz if sz != -1 else 0 | ||||
| 
 | ||||
|         return { | ||||
|             'open_consumers': len(subs), | ||||
|             'queued_len_by_task': qlens, | ||||
|             'max_buffer_size': self.maxlen, | ||||
|             'tasks_waiting': ev.statistics().tasks_waiting if ev else 0, | ||||
|             'tasks_cancelled': self.cancelled, | ||||
|             'next_value_receiver_id': key, | ||||
|         } | ||||
| 
 | ||||
| 
 | ||||
| class BroadcastReceiver(ReceiveChannel): | ||||
|  | @ -128,23 +157,40 @@ class BroadcastReceiver(ReceiveChannel): | |||
|         rx_chan: AsyncReceiver, | ||||
|         state: BroadcastState, | ||||
|         receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, | ||||
|         raise_on_lag: bool = True, | ||||
| 
 | ||||
|     ) -> None: | ||||
| 
 | ||||
|         # register the original underlying (clone) | ||||
|         self.key = id(self) | ||||
|         self._state = state | ||||
| 
 | ||||
|         # each consumer has an int count which indicates | ||||
|         # which index contains the next value that the task has not yet | ||||
|         # consumed and thus should read. In the "up-to-date" case the | ||||
|         # consumer task must wait for a new value from the underlying | ||||
|         # receiver and we use ``-1`` as the sentinel for this state. | ||||
|         state.subs[self.key] = -1 | ||||
| 
 | ||||
|         # underlying for this receiver | ||||
|         self._rx = rx_chan | ||||
|         self._recv = receive_afunc or rx_chan.receive | ||||
|         self._closed: bool = False | ||||
|         self._raise_on_lag = raise_on_lag | ||||
| 
 | ||||
|     async def receive(self) -> ReceiveType: | ||||
|     def receive_nowait( | ||||
|         self, | ||||
|         _key: int | None = None, | ||||
|         _state: BroadcastState | None = None, | ||||
| 
 | ||||
|         key = self.key | ||||
|         state = self._state | ||||
|     ) -> Any: | ||||
|         ''' | ||||
|         Sync version of `.receive()` which does all the low level work | ||||
|         of receiving from the underlying/wrapped receive channel. | ||||
| 
 | ||||
|         ''' | ||||
|         key = _key or self.key | ||||
|         state = _state or self._state | ||||
| 
 | ||||
|         # TODO: ideally we can make some way to "lock out" the | ||||
|         # underlying receive channel in some way such that if some task | ||||
|  | @ -177,128 +223,173 @@ class BroadcastReceiver(ReceiveChannel): | |||
|                 # return this value." | ||||
|                 # https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html#lagging | ||||
| 
 | ||||
|                 mxln = state.maxlen | ||||
|                 lost = seq - mxln | ||||
| 
 | ||||
|                 # decrement to the last value and expect | ||||
|                 # consumer to either handle the ``Lagged`` and come back | ||||
|                 # or bail out on its own (thus un-subscribing) | ||||
|                 state.subs[key] = state.maxlen - 1 | ||||
|                 state.subs[key] = mxln - 1 | ||||
| 
 | ||||
|                 # this task was overrun by the producer side | ||||
|                 task: Task = current_task() | ||||
|                 raise Lagged(f'Task {task.name} was overrun') | ||||
|                 msg = f'Task `{task.name}` overrun and dropped `{lost}` values' | ||||
| 
 | ||||
|                 if self._raise_on_lag: | ||||
|                     raise Lagged(msg) | ||||
|                 else: | ||||
|                     log.warning(msg) | ||||
|                     return self.receive_nowait(_key, _state) | ||||
| 
 | ||||
|             state.subs[key] -= 1 | ||||
|             return value | ||||
| 
 | ||||
|         # current task already has the latest value **and** is the | ||||
|         # first task to begin waiting for a new one | ||||
|         if state.recv_ready is None: | ||||
|         raise trio.WouldBlock | ||||
| 
 | ||||
|             if self._closed: | ||||
|                 raise trio.ClosedResourceError | ||||
|     async def _receive_from_underlying( | ||||
|         self, | ||||
|         key: int, | ||||
|         state: BroadcastState, | ||||
| 
 | ||||
|             event = trio.Event() | ||||
|             state.recv_ready = key, event | ||||
|     ) -> ReceiveType: | ||||
| 
 | ||||
|         if self._closed: | ||||
|             raise trio.ClosedResourceError | ||||
| 
 | ||||
|         event = trio.Event() | ||||
|         assert state.recv_ready is None | ||||
|         state.recv_ready = key, event | ||||
| 
 | ||||
|         try: | ||||
|             # if we're cancelled here it should be | ||||
|             # fine to bail without affecting any other consumers | ||||
|             # right? | ||||
|             try: | ||||
|                 value = await self._recv() | ||||
|             value = await self._recv() | ||||
| 
 | ||||
|                 # items with lower indices are "newer" | ||||
|                 # NOTE: ``collections.deque`` implicitly takes care of | ||||
|                 # trucating values outside our ``state.maxlen``. In the | ||||
|                 # alt-backend-array-case we'll need to make sure this is | ||||
|                 # implemented in similar ringer-buffer-ish style. | ||||
|                 state.queue.appendleft(value) | ||||
|             # items with lower indices are "newer" | ||||
|             # NOTE: ``collections.deque`` implicitly takes care of | ||||
|             # trucating values outside our ``state.maxlen``. In the | ||||
|             # alt-backend-array-case we'll need to make sure this is | ||||
|             # implemented in similar ringer-buffer-ish style. | ||||
|             state.queue.appendleft(value) | ||||
| 
 | ||||
|                 # broadcast new value to all subscribers by increasing | ||||
|                 # all sequence numbers that will point in the queue to | ||||
|                 # their latest available value. | ||||
|             # broadcast new value to all subscribers by increasing | ||||
|             # all sequence numbers that will point in the queue to | ||||
|             # their latest available value. | ||||
| 
 | ||||
|                 # don't decrement the sequence for this task since we | ||||
|                 # already retreived the last value | ||||
|             # don't decrement the sequence for this task since we | ||||
|             # already retreived the last value | ||||
| 
 | ||||
|                 # XXX: which of these impls is fastest? | ||||
|             # XXX: which of these impls is fastest? | ||||
|             # subs = state.subs.copy() | ||||
|             # subs.pop(key) | ||||
| 
 | ||||
|                 # subs = state.subs.copy() | ||||
|                 # subs.pop(key) | ||||
| 
 | ||||
|                 for sub_key in filter( | ||||
|                     # lambda k: k != key, state.subs, | ||||
|                     partial(ne, key), state.subs, | ||||
|                 ): | ||||
|                     state.subs[sub_key] += 1 | ||||
| 
 | ||||
|                 # NOTE: this should ONLY be set if the above task was *NOT* | ||||
|                 # cancelled on the `._recv()` call. | ||||
|                 event.set() | ||||
|                 return value | ||||
| 
 | ||||
|             except trio.EndOfChannel: | ||||
|                 # if any one consumer gets an EOC from the underlying | ||||
|                 # receiver we need to unblock and send that signal to | ||||
|                 # all other consumers. | ||||
|                 self._state.eoc = True | ||||
|                 if event.statistics().tasks_waiting: | ||||
|                     event.set() | ||||
|                 raise | ||||
| 
 | ||||
|             except ( | ||||
|                 trio.Cancelled, | ||||
|             for sub_key in filter( | ||||
|                 # lambda k: k != key, state.subs, | ||||
|                 partial(ne, key), state.subs, | ||||
|             ): | ||||
|                 # handle cancelled specially otherwise sibling | ||||
|                 # consumers will be awoken with a sequence of -1 | ||||
|                 # and will potentially try to rewait the underlying | ||||
|                 # receiver instead of just cancelling immediately. | ||||
|                 self._state.cancelled = True | ||||
|                 if event.statistics().tasks_waiting: | ||||
|                     event.set() | ||||
|                 raise | ||||
|                 state.subs[sub_key] += 1 | ||||
| 
 | ||||
|             finally: | ||||
|             # NOTE: this should ONLY be set if the above task was *NOT* | ||||
|             # cancelled on the `._recv()` call. | ||||
|             event.set() | ||||
|             return value | ||||
| 
 | ||||
|                 # Reset receiver waiter task event for next blocking condition. | ||||
|                 # this MUST be reset even if the above ``.recv()`` call | ||||
|                 # was cancelled to avoid the next consumer from blocking on | ||||
|                 # an event that won't be set! | ||||
|                 state.recv_ready = None | ||||
|         except trio.EndOfChannel: | ||||
|             # if any one consumer gets an EOC from the underlying | ||||
|             # receiver we need to unblock and send that signal to | ||||
|             # all other consumers. | ||||
|             self._state.eoc = True | ||||
|             if event.statistics().tasks_waiting: | ||||
|                 event.set() | ||||
|             raise | ||||
| 
 | ||||
|         except ( | ||||
|             trio.Cancelled, | ||||
|         ): | ||||
|             # handle cancelled specially otherwise sibling | ||||
|             # consumers will be awoken with a sequence of -1 | ||||
|             # and will potentially try to rewait the underlying | ||||
|             # receiver instead of just cancelling immediately. | ||||
|             self._state.cancelled[key] = current_task() | ||||
|             if event.statistics().tasks_waiting: | ||||
|                 event.set() | ||||
|             raise | ||||
| 
 | ||||
|         finally: | ||||
|             # Reset receiver waiter task event for next blocking condition. | ||||
|             # this MUST be reset even if the above ``.recv()`` call | ||||
|             # was cancelled to avoid the next consumer from blocking on | ||||
|             # an event that won't be set! | ||||
|             state.recv_ready = None | ||||
| 
 | ||||
|     async def receive(self) -> ReceiveType: | ||||
|         key = self.key | ||||
|         state = self._state | ||||
| 
 | ||||
|         try: | ||||
|             return self.receive_nowait( | ||||
|                 _key=key, | ||||
|                 _state=state, | ||||
|             ) | ||||
|         except trio.WouldBlock: | ||||
|             pass | ||||
| 
 | ||||
|         # current task already has the latest value **and** is the | ||||
|         # first task to begin waiting for a new one so we begin blocking | ||||
|         # until rescheduled with the a new value from the underlying. | ||||
|         if state.recv_ready is None: | ||||
|             return await self._receive_from_underlying(key, state) | ||||
| 
 | ||||
|         # This task is all caught up and ready to receive the latest | ||||
|         # value, so queue sched it on the internal event. | ||||
|         # value, so queue/schedule it to be woken on the next internal | ||||
|         # event. | ||||
|         else: | ||||
|             seq = state.subs[key] | ||||
|             assert seq == -1  # sanity | ||||
|             _, ev = state.recv_ready | ||||
|             await ev.wait() | ||||
|             while state.recv_ready is not None: | ||||
|                 # seq = state.subs[key] | ||||
|                 # assert seq == -1  # sanity | ||||
|                 _, ev = state.recv_ready | ||||
|                 await ev.wait() | ||||
|                 try: | ||||
|                     return self.receive_nowait( | ||||
|                         _key=key, | ||||
|                         _state=state, | ||||
|                     ) | ||||
|                 except trio.WouldBlock: | ||||
|                     if self._closed: | ||||
|                         raise trio.ClosedResourceError | ||||
| 
 | ||||
|             # NOTE: if we ever would like the behaviour where if the | ||||
|             # first task to recv on the underlying is cancelled but it | ||||
|             # still DOES trigger the ``.recv_ready``, event we'll likely need | ||||
|             # this logic: | ||||
|                     subs = state.subs | ||||
|                     if ( | ||||
|                         len(subs) == 1 | ||||
|                         and key in subs | ||||
|                         # or cancelled | ||||
|                     ): | ||||
|                         # XXX: we are the last and only user of this BR so | ||||
|                         # likely it makes sense to unwind back to the | ||||
|                         # underlying? | ||||
|                         # import tractor | ||||
|                         # await tractor.breakpoint() | ||||
|                         log.warning( | ||||
|                             f'Only one sub left for {self}?\n' | ||||
|                             'We can probably unwind from breceiver?' | ||||
|                         ) | ||||
| 
 | ||||
|             if seq > -1: | ||||
|                 # stuff from above.. | ||||
|                 seq = state.subs[key] | ||||
|                     # XXX: In the case where the first task to allocate the | ||||
|                     # ``.recv_ready`` event is cancelled we will be woken | ||||
|                     # with a non-incremented sequence number (the ``-1`` | ||||
|                     # sentinel) and thus will read the oldest value if we | ||||
|                     # use that. Instead we need to detect if we have not | ||||
|                     # been incremented and then receive again. | ||||
|                     # return await self.receive() | ||||
| 
 | ||||
|                 value = state.queue[seq] | ||||
|                 state.subs[key] -= 1 | ||||
|                 return value | ||||
| 
 | ||||
|             elif seq == -1: | ||||
|                 # XXX: In the case where the first task to allocate the | ||||
|                 # ``.recv_ready`` event is cancelled we will be woken with | ||||
|                 # a non-incremented sequence number and thus will read the | ||||
|                 # oldest value if we use that. Instead we need to detect if | ||||
|                 # we have not been incremented and then receive again. | ||||
|                 return await self.receive() | ||||
| 
 | ||||
|             else: | ||||
|                 raise ValueError(f'Invalid sequence {seq}!?') | ||||
|             return await self._receive_from_underlying(key, state) | ||||
| 
 | ||||
|     @asynccontextmanager | ||||
|     async def subscribe( | ||||
|         self, | ||||
|         raise_on_lag: bool = True, | ||||
| 
 | ||||
|     ) -> AsyncIterator[BroadcastReceiver]: | ||||
|         ''' | ||||
|         Subscribe for values from this broadcast receiver. | ||||
|  | @ -316,6 +407,7 @@ class BroadcastReceiver(ReceiveChannel): | |||
|             rx_chan=self._rx, | ||||
|             state=state, | ||||
|             receive_afunc=self._recv, | ||||
|             raise_on_lag=raise_on_lag, | ||||
|         ) | ||||
|         # assert clone in state.subs | ||||
|         assert br.key in state.subs | ||||
|  | @ -352,7 +444,8 @@ def broadcast_receiver( | |||
| 
 | ||||
|     recv_chan: AsyncReceiver, | ||||
|     max_buffer_size: int, | ||||
|     **kwargs, | ||||
|     receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, | ||||
|     raise_on_lag: bool = True, | ||||
| 
 | ||||
| ) -> BroadcastReceiver: | ||||
| 
 | ||||
|  | @ -363,5 +456,6 @@ def broadcast_receiver( | |||
|             maxlen=max_buffer_size, | ||||
|             subs={}, | ||||
|         ), | ||||
|         **kwargs, | ||||
|         receive_afunc=receive_afunc, | ||||
|         raise_on_lag=raise_on_lag, | ||||
|     ) | ||||
|  |  | |||
|  | @ -133,13 +133,13 @@ async def gather_contexts( | |||
|         # deliver control once all managers have started up | ||||
|         await all_entered.wait() | ||||
| 
 | ||||
|         # NOTE: order *should* be preserved in the output values | ||||
|         # since ``dict``s are now implicitly ordered. | ||||
|         yield tuple(unwrapped.values()) | ||||
| 
 | ||||
|         # we don't need a try/finally since cancellation will be triggered | ||||
|         # by the surrounding nursery on error. | ||||
|         parent_exit.set() | ||||
|         try: | ||||
|             yield tuple(unwrapped.values()) | ||||
|         finally: | ||||
|             # NOTE: this is ABSOLUTELY REQUIRED to avoid | ||||
|             # the following wacky bug: | ||||
|             # <tractorbugurlhere> | ||||
|             parent_exit.set() | ||||
| 
 | ||||
| 
 | ||||
| # Per actor task caching helpers. | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue