Update streaming example in docs
							parent
							
								
									9a780485dc
								
							
						
					
					
						commit
						0b520c7bee
					
				
							
								
								
									
										36
									
								
								README.rst
								
								
								
								
							
							
						
						
									
										36
									
								
								README.rst
								
								
								
								
							|  | @ -391,30 +391,32 @@ and print the results to your screen: | |||
| 
 | ||||
|            send_chan, recv_chan = trio.open_memory_channel(500) | ||||
| 
 | ||||
|            async def push_to_q(portal): | ||||
|                async for value in await portal.run( | ||||
|                    __name__, 'stream_data', seed=seed | ||||
|                ): | ||||
|                    # leverage trio's built-in backpressure | ||||
|                    await send_chan.send(value) | ||||
|            async def push_to_chan(portal, send_chan): | ||||
|                async with send_chan: | ||||
|                    async for value in await portal.run( | ||||
|                        __name__, 'stream_data', seed=seed | ||||
|                    ): | ||||
|                        # leverage trio's built-in backpressure | ||||
|                        await send_chan.send(value) | ||||
| 
 | ||||
|                await send_chan.send(None) | ||||
|                print(f"FINISHED ITERATING {portal.channel.uid}") | ||||
| 
 | ||||
|            # spawn 2 trio tasks to collect streams and push to a local queue | ||||
|            async with trio.open_nursery() as n: | ||||
| 
 | ||||
|                for portal in portals: | ||||
|                    n.start_soon(push_to_q, portal) | ||||
|                    n.start_soon(push_to_chan, portal, send_chan.clone()) | ||||
| 
 | ||||
|                # close this local task's reference to send side | ||||
|                await send_chan.aclose() | ||||
| 
 | ||||
|                unique_vals = set() | ||||
|                async for value in recv_chan: | ||||
|                    if value not in unique_vals: | ||||
|                        unique_vals.add(value) | ||||
|                        # yield upwards to the spawning parent actor | ||||
|                        yield value | ||||
| 
 | ||||
|                        if value is None: | ||||
|                            break | ||||
|                async with recv_chan: | ||||
|                    async for value in recv_chan: | ||||
|                        if value not in unique_vals: | ||||
|                            unique_vals.add(value) | ||||
|                            # yield upwards to the spawning parent actor | ||||
|                            yield value | ||||
| 
 | ||||
|                    assert value in unique_vals | ||||
| 
 | ||||
|  | @ -449,7 +451,7 @@ and print the results to your screen: | |||
| 
 | ||||
|            print(f"STREAM TIME = {time.time() - start}") | ||||
|            print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") | ||||
|            assert result_stream == list(range(seed)) + [None] | ||||
|            assert result_stream == list(range(seed)) | ||||
|            return result_stream | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue