forked from goodboy/tractor
1
0
Fork 0

Yield results on demand using a mem chan

actor_state_via_messages
Tyler Goodlet 2021-01-17 13:57:42 -05:00
parent f715a0cae8
commit a10c4b172a
1 changed files with 14 additions and 18 deletions

View File

@ -24,7 +24,8 @@ PRIMES = [
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
1099726899285419,
]
def is_prime(n):
@ -50,17 +51,10 @@ async def worker_pool(workers=4):
"""
async with tractor.open_nursery(
loglevel='ERROR',
# uncomment to use ``multiprocessing`` fork server backend
# which gives a startup time boost at the expense of nested
# processs scalability
# start_method='forkserver')
) as tn:
async with tractor.open_nursery() as tn:
portals = []
results = []
snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES))
for i in range(workers):
@ -79,35 +73,37 @@ async def worker_pool(workers=4):
) -> List[bool]:
# define an async (local) task to collect results from workers
async def collect_portal_result(func, value, portal):
results.append((value, await portal.run(func, n=value)))
async def send_result(func, value, portal):
await snd_chan.send((value, await portal.run(func, n=value)))
async with trio.open_nursery() as n:
for value, portal in zip(sequence, itertools.cycle(portals)):
n.start_soon(
collect_portal_result,
send_result,
worker_func,
value,
portal
)
return results
# deliver results as they arrive
for _ in range(len(sequence)):
yield await recv_chan.receive()
yield map
# tear down all "workers"
# tear down all "workers" on pool close
await tn.cancel()
async def main():
async with worker_pool() as actor_map:
start = time.time()
# for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
for number, prime in await actor_map(is_prime, PRIMES):
async for number, prime in actor_map(is_prime, PRIMES):
print(f'{number} is prime: {prime}')
print(f'processing took {time.time() - start} seconds')