diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py index 7df2e43..874a24f 100644 --- a/examples/parallelism/concurrent_actors_primes.py +++ b/examples/parallelism/concurrent_actors_primes.py @@ -24,7 +24,8 @@ PRIMES = [ 112272535095293, 115280095190773, 115797848077099, - 1099726899285419] + 1099726899285419, +] def is_prime(n): @@ -50,17 +51,10 @@ async def worker_pool(workers=4): """ - async with tractor.open_nursery( - loglevel='ERROR', - - # uncomment to use ``multiprocessing`` fork server backend - # which gives a startup time boost at the expense of nested - # processs scalability - # start_method='forkserver') - ) as tn: + async with tractor.open_nursery() as tn: portals = [] - results = [] + snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES)) for i in range(workers): @@ -79,35 +73,37 @@ async def worker_pool(workers=4): ) -> List[bool]: # define an async (local) task to collect results from workers - async def collect_portal_result(func, value, portal): - - results.append((value, await portal.run(func, n=value))) + async def send_result(func, value, portal): + await snd_chan.send((value, await portal.run(func, n=value))) async with trio.open_nursery() as n: for value, portal in zip(sequence, itertools.cycle(portals)): - n.start_soon( - collect_portal_result, + send_result, worker_func, value, portal ) - return results + # deliver results as they arrive + for _ in range(len(sequence)): + yield await recv_chan.receive() yield map - # tear down all "workers" + # tear down all "workers" on pool close await tn.cancel() async def main(): + async with worker_pool() as actor_map: start = time.time() # for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): - for number, prime in await actor_map(is_prime, PRIMES): + async for number, prime in actor_map(is_prime, PRIMES): + print(f'{number} is prime: {prime}') print(f'processing took {time.time() - start} seconds')