forked from goodboy/tractor
1
0
Fork 0

Yield results on demand using a mem chan

eg_worker_poolz
Tyler Goodlet 2021-01-17 13:57:42 -05:00
parent 3c320f467f
commit 9b07e9ad7c
1 changed files with 14 additions and 18 deletions

View File

@ -24,7 +24,8 @@ PRIMES = [
112272535095293, 112272535095293,
115280095190773, 115280095190773,
115797848077099, 115797848077099,
1099726899285419] 1099726899285419,
]
def is_prime(n): def is_prime(n):
@ -50,17 +51,10 @@ async def worker_pool(workers=4):
""" """
async with tractor.open_nursery( async with tractor.open_nursery() as tn:
loglevel='ERROR',
# uncomment to use ``multiprocessing`` fork server backend
# which gives a startup time boost at the expense of nested
# processs scalability
# start_method='forkserver')
) as tn:
portals = [] portals = []
results = [] snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES))
for i in range(workers): for i in range(workers):
@ -79,35 +73,37 @@ async def worker_pool(workers=4):
) -> List[bool]: ) -> List[bool]:
# define an async (local) task to collect results from workers # define an async (local) task to collect results from workers
async def collect_portal_result(func, value, portal): async def send_result(func, value, portal):
await snd_chan.send((value, await portal.run(func, n=value)))
results.append((value, await portal.run(func, n=value)))
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
for value, portal in zip(sequence, itertools.cycle(portals)): for value, portal in zip(sequence, itertools.cycle(portals)):
n.start_soon( n.start_soon(
collect_portal_result, send_result,
worker_func, worker_func,
value, value,
portal portal
) )
return results # deliver results as they arrive
for _ in range(len(sequence)):
yield await recv_chan.receive()
yield map yield map
# tear down all "workers" # tear down all "workers" on pool close
await tn.cancel() await tn.cancel()
async def main(): async def main():
async with worker_pool() as actor_map: async with worker_pool() as actor_map:
start = time.time() start = time.time()
# for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): # for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
for number, prime in await actor_map(is_prime, PRIMES): async for number, prime in actor_map(is_prime, PRIMES):
print(f'{number} is prime: {prime}') print(f'{number} is prime: {prime}')
print(f'processing took {time.time() - start} seconds') print(f'processing took {time.time() - start} seconds')