""" Demonstration of the prime number detector example from the ``concurrent.futures`` docs: https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example This uses no extra threads, fancy semaphores or futures; all we need is ``tractor``'s channels. """ from contextlib import asynccontextmanager from typing import List, Callable import itertools import math import time import tractor import trio from async_generator import aclosing PRIMES = [ 112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419, ] async def is_prime(n): if n < 2: return False if n == 2: return True if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return True @asynccontextmanager async def worker_pool(workers=4): """Though it's a trivial special case for ``tractor``, the well known "worker pool" seems to be the defacto "but, I want this process pattern!" for most parallelism pilgrims. Yes, the workers stay alive (and ready for work) until you close the context. """ async with tractor.open_nursery() as tn: portals = [] snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES)) for i in range(workers): # this starts a new sub-actor (process + trio runtime) and # stores it's "portal" for later use to "submit jobs" (ugh). portals.append( await tn.start_actor( f'worker_{i}', enable_modules=[__name__], ) ) async def _map( worker_func: Callable[[int], bool], sequence: List[int] ) -> List[bool]: # define an async (local) task to collect results from workers async def send_result(func, value, portal): await snd_chan.send((value, await portal.run(func, n=value))) async with trio.open_nursery() as n: for value, portal in zip(sequence, itertools.cycle(portals)): n.start_soon( send_result, worker_func, value, portal ) # deliver results as they arrive for _ in range(len(sequence)): yield await recv_chan.receive() # deliver the parallel "worker mapper" to user code yield _map # tear down all "workers" on pool close await tn.cancel() async def main(): async with worker_pool() as actor_map: start = time.time() async with aclosing(actor_map(is_prime, PRIMES)) as results: async for number, prime in results: print(f'{number} is prime: {prime}') print(f'processing took {time.time() - start} seconds') if __name__ == '__main__': start = time.time() trio.run(main) print(f'script took {time.time() - start} seconds')