forked from goodboy/tractor
1
0
Fork 0

Drop worker pool and add debug example

readme_pump
Tyler Goodlet 2021-02-22 13:35:22 -05:00
parent 4a512bc879
commit 71a35aadef
1 changed files with 71 additions and 119 deletions

View File

@ -71,28 +71,32 @@ Alluring Features
Example: self-destruct a process tree Example: self-destruct a process tree
------------------------------------- -------------------------------------
``tractor`` protects you from zombies, no matter what.
.. code:: python .. code:: python
""" """
Run with a process monitor from a terminal using: Run with a process monitor from a terminal using::
$TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py && kill $!
""" $TERM -e watch -n 0.1 "pstree -a $$" \
from multiprocessing import cpu_count & python examples/parallelism/we_are_processes.py \
import os && kill $!
import tractor """
import trio from multiprocessing import cpu_count
import os
import tractor
import trio
async def target(): async def target():
print(f"Yo, i'm '{tractor.current_actor().name}' " print(f"Yo, i'm '{tractor.current_actor().name}' "
f"running in pid {os.getpid()}") f"running in pid {os.getpid()}")
await trio.sleep_forever() await trio.sleep_forever()
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
@ -106,7 +110,7 @@ Example: self-destruct a process tree
raise Exception('Self Destructed') raise Exception('Self Destructed')
if __name__ == '__main__': if __name__ == '__main__':
try: try:
trio.run(main) trio.run(main)
except Exception: except Exception:
@ -115,133 +119,81 @@ Example: self-destruct a process tree
The example you're probably after... The example you're probably after...
------------------------------------ ------------------------------------
It seems the initial query from most new users is "how do I make a worker It seems the initial ask from most new users is "how do I make a worker
pool thing?". pool thing?".
``tractor`` is built to handle any SC process tree you can ``tractor`` is built to handle any SC process tree you can
imagine; the "worker pool" pattern is a trivial special case: imagine; the "worker pool" pattern is a trivial special case.
We have a `full re-implementation <concurrent_actors_primes>`_ of the std-lib's
``concurrent.futures.ProcessPoolExecutor`` example for reference.
You can run it like so (from this dir) to see the process tree in
real time::
$TERM -e watch -n 0.1 "pstree -a $$" \
& python examples/parallelism/concurrent_actors_primes.py \
&& kill $!
This uses no extra threads, fancy semaphores or futures; all we need
is ``tractor``'s IPC!
.. _concurrent_actors_primes: https://github.com/goodboy/tractor/blob/readme_pump/examples/parallelism/concurrent_actors_primes.py
"Native" sub-process debugging
------------------------------
Using the magic of `pdb++`_ and some IPC tricks we've
been able to create a native feeling debugging experience for
any (sub)-process in your ``tractor`` tree.
.. code:: python .. code:: python
""" from os import getpid
Demonstration of the prime number detector example from the
``concurrent.futures`` docs:
https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example import tractor
import trio
This uses no extra threads, fancy semaphores or futures; all we need
is ``tractor``'s channels.
"""
from contextlib import asynccontextmanager
from typing import List, Callable
import itertools
import math
import time
import tractor
import trio
from async_generator import aclosing
PRIMES = [ async def breakpoint_forever():
112272535095293, "Indefinitely re-enter debugger in child actor."
112582705942171, while True:
112272535095293, yield 'yo'
115280095190773, await tractor.breakpoint()
115797848077099,
1099726899285419,
]
def is_prime(n): async def name_error():
if n < 2: "Raise a ``NameError``"
return False getattr(doggypants)
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
@asynccontextmanager async def main():
async def worker_pool(workers=4): """Test breakpoint in a streaming actor.
"""Though it's a trivial special case for ``tractor``, the well """
known "worker pool" seems to be the defacto "but, I want this async with tractor.open_nursery(
process pattern!" for most parallelism pilgrims. debug_mode=True,
loglevel='error',
) as n:
Yes, the workers stay alive (and ready for work) until you close p0 = await n.start_actor('bp_forever', enable_modules=[__name__])
the context. p1 = await n.start_actor('name_error', enable_modules=[__name__])
"""
async with tractor.open_nursery() as tn:
portals = [] # retreive results
snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES)) stream = await p0.run(breakpoint_forever)
await p1.run(name_error)
for i in range(workers):
# this starts a new sub-actor (process + trio runtime) and
# stores it's "portal" for later use to "submit jobs" (ugh).
portals.append(
await tn.start_actor(
f'worker_{i}',
enable_modules=[__name__],
)
)
async def _map(
worker_func: Callable[[int], bool],
sequence: List[int]
) -> List[bool]:
# define an async (local) task to collect results from workers
async def send_result(func, value, portal):
await snd_chan.send((value, await portal.run(func, n=value)))
async with trio.open_nursery() as n:
for value, portal in zip(sequence, itertools.cycle(portals)):
n.start_soon(
send_result,
worker_func,
value,
portal
)
# deliver results as they arrive
for _ in range(len(sequence)):
yield await recv_chan.receive()
# deliver the parallel "worker mapper" to user code
yield _map
# tear down all "workers" on pool close
await tn.cancel()
async def main(): if __name__ == '__main__':
trio.run(main)
async with worker_pool() as actor_map:
start = time.time()
async with aclosing(actor_map(is_prime, PRIMES)) as results:
async for number, prime in results:
print(f'{number} is prime: {prime}')
print(f'processing took {time.time() - start} seconds')
if __name__ == '__main__': You can run this with::
start = time.time()
trio.run(main) >>> python examples/debugging/multi_daemon_subactors.py
print(f'script took {time.time() - start} seconds')
And, yes, there's a built-in crash handling mode B)
We're hoping to add a respawn-from-repl system soon!
Feel like saying hi? Feel like saying hi?