diff --git a/README.rst b/README.rst
index ad0f1e6..56fbc73 100644
--- a/README.rst
+++ b/README.rst
@@ -343,99 +343,99 @@ and print the results to your screen:

 .. code:: python

-    import time
-    import trio
-    import tractor
+    import time
+    import trio
+    import tractor

-    # this is the first 2 actors, streamer_1 and streamer_2
-    async def stream_data(seed):
-        for i in range(seed):
-            yield i
-            await trio.sleep(0)  # trigger scheduler
+    # this is the first 2 actors, streamer_1 and streamer_2
+    async def stream_data(seed):
+        for i in range(seed):
+            yield i
+            await trio.sleep(0)  # trigger scheduler

-    # this is the third actor; the aggregator
-    async def aggregate(seed):
-        """Ensure that the two streams we receive match but only stream
-        a single set of values to the parent.
-        """
-        async with tractor.open_nursery() as nursery:
-            portals = []
-            for i in range(1, 3):
-                # fork point
-                portal = await nursery.start_actor(
-                    name=f'streamer_{i}',
-                    rpc_module_paths=[__name__],
-                )
+    # this is the third actor; the aggregator
+    async def aggregate(seed):
+        """Ensure that the two streams we receive match but only stream
+        a single set of values to the parent.
+        """
+        async with tractor.open_nursery() as nursery:
+            portals = []
+            for i in range(1, 3):
+                # fork point
+                portal = await nursery.start_actor(
+                    name=f'streamer_{i}',
+                    rpc_module_paths=[__name__],
+                )

-                portals.append(portal)
+                portals.append(portal)

-            q = trio.Queue(500)
+            send_chan, recv_chan = trio.open_memory_channel(500)

-            async def push_to_q(portal):
-                async for value in await portal.run(
-                    __name__, 'stream_data', seed=seed
-                ):
-                    # leverage trio's built-in backpressure
-                    await q.put(value)
+            async def push_to_q(portal):
+                async for value in await portal.run(
+                    __name__, 'stream_data', seed=seed
+                ):
+                    # leverage trio's built-in backpressure
+                    await send_chan.send(value)

-                await q.put(None)
-                print(f"FINISHED ITERATING {portal.channel.uid}")
+                await send_chan.send(None)
+                print(f"FINISHED ITERATING {portal.channel.uid}")

-            # spawn 2 trio tasks to collect streams and push to a local queue
-            async with trio.open_nursery() as n:
-                for portal in portals:
-                    n.start_soon(push_to_q, portal)
+            # spawn 2 trio tasks to collect streams and push to a local queue
+            async with trio.open_nursery() as n:
+                for portal in portals:
+                    n.start_soon(push_to_q, portal)

-                unique_vals = set()
-                async for value in q:
-                    if value not in unique_vals:
-                        unique_vals.add(value)
-                        # yield upwards to the spawning parent actor
-                        yield value
+                unique_vals = set()
+                async for value in recv_chan:
+                    if value not in unique_vals:
+                        unique_vals.add(value)
+                        # yield upwards to the spawning parent actor
+                        yield value

-                        if value is None:
-                            break
+                        if value is None:
+                            break

-                    assert value in unique_vals
+                    assert value in unique_vals

-                print("FINISHED ITERATING in aggregator")
+                print("FINISHED ITERATING in aggregator")

-            await nursery.cancel()
-            print("WAITING on `ActorNursery` to finish")
-        print("AGGREGATOR COMPLETE!")
+            await nursery.cancel()
+            print("WAITING on `ActorNursery` to finish")
+        print("AGGREGATOR COMPLETE!")

-    # this is the main actor and *arbiter*
-    async def main():
-        # a nursery which spawns "actors"
-        async with tractor.open_nursery() as nursery:
+    # this is the main actor and *arbiter*
+    async def main():
+        # a nursery which spawns "actors"
+        async with tractor.open_nursery() as nursery:

-            seed = int(1e3)
-            import time
-            pre_start = time.time()
+            seed = int(1e3)
+            import time
+            pre_start = time.time()

-            portal = await nursery.run_in_actor(
-                'aggregator',
-                aggregate,
-                seed=seed,
-            )
+            portal = await nursery.run_in_actor(
+                'aggregator',
+                aggregate,
+                seed=seed,
+            )

-            start = time.time()
-            # the portal call returns exactly what you'd expect
-            # as if the remote "aggregate" function was called locally
-            result_stream = []
-            async for value in await portal.result():
-                result_stream.append(value)
+            start = time.time()
+            # the portal call returns exactly what you'd expect
+            # as if the remote "aggregate" function was called locally
+            result_stream = []
+            async for value in await portal.result():
+                result_stream.append(value)

-            print(f"STREAM TIME = {time.time() - start}")
-            print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
-            assert result_stream == list(range(seed)) + [None]
-            return result_stream
+            print(f"STREAM TIME = {time.time() - start}")
+            print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
+            assert result_stream == list(range(seed)) + [None]
+            return result_stream

-    final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616))
+    final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616))

 Here there's four actors running in separate processes (using all the