forked from goodboy/tractor
				
			Modernize streaming example script
- add typing, - apply multi-line call style, - use 'cancel' log level, - enable debug mode.remotes/1757153874605917753/main
							parent
							
								
									683288c8db
								
							
						
					
					
						commit
						b22ee84d26
					
				|  | @ -1,6 +1,11 @@ | |||
| import time | ||||
| import trio | ||||
| import tractor | ||||
| from tractor import ( | ||||
|     ActorNursery, | ||||
|     MsgStream, | ||||
|     Portal, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| # this is the first 2 actors, streamer_1 and streamer_2 | ||||
|  | @ -12,14 +17,18 @@ async def stream_data(seed): | |||
| 
 | ||||
| # this is the third actor; the aggregator | ||||
| async def aggregate(seed): | ||||
|     """Ensure that the two streams we receive match but only stream | ||||
|     ''' | ||||
|     Ensure that the two streams we receive match but only stream | ||||
|     a single set of values to the parent. | ||||
|     """ | ||||
|     async with tractor.open_nursery() as nursery: | ||||
|         portals = [] | ||||
| 
 | ||||
|     ''' | ||||
|     an: ActorNursery | ||||
|     async with tractor.open_nursery() as an: | ||||
|         portals: list[Portal] = [] | ||||
|         for i in range(1, 3): | ||||
|             # fork point | ||||
|             portal = await nursery.start_actor( | ||||
| 
 | ||||
|             # fork/spawn call | ||||
|             portal = await an.start_actor( | ||||
|                 name=f'streamer_{i}', | ||||
|                 enable_modules=[__name__], | ||||
|             ) | ||||
|  | @ -43,7 +52,11 @@ async def aggregate(seed): | |||
|         async with trio.open_nursery() as n: | ||||
| 
 | ||||
|             for portal in portals: | ||||
|                 n.start_soon(push_to_chan, portal, send_chan.clone()) | ||||
|                 n.start_soon( | ||||
|                     push_to_chan, | ||||
|                     portal, | ||||
|                     send_chan.clone(), | ||||
|                 ) | ||||
| 
 | ||||
|             # close this local task's reference to send side | ||||
|             await send_chan.aclose() | ||||
|  | @ -60,7 +73,7 @@ async def aggregate(seed): | |||
| 
 | ||||
|             print("FINISHED ITERATING in aggregator") | ||||
| 
 | ||||
|         await nursery.cancel() | ||||
|         await an.cancel() | ||||
|         print("WAITING on `ActorNursery` to finish") | ||||
|     print("AGGREGATOR COMPLETE!") | ||||
| 
 | ||||
|  | @ -75,18 +88,21 @@ async def main() -> list[int]: | |||
| 
 | ||||
|     ''' | ||||
|     # yes, a nursery which spawns `trio`-"actors" B) | ||||
|     nursery: tractor.ActorNursery | ||||
|     async with tractor.open_nursery() as nursery: | ||||
|     an: ActorNursery | ||||
|     async with tractor.open_nursery( | ||||
|         loglevel='cancel', | ||||
|         debug_mode=True, | ||||
|     ) as an: | ||||
| 
 | ||||
|         seed = int(1e3) | ||||
|         pre_start = time.time() | ||||
| 
 | ||||
|         portal: tractor.Portal = await nursery.start_actor( | ||||
|         portal: Portal = await an.start_actor( | ||||
|             name='aggregator', | ||||
|             enable_modules=[__name__], | ||||
|         ) | ||||
| 
 | ||||
|         stream: tractor.MsgStream | ||||
|         stream: MsgStream | ||||
|         async with portal.open_stream_from( | ||||
|             aggregate, | ||||
|             seed=seed, | ||||
|  | @ -95,11 +111,12 @@ async def main() -> list[int]: | |||
|             start = time.time() | ||||
|             # the portal call returns exactly what you'd expect | ||||
|             # as if the remote "aggregate" function was called locally | ||||
|             result_stream = [] | ||||
|             result_stream: list[int] = [] | ||||
|             async for value in stream: | ||||
|                 result_stream.append(value) | ||||
| 
 | ||||
|         await portal.cancel_actor() | ||||
|         cancelled: bool = await portal.cancel_actor() | ||||
|         assert cancelled | ||||
| 
 | ||||
|         print(f"STREAM TIME = {time.time() - start}") | ||||
|         print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue