From 71a35aadef3682aff68ac43e5819c78d7d5e3da7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 22 Feb 2021 13:35:22 -0500 Subject: [PATCH] Drop worker pool and add debug example --- docs/README.rst | 190 ++++++++++++++++++------------------------------ 1 file changed, 71 insertions(+), 119 deletions(-) diff --git a/docs/README.rst b/docs/README.rst index 5bf7952..ba71a1c 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -71,28 +71,32 @@ Alluring Features Example: self-destruct a process tree ------------------------------------- +``tractor`` protects you from zombies, no matter what. .. code:: python - """ - Run with a process monitor from a terminal using: - $TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py && kill $! + """ + Run with a process monitor from a terminal using:: - """ - from multiprocessing import cpu_count - import os + $TERM -e watch -n 0.1 "pstree -a $$" \ + & python examples/parallelism/we_are_processes.py \ + && kill $! - import tractor - import trio + """ + from multiprocessing import cpu_count + import os + + import tractor + import trio - async def target(): + async def target(): print(f"Yo, i'm '{tractor.current_actor().name}' " f"running in pid {os.getpid()}") await trio.sleep_forever() - async def main(): + async def main(): async with tractor.open_nursery() as n: @@ -106,7 +110,7 @@ Example: self-destruct a process tree raise Exception('Self Destructed') - if __name__ == '__main__': + if __name__ == '__main__': try: trio.run(main) except Exception: @@ -115,133 +119,81 @@ Example: self-destruct a process tree The example you're probably after... ------------------------------------ -It seems the initial query from most new users is "how do I make a worker +It seems the initial ask from most new users is "how do I make a worker pool thing?". ``tractor`` is built to handle any SC process tree you can -imagine; the "worker pool" pattern is a trivial special case: +imagine; the "worker pool" pattern is a trivial special case. + +We have a `full re-implementation `_ of the std-lib's +``concurrent.futures.ProcessPoolExecutor`` example for reference. + +You can run it like so (from this dir) to see the process tree in +real time:: + + $TERM -e watch -n 0.1 "pstree -a $$" \ + & python examples/parallelism/concurrent_actors_primes.py \ + && kill $! + +This uses no extra threads, fancy semaphores or futures; all we need +is ``tractor``'s IPC! + + +.. _concurrent_actors_primes: https://github.com/goodboy/tractor/blob/readme_pump/examples/parallelism/concurrent_actors_primes.py + + +"Native" sub-process debugging +------------------------------ +Using the magic of `pdb++`_ and some IPC tricks we've +been able to create a native feeling debugging experience for +any (sub)-process in your ``tractor`` tree. .. code:: python - """ - Demonstration of the prime number detector example from the - ``concurrent.futures`` docs: + from os import getpid - https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example - - This uses no extra threads, fancy semaphores or futures; all we need - is ``tractor``'s channels. - - """ - from contextlib import asynccontextmanager - from typing import List, Callable - import itertools - import math - import time - - import tractor - import trio - from async_generator import aclosing + import tractor + import trio - PRIMES = [ - 112272535095293, - 112582705942171, - 112272535095293, - 115280095190773, - 115797848077099, - 1099726899285419, - ] + async def breakpoint_forever(): + "Indefinitely re-enter debugger in child actor." + while True: + yield 'yo' + await tractor.breakpoint() - def is_prime(n): - if n < 2: - return False - if n == 2: - return True - if n % 2 == 0: - return False - - sqrt_n = int(math.floor(math.sqrt(n))) - for i in range(3, sqrt_n + 1, 2): - if n % i == 0: - return False - return True + async def name_error(): + "Raise a ``NameError``" + getattr(doggypants) - @asynccontextmanager - async def worker_pool(workers=4): - """Though it's a trivial special case for ``tractor``, the well - known "worker pool" seems to be the defacto "but, I want this - process pattern!" for most parallelism pilgrims. + async def main(): + """Test breakpoint in a streaming actor. + """ + async with tractor.open_nursery( + debug_mode=True, + loglevel='error', + ) as n: - Yes, the workers stay alive (and ready for work) until you close - the context. - """ - async with tractor.open_nursery() as tn: + p0 = await n.start_actor('bp_forever', enable_modules=[__name__]) + p1 = await n.start_actor('name_error', enable_modules=[__name__]) - portals = [] - snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES)) - - for i in range(workers): - - # this starts a new sub-actor (process + trio runtime) and - # stores it's "portal" for later use to "submit jobs" (ugh). - portals.append( - await tn.start_actor( - f'worker_{i}', - enable_modules=[__name__], - ) - ) - - async def _map( - worker_func: Callable[[int], bool], - sequence: List[int] - ) -> List[bool]: - - # define an async (local) task to collect results from workers - async def send_result(func, value, portal): - await snd_chan.send((value, await portal.run(func, n=value))) - - async with trio.open_nursery() as n: - - for value, portal in zip(sequence, itertools.cycle(portals)): - n.start_soon( - send_result, - worker_func, - value, - portal - ) - - # deliver results as they arrive - for _ in range(len(sequence)): - yield await recv_chan.receive() - - # deliver the parallel "worker mapper" to user code - yield _map - - # tear down all "workers" on pool close - await tn.cancel() + # retreive results + stream = await p0.run(breakpoint_forever) + await p1.run(name_error) - async def main(): - - async with worker_pool() as actor_map: - - start = time.time() - - async with aclosing(actor_map(is_prime, PRIMES)) as results: - async for number, prime in results: - - print(f'{number} is prime: {prime}') - - print(f'processing took {time.time() - start} seconds') + if __name__ == '__main__': + trio.run(main) - if __name__ == '__main__': - start = time.time() - trio.run(main) - print(f'script took {time.time() - start} seconds') +You can run this with:: + + >>> python examples/debugging/multi_daemon_subactors.py + +And, yes, there's a built-in crash handling mode B) +We're hoping to add a respawn-from-repl system soon! Feel like saying hi?