diff --git a/README.rst b/docs/README.rst similarity index 100% rename from README.rst rename to docs/README.rst diff --git a/examples/parallelism/_concurrent_futures_primes.py b/examples/parallelism/_concurrent_futures_primes.py new file mode 100644 index 0000000..81ae23d --- /dev/null +++ b/examples/parallelism/_concurrent_futures_primes.py @@ -0,0 +1,40 @@ +import time +import concurrent.futures +import math + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099, + 1099726899285419] + +def is_prime(n): + if n < 2: + return False + if n == 2: + return True + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + +def main(): + with concurrent.futures.ProcessPoolExecutor() as executor: + start = time.time() + + for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): + print('%d is prime: %s' % (number, prime)) + + print(f'processing took {time.time() - start} seconds') + +if __name__ == '__main__': + + start = time.time() + main() + print(f'script took {time.time() - start} seconds') diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py new file mode 100644 index 0000000..3ff8dab --- /dev/null +++ b/examples/parallelism/concurrent_actors_primes.py @@ -0,0 +1,119 @@ +""" +Demonstration of the prime number detector example from the +``concurrent.futures`` docs: + +https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example + +This uses no extra threads, fancy semaphores or futures; all we need +is ``tractor``'s channels. + +""" +from contextlib import asynccontextmanager +from typing import List, Callable +import itertools +import math +import time + +import tractor +import trio +from async_generator import aclosing + + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099, + 1099726899285419, +] + + +def is_prime(n): + if n < 2: + return False + if n == 2: + return True + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + + +@asynccontextmanager +async def worker_pool(workers=4): + """Though it's a trivial special case for ``tractor``, the well + known "worker pool" seems to be the defacto "but, I want this + process pattern!" for most parallelism pilgrims. + + Yes, the workers stay alive (and ready for work) until you close + the context. + """ + async with tractor.open_nursery() as tn: + + portals = [] + snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES)) + + for i in range(workers): + + # this starts a new sub-actor (process + trio runtime) and + # stores it's "portal" for later use to "submit jobs" (ugh). + portals.append( + await tn.start_actor( + f'worker_{i}', + enable_modules=[__name__], + ) + ) + + async def _map( + worker_func: Callable[[int], bool], + sequence: List[int] + ) -> List[bool]: + + # define an async (local) task to collect results from workers + async def send_result(func, value, portal): + await snd_chan.send((value, await portal.run(func, n=value))) + + async with trio.open_nursery() as n: + + for value, portal in zip(sequence, itertools.cycle(portals)): + n.start_soon( + send_result, + worker_func, + value, + portal + ) + + # deliver results as they arrive + for _ in range(len(sequence)): + yield await recv_chan.receive() + + # deliver the parallel "worker mapper" to user code + yield _map + + # tear down all "workers" on pool close + await tn.cancel() + + +async def main(): + + async with worker_pool() as actor_map: + + start = time.time() + + async with aclosing(actor_map(is_prime, PRIMES)) as results: + async for number, prime in results: + + print(f'{number} is prime: {prime}') + + print(f'processing took {time.time() - start} seconds') + + +if __name__ == '__main__': + start = time.time() + trio.run(main) + print(f'script took {time.time() - start} seconds') diff --git a/examples/parallelism/we_are_processes.py b/examples/parallelism/we_are_processes.py new file mode 100644 index 0000000..8283b9c --- /dev/null +++ b/examples/parallelism/we_are_processes.py @@ -0,0 +1,37 @@ +""" +Run with a process monitor from a terminal using: +$TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py && kill $! + +""" +from multiprocessing import cpu_count +import os + +import tractor +import trio + + +async def target(): + print(f"Yo, i'm '{tractor.current_actor().name}' " + f"running in pid {os.getpid()}") + await trio.sleep_forever() + + +async def main(): + + async with tractor.open_nursery() as n: + + for i in range(cpu_count()): + await n.run_in_actor(target, name=f'worker_{i}') + + print('This process tree will self-destruct in 1 sec...') + await trio.sleep(1) + + # you could have done this yourself + raise Exception('Self Destructed') + + +if __name__ == '__main__': + try: + trio.run(main) + except Exception: + print('Zombies Contained') diff --git a/setup.py b/setup.py index e31e359..20e748d 100755 --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ # along with this program. If not, see . from setuptools import setup -with open('README.rst', encoding='utf-8') as f: +with open('docs/README.rst', encoding='utf-8') as f: readme = f.read() diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index ea676a2..632d85c 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -78,13 +78,15 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): @pytest.mark.parametrize( 'example_script', - [ - f for f in os.listdir(examples_dir()) - if ( - ('__' not in f) and - ('debugging' not in f) - ) + + # walk yields: (dirpath, dirnames, filenames) + [(p[0], f) for p in os.walk(examples_dir()) for f in p[2] + + if '__' not in f + and f[0] != '_' + and 'debugging' not in p[0] ], + ids=lambda t: t[1], ) def test_example(run_example_in_subproc, example_script): """Load and run scripts from this repo's ``examples/`` dir as a user @@ -95,7 +97,7 @@ def test_example(run_example_in_subproc, example_script): test directory and invoke the script as a module with ``python -m test_example``. """ - ex_file = os.path.join(examples_dir(), example_script) + ex_file = os.path.join(*example_script) with open(ex_file, 'r') as ex: code = ex.read() diff --git a/tractor/_forkserver_override.py b/tractor/_forkserver_override.py index 25134ff..d799bb8 100644 --- a/tractor/_forkserver_override.py +++ b/tractor/_forkserver_override.py @@ -6,6 +6,8 @@ semaphore tracker per ``MainProcess``. .. note:: There is no type hinting in this code base (yet) to remain as a close as possible to upstream. """ +# type: ignore + import os import socket import signal