From 9fae34a190bcb04a0bea7428eaf5c71188f93f9e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 23 Dec 2020 12:45:27 -0500 Subject: [PATCH] Add our version of the std lib's "worker pool" This is a draft of the `tractor` way to implement the example from the "processs pool" in the stdlib's `concurrent.futures` docs: https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example Our runtime is of course slower to startup but once up we of course get the same performance, this confirms that we need to focus some effort not on warm up and teardown times. The mp forkserver method definitely improves startup delay; rolling our own will likely be a good hot spot to play with. What's really nice is our implementation is done in approx 10th the code ;) Also, do we want offer and interface that yields results as they arrive? Relates to #175 --- examples/concurrent_actors_primes.py | 119 ++++++++++++++++++++++++++ examples/concurrent_futures_primes.py | 40 +++++++++ 2 files changed, 159 insertions(+) create mode 100644 examples/concurrent_actors_primes.py create mode 100644 examples/concurrent_futures_primes.py diff --git a/examples/concurrent_actors_primes.py b/examples/concurrent_actors_primes.py new file mode 100644 index 0000000..2867fe5 --- /dev/null +++ b/examples/concurrent_actors_primes.py @@ -0,0 +1,119 @@ +""" +Demonstration of the prime number detector example from the +``concurrent.futures`` docs: + +https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example + +This uses no extra threads or fancy semaphores besides ``tractor``'s +(TCP) channels. + +""" +from contextlib import asynccontextmanager +from typing import List, Callable +import itertools +import math +import time + +import tractor +import trio + + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099, + 1099726899285419] + + +def is_prime(n): + if n < 2: + return False + if n == 2: + return True + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + + +@asynccontextmanager +async def worker_pool(workers=4): + """Though it's a trivial special case for ``tractor``, the well + known "worker pool" seems to be the defacto "I want this process + pattern" for most parallelism pilgrims. + + """ + + async with tractor.open_nursery() as tn: + + portals = [] + results = [] + + for i in range(workers): + + # this starts a new sub-actor (process + trio runtime) and + # stores it's "portal" for later use to "submit jobs" (ugh). + portals.append( + await tn.start_actor( + f'worker_{i}', + rpc_module_paths=[__name__], + ) + ) + + async def map( + worker_func: Callable[[int], bool], + sequence: List[int] + ) -> List[bool]: + + # define an async (local) task to collect results from workers + async def collect_portal_result(func, value, portal): + + results.append((value, await portal.run(func, n=value))) + + async with trio.open_nursery() as n: + + for value, portal in zip(sequence, itertools.cycle(portals)): + + n.start_soon( + collect_portal_result, + worker_func, + value, + portal + ) + + return results + + yield map + + # tear down all "workers" + await tn.cancel() + + +async def main(): + async with worker_pool() as actor_map: + + start = time.time() + # for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): + for number, prime in await actor_map(is_prime, PRIMES): + print(f'{number} is prime: {prime}') + + print(f'processing took {time.time() - start} seconds') + +if __name__ == '__main__': + start = time.time() + tractor.run( + main, + loglevel='ERROR', + + # uncomment to use ``multiprocessing`` fork server backend + # which gives a startup time boost at the expense of nested + # processs scalability + # start_method='forkserver') + ) + print(f'script took {time.time() - start} seconds') diff --git a/examples/concurrent_futures_primes.py b/examples/concurrent_futures_primes.py new file mode 100644 index 0000000..81ae23d --- /dev/null +++ b/examples/concurrent_futures_primes.py @@ -0,0 +1,40 @@ +import time +import concurrent.futures +import math + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099, + 1099726899285419] + +def is_prime(n): + if n < 2: + return False + if n == 2: + return True + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + +def main(): + with concurrent.futures.ProcessPoolExecutor() as executor: + start = time.time() + + for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): + print('%d is prime: %s' % (number, prime)) + + print(f'processing took {time.time() - start} seconds') + +if __name__ == '__main__': + + start = time.time() + main() + print(f'script took {time.time() - start} seconds')