Adjust streaming ex to use memory channel
							parent
							
								
									78ddd33e3a
								
							
						
					
					
						commit
						fd4e126e1f
					
				
							
								
								
									
										142
									
								
								README.rst
								
								
								
								
							
							
						
						
									
										142
									
								
								README.rst
								
								
								
								
							| 
						 | 
					@ -343,99 +343,99 @@ and print the results to your screen:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
.. code:: python
 | 
					.. code:: python
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    import time
 | 
					   import time
 | 
				
			||||||
    import trio
 | 
					   import trio
 | 
				
			||||||
    import tractor
 | 
					   import tractor
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # this is the first 2 actors, streamer_1 and streamer_2
 | 
					   # this is the first 2 actors, streamer_1 and streamer_2
 | 
				
			||||||
    async def stream_data(seed):
 | 
					   async def stream_data(seed):
 | 
				
			||||||
        for i in range(seed):
 | 
					       for i in range(seed):
 | 
				
			||||||
            yield i
 | 
					           yield i
 | 
				
			||||||
            await trio.sleep(0)  # trigger scheduler
 | 
					           await trio.sleep(0)  # trigger scheduler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # this is the third actor; the aggregator
 | 
					   # this is the third actor; the aggregator
 | 
				
			||||||
    async def aggregate(seed):
 | 
					   async def aggregate(seed):
 | 
				
			||||||
        """Ensure that the two streams we receive match but only stream
 | 
					       """Ensure that the two streams we receive match but only stream
 | 
				
			||||||
        a single set of values to the parent.
 | 
					       a single set of values to the parent.
 | 
				
			||||||
        """
 | 
					       """
 | 
				
			||||||
        async with tractor.open_nursery() as nursery:
 | 
					       async with tractor.open_nursery() as nursery:
 | 
				
			||||||
            portals = []
 | 
					           portals = []
 | 
				
			||||||
            for i in range(1, 3):
 | 
					           for i in range(1, 3):
 | 
				
			||||||
                # fork point
 | 
					               # fork point
 | 
				
			||||||
                portal = await nursery.start_actor(
 | 
					               portal = await nursery.start_actor(
 | 
				
			||||||
                    name=f'streamer_{i}',
 | 
					                   name=f'streamer_{i}',
 | 
				
			||||||
                    rpc_module_paths=[__name__],
 | 
					                   rpc_module_paths=[__name__],
 | 
				
			||||||
                )
 | 
					               )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                portals.append(portal)
 | 
					               portals.append(portal)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            q = trio.Queue(500)
 | 
					           send_chan, recv_chan = trio.open_memory_channel(500)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            async def push_to_q(portal):
 | 
					           async def push_to_q(portal):
 | 
				
			||||||
                async for value in await portal.run(
 | 
					               async for value in await portal.run(
 | 
				
			||||||
                    __name__, 'stream_data', seed=seed
 | 
					                   __name__, 'stream_data', seed=seed
 | 
				
			||||||
                ):
 | 
					               ):
 | 
				
			||||||
                    # leverage trio's built-in backpressure
 | 
					                   # leverage trio's built-in backpressure
 | 
				
			||||||
                    await q.put(value)
 | 
					                   await send_chan.send(value)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                await q.put(None)
 | 
					               await send_chan.send(None)
 | 
				
			||||||
                print(f"FINISHED ITERATING {portal.channel.uid}")
 | 
					               print(f"FINISHED ITERATING {portal.channel.uid}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # spawn 2 trio tasks to collect streams and push to a local queue
 | 
					           # spawn 2 trio tasks to collect streams and push to a local queue
 | 
				
			||||||
            async with trio.open_nursery() as n:
 | 
					           async with trio.open_nursery() as n:
 | 
				
			||||||
                for portal in portals:
 | 
					               for portal in portals:
 | 
				
			||||||
                    n.start_soon(push_to_q, portal)
 | 
					                   n.start_soon(push_to_q, portal)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                unique_vals = set()
 | 
					               unique_vals = set()
 | 
				
			||||||
                async for value in q:
 | 
					               async for value in recv_chan:
 | 
				
			||||||
                    if value not in unique_vals:
 | 
					                   if value not in unique_vals:
 | 
				
			||||||
                        unique_vals.add(value)
 | 
					                       unique_vals.add(value)
 | 
				
			||||||
                        # yield upwards to the spawning parent actor
 | 
					                       # yield upwards to the spawning parent actor
 | 
				
			||||||
                        yield value
 | 
					                       yield value
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        if value is None:
 | 
					                       if value is None:
 | 
				
			||||||
                            break
 | 
					                           break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    assert value in unique_vals
 | 
					                   assert value in unique_vals
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                print("FINISHED ITERATING in aggregator")
 | 
					               print("FINISHED ITERATING in aggregator")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            await nursery.cancel()
 | 
					           await nursery.cancel()
 | 
				
			||||||
            print("WAITING on `ActorNursery` to finish")
 | 
					           print("WAITING on `ActorNursery` to finish")
 | 
				
			||||||
        print("AGGREGATOR COMPLETE!")
 | 
					       print("AGGREGATOR COMPLETE!")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # this is the main actor and *arbiter*
 | 
					   # this is the main actor and *arbiter*
 | 
				
			||||||
    async def main():
 | 
					   async def main():
 | 
				
			||||||
        # a nursery which spawns "actors"
 | 
					       # a nursery which spawns "actors"
 | 
				
			||||||
        async with tractor.open_nursery() as nursery:
 | 
					       async with tractor.open_nursery() as nursery:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            seed = int(1e3)
 | 
					           seed = int(1e3)
 | 
				
			||||||
            import time
 | 
					           import time
 | 
				
			||||||
            pre_start = time.time()
 | 
					           pre_start = time.time()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            portal = await nursery.run_in_actor(
 | 
					           portal = await nursery.run_in_actor(
 | 
				
			||||||
                'aggregator',
 | 
					               'aggregator',
 | 
				
			||||||
                aggregate,
 | 
					               aggregate,
 | 
				
			||||||
                seed=seed,
 | 
					               seed=seed,
 | 
				
			||||||
            )
 | 
					           )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            start = time.time()
 | 
					           start = time.time()
 | 
				
			||||||
            # the portal call returns exactly what you'd expect
 | 
					           # the portal call returns exactly what you'd expect
 | 
				
			||||||
            # as if the remote "aggregate" function was called locally
 | 
					           # as if the remote "aggregate" function was called locally
 | 
				
			||||||
            result_stream = []
 | 
					           result_stream = []
 | 
				
			||||||
            async for value in await portal.result():
 | 
					           async for value in await portal.result():
 | 
				
			||||||
                result_stream.append(value)
 | 
					               result_stream.append(value)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            print(f"STREAM TIME = {time.time() - start}")
 | 
					           print(f"STREAM TIME = {time.time() - start}")
 | 
				
			||||||
            print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
 | 
					           print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
 | 
				
			||||||
            assert result_stream == list(range(seed)) + [None]
 | 
					           assert result_stream == list(range(seed)) + [None]
 | 
				
			||||||
            return result_stream
 | 
					           return result_stream
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616))
 | 
					   final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
Here there's four actors running in separate processes (using all the
 | 
					Here there's four actors running in separate processes (using all the
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue