From 7db57391432fa57475307aff9d33003fcec04ccb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 23 Dec 2020 12:45:27 -0500 Subject: [PATCH 01/12] Add our version of the std lib's "worker pool" This is a draft of the `tractor` way to implement the example from the "process pool" in the stdlib's `concurrent.futures` docs: https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example Our runtime is of course slower to start up but once up we of course get the same performance; this confirms that we need to focus some effort now on warm up and teardown times. The mp forkserver method definitely improves startup delay; rolling our own will likely be a good hot spot to play with. What's really nice is our implementation is done in approx. a 10th of the code ;) Also, do we want to offer an interface that yields results as they arrive? Relates to #175 --- examples/concurrent_actors_primes.py | 119 ++++++++++++++++++++++++++ examples/concurrent_futures_primes.py | 40 +++++++++ 2 files changed, 159 insertions(+) create mode 100644 examples/concurrent_actors_primes.py create mode 100644 examples/concurrent_futures_primes.py diff --git a/examples/concurrent_actors_primes.py b/examples/concurrent_actors_primes.py new file mode 100644 index 0000000..2867fe5 --- /dev/null +++ b/examples/concurrent_actors_primes.py @@ -0,0 +1,119 @@ +""" +Demonstration of the prime number detector example from the +``concurrent.futures`` docs: + +https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example + +This uses no extra threads or fancy semaphores besides ``tractor``'s +(TCP) channels.
+ +""" +from contextlib import asynccontextmanager +from typing import List, Callable +import itertools +import math +import time + +import tractor +import trio + + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099, + 1099726899285419] + + +def is_prime(n): + if n < 2: + return False + if n == 2: + return True + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + + +@asynccontextmanager +async def worker_pool(workers=4): + """Though it's a trivial special case for ``tractor``, the well + known "worker pool" seems to be the defacto "I want this process + pattern" for most parallelism pilgrims. + + """ + + async with tractor.open_nursery() as tn: + + portals = [] + results = [] + + for i in range(workers): + + # this starts a new sub-actor (process + trio runtime) and + # stores it's "portal" for later use to "submit jobs" (ugh). + portals.append( + await tn.start_actor( + f'worker_{i}', + rpc_module_paths=[__name__], + ) + ) + + async def map( + worker_func: Callable[[int], bool], + sequence: List[int] + ) -> List[bool]: + + # define an async (local) task to collect results from workers + async def collect_portal_result(func, value, portal): + + results.append((value, await portal.run(func, n=value))) + + async with trio.open_nursery() as n: + + for value, portal in zip(sequence, itertools.cycle(portals)): + + n.start_soon( + collect_portal_result, + worker_func, + value, + portal + ) + + return results + + yield map + + # tear down all "workers" + await tn.cancel() + + +async def main(): + async with worker_pool() as actor_map: + + start = time.time() + # for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): + for number, prime in await actor_map(is_prime, PRIMES): + print(f'{number} is prime: {prime}') + + print(f'processing took {time.time() - start} seconds') + +if __name__ == '__main__': + 
start = time.time() + tractor.run( + main, + loglevel='ERROR', + + # uncomment to use ``multiprocessing`` fork server backend + # which gives a startup time boost at the expense of nested + # processs scalability + # start_method='forkserver') + ) + print(f'script took {time.time() - start} seconds') diff --git a/examples/concurrent_futures_primes.py b/examples/concurrent_futures_primes.py new file mode 100644 index 0000000..81ae23d --- /dev/null +++ b/examples/concurrent_futures_primes.py @@ -0,0 +1,40 @@ +import time +import concurrent.futures +import math + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099, + 1099726899285419] + +def is_prime(n): + if n < 2: + return False + if n == 2: + return True + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + +def main(): + with concurrent.futures.ProcessPoolExecutor() as executor: + start = time.time() + + for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): + print('%d is prime: %s' % (number, prime)) + + print(f'processing took {time.time() - start} seconds') + +if __name__ == '__main__': + + start = time.time() + main() + print(f'script took {time.time() - start} seconds') From 255576588219743f6bd4f6255f1b10cb7fd2c358 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 17 Jan 2021 13:30:24 -0500 Subject: [PATCH 02/12] Make new parallelism example space --- examples/{ => parallelism}/concurrent_actors_primes.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename examples/{ => parallelism}/concurrent_actors_primes.py (100%) diff --git a/examples/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py similarity index 100% rename from examples/concurrent_actors_primes.py rename to examples/parallelism/concurrent_actors_primes.py From 3c320f467f03261931a253c5e332b00f5067ce17 Mon Sep 17 00:00:00 2001 From: Tyler
Goodlet Date: Sun, 17 Jan 2021 13:34:52 -0500 Subject: [PATCH 03/12] Remove use of tractor.run() --- .../parallelism/concurrent_actors_primes.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py index 2867fe5..7df2e43 100644 --- a/examples/parallelism/concurrent_actors_primes.py +++ b/examples/parallelism/concurrent_actors_primes.py @@ -50,7 +50,14 @@ async def worker_pool(workers=4): """ - async with tractor.open_nursery() as tn: + async with tractor.open_nursery( + loglevel='ERROR', + + # uncomment to use ``multiprocessing`` fork server backend + # which gives a startup time boost at the expense of nested + # processs scalability + # start_method='forkserver') + ) as tn: portals = [] results = [] @@ -62,7 +69,7 @@ async def worker_pool(workers=4): portals.append( await tn.start_actor( f'worker_{i}', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) ) @@ -105,15 +112,9 @@ async def main(): print(f'processing took {time.time() - start} seconds') -if __name__ == '__main__': - start = time.time() - tractor.run( - main, - loglevel='ERROR', - # uncomment to use ``multiprocessing`` fork server backend - # which gives a startup time boost at the expense of nested - # processs scalability - # start_method='forkserver') - ) +if __name__ == '__main__': + + start = time.time() + trio.run(main) print(f'script took {time.time() - start} seconds') From 9b07e9ad7cbc12abc33961646c9aad35534a276d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 17 Jan 2021 13:57:42 -0500 Subject: [PATCH 04/12] Yield results on demand using a mem chan --- .../parallelism/concurrent_actors_primes.py | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py index 7df2e43..874a24f 100644 --- 
a/examples/parallelism/concurrent_actors_primes.py +++ b/examples/parallelism/concurrent_actors_primes.py @@ -24,7 +24,8 @@ PRIMES = [ 112272535095293, 115280095190773, 115797848077099, - 1099726899285419] + 1099726899285419, +] def is_prime(n): @@ -50,17 +51,10 @@ async def worker_pool(workers=4): """ - async with tractor.open_nursery( - loglevel='ERROR', - - # uncomment to use ``multiprocessing`` fork server backend - # which gives a startup time boost at the expense of nested - # processs scalability - # start_method='forkserver') - ) as tn: + async with tractor.open_nursery() as tn: portals = [] - results = [] + snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES)) for i in range(workers): @@ -79,35 +73,37 @@ async def worker_pool(workers=4): ) -> List[bool]: # define an async (local) task to collect results from workers - async def collect_portal_result(func, value, portal): - - results.append((value, await portal.run(func, n=value))) + async def send_result(func, value, portal): + await snd_chan.send((value, await portal.run(func, n=value))) async with trio.open_nursery() as n: for value, portal in zip(sequence, itertools.cycle(portals)): - n.start_soon( - collect_portal_result, + send_result, worker_func, value, portal ) - return results + # deliver results as they arrive + for _ in range(len(sequence)): + yield await recv_chan.receive() yield map - # tear down all "workers" + # tear down all "workers" on pool close await tn.cancel() async def main(): + async with worker_pool() as actor_map: start = time.time() # for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): - for number, prime in await actor_map(is_prime, PRIMES): + async for number, prime in actor_map(is_prime, PRIMES): + print(f'{number} is prime: {prime}') print(f'processing took {time.time() - start} seconds') From 57a24cdcf884212a18a9371ccfefc589539367c1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 17 Jan 2021 15:41:35 -0500 Subject: [PATCH 05/12] More comments --- 
.../parallelism/concurrent_actors_primes.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py index 874a24f..1f5fe48 100644 --- a/examples/parallelism/concurrent_actors_primes.py +++ b/examples/parallelism/concurrent_actors_primes.py @@ -4,8 +4,8 @@ Demonstration of the prime number detector example from the https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example -This uses no extra threads or fancy semaphores besides ``tractor``'s -(TCP) channels. +This uses no extra threads, fancy semaphores or futures; all we need +is ``tractor``'s channels. """ from contextlib import asynccontextmanager @@ -46,11 +46,12 @@ def is_prime(n): @asynccontextmanager async def worker_pool(workers=4): """Though it's a trivial special case for ``tractor``, the well - known "worker pool" seems to be the defacto "I want this process - pattern" for most parallelism pilgrims. + known "worker pool" seems to be the defacto "but, I want this + process pattern!" for most parallelism pilgrims. + Yes, the workers stay alive (and ready for work) until you close + the context. 
""" - async with tractor.open_nursery() as tn: portals = [] @@ -67,7 +68,7 @@ async def worker_pool(workers=4): ) ) - async def map( + async def _map( worker_func: Callable[[int], bool], sequence: List[int] ) -> List[bool]: @@ -90,7 +91,8 @@ async def worker_pool(workers=4): for _ in range(len(sequence)): yield await recv_chan.receive() - yield map + # deliver the parallel "worker mapper" to user code + yield _map # tear down all "workers" on pool close await tn.cancel() @@ -110,7 +112,6 @@ async def main(): if __name__ == '__main__': - start = time.time() trio.run(main) print(f'script took {time.time() - start} seconds') From da8c8c1773092b924bdd3bcd96704e9063b6300a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 17 Jan 2021 15:42:05 -0500 Subject: [PATCH 06/12] Add concise readme example --- examples/parallelism/we_are_processes.py | 34 ++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 examples/parallelism/we_are_processes.py diff --git a/examples/parallelism/we_are_processes.py b/examples/parallelism/we_are_processes.py new file mode 100644 index 0000000..ac4b594 --- /dev/null +++ b/examples/parallelism/we_are_processes.py @@ -0,0 +1,34 @@ +""" +Run with a process monitor from a terminal using: +$TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py || kill $! 
+ +""" +from multiprocessing import cpu_count +import os + +import tractor +import trio + + +async def target(): + print(f"Yo, i'm {tractor.current_actor().name} " + f"running in pid {os.getpid()}") + await trio.sleep_forever() + + +async def main(): + + async with tractor.open_nursery() as n: + + for i in range(cpu_count()): + await n.run_in_actor(target, name=f'worker_{i}') + + print('This process tree will self-destruct in 1 sec...') + await trio.sleep(1) + + # you could have done this yourself + raise Exception('Self Destructed') + + +if __name__ == '__main__': + trio.run(main) From a90a2b8787a57dfe192c1d9f97ab2c94a83f4fab Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 17 Jan 2021 21:24:43 -0500 Subject: [PATCH 07/12] Contain the error --- examples/parallelism/we_are_processes.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/examples/parallelism/we_are_processes.py b/examples/parallelism/we_are_processes.py index ac4b594..8283b9c 100644 --- a/examples/parallelism/we_are_processes.py +++ b/examples/parallelism/we_are_processes.py @@ -1,6 +1,6 @@ """ Run with a process monitor from a terminal using: -$TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py || kill $! +$TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py && kill $! 
""" from multiprocessing import cpu_count @@ -11,7 +11,7 @@ import trio async def target(): - print(f"Yo, i'm {tractor.current_actor().name} " + print(f"Yo, i'm '{tractor.current_actor().name}' " f"running in pid {os.getpid()}") await trio.sleep_forever() @@ -31,4 +31,7 @@ async def main(): if __name__ == '__main__': - trio.run(main) + try: + trio.run(main) + except Exception: + print('Zombies Contained') From 07653bc02e16de87937fd4eb531e0695521cdba9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 17 Jan 2021 21:24:56 -0500 Subject: [PATCH 08/12] Run parallel examples --- tests/test_docs_examples.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index ea676a2..3778e0e 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -78,13 +78,13 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): @pytest.mark.parametrize( 'example_script', - [ - f for f in os.listdir(examples_dir()) - if ( - ('__' not in f) and - ('debugging' not in f) - ) - ], + + # walk yields: (dirpath, dirnames, filenames) + [(p[0], f) for p in os.walk(examples_dir()) for f in p[2] + + if '__' not in f + and 'debugging' not in p[0] + ] ) def test_example(run_example_in_subproc, example_script): """Load and run scripts from this repo's ``examples/`` dir as a user @@ -95,7 +95,7 @@ def test_example(run_example_in_subproc, example_script): test directory and invoke the script as a module with ``python -m test_example``. 
""" - ex_file = os.path.join(examples_dir(), example_script) + ex_file = os.path.join(*example_script) with open(ex_file, 'r') as ex: code = ex.read() From f7e1c526c53a91ad81b5661a96ba9e112829b08f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 24 Jan 2021 13:14:17 -0500 Subject: [PATCH 09/12] Add `aclosing()` around async gen loop --- examples/parallelism/concurrent_actors_primes.py | 8 +++++--- examples/{ => parallelism}/concurrent_futures_primes.py | 0 2 files changed, 5 insertions(+), 3 deletions(-) rename examples/{ => parallelism}/concurrent_futures_primes.py (100%) diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py index 1f5fe48..3ff8dab 100644 --- a/examples/parallelism/concurrent_actors_primes.py +++ b/examples/parallelism/concurrent_actors_primes.py @@ -16,6 +16,7 @@ import time import tractor import trio +from async_generator import aclosing PRIMES = [ @@ -103,10 +104,11 @@ async def main(): async with worker_pool() as actor_map: start = time.time() - # for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): - async for number, prime in actor_map(is_prime, PRIMES): - print(f'{number} is prime: {prime}') + async with aclosing(actor_map(is_prime, PRIMES)) as results: + async for number, prime in results: + + print(f'{number} is prime: {prime}') print(f'processing took {time.time() - start} seconds') diff --git a/examples/concurrent_futures_primes.py b/examples/parallelism/concurrent_futures_primes.py similarity index 100% rename from examples/concurrent_futures_primes.py rename to examples/parallelism/concurrent_futures_primes.py From 5ffd2d2ab3cdfe5dbd9da1115f323bcacbf6a5bb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 24 Jan 2021 13:20:24 -0500 Subject: [PATCH 10/12] Ignore type checks on stdlib overrides --- tractor/_forkserver_override.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tractor/_forkserver_override.py b/tractor/_forkserver_override.py index
25134ff..d799bb8 100644 --- a/tractor/_forkserver_override.py +++ b/tractor/_forkserver_override.py @@ -6,6 +6,8 @@ semaphore tracker per ``MainProcess``. .. note:: There is no type hinting in this code base (yet) to remain as a close as possible to upstream. """ +# type: ignore + import os import socket import signal From a93321e48ea7bd9b60c3b057dfbee4cc73c04ebf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 21 Feb 2021 15:41:21 -0500 Subject: [PATCH 11/12] Don't run stdlib example as part of test set --- ...urrent_futures_primes.py => _concurrent_futures_primes.py} | 0 tests/test_docs_examples.py | 4 +++- 2 files changed, 3 insertions(+), 1 deletion(-) rename examples/parallelism/{concurrent_futures_primes.py => _concurrent_futures_primes.py} (100%) diff --git a/examples/parallelism/concurrent_futures_primes.py b/examples/parallelism/_concurrent_futures_primes.py similarity index 100% rename from examples/parallelism/concurrent_futures_primes.py rename to examples/parallelism/_concurrent_futures_primes.py diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index 3778e0e..632d85c 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -83,8 +83,10 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): [(p[0], f) for p in os.walk(examples_dir()) for f in p[2] if '__' not in f + and f[0] != '_' and 'debugging' not in p[0] - ] + ], + ids=lambda t: t[1], ) def test_example(run_example_in_subproc, example_script): """Load and run scripts from this repo's ``examples/`` dir as a user From 2b3beac4b412e851648d01a6200a2a4dd4464f38 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 21 Feb 2021 17:47:06 -0500 Subject: [PATCH 12/12] Test putting readme in docs dir --- README.rst => docs/README.rst | 0 setup.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename README.rst => docs/README.rst (100%) diff --git a/README.rst b/docs/README.rst similarity index 100% rename from README.rst rename to docs/README.rst 
diff --git a/setup.py b/setup.py index e31e359..20e748d 100755 --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ # along with this program. If not, see . from setuptools import setup -with open('README.rst', encoding='utf-8') as f: +with open('docs/README.rst', encoding='utf-8') as f: readme = f.read()