Add dynamic pubsub test using new bidir stream apis
							parent
							
								
									6f62277c82
								
							
						
					
					
						commit
						8b13dc4967
					
				|  | @ -0,0 +1,142 @@ | ||||||
|  | """ | ||||||
|  | Advanced streaming patterns using bidirectional streams and contexts. | ||||||
|  | 
 | ||||||
|  | """ | ||||||
|  | import itertools | ||||||
|  | from typing import Set, Dict, List | ||||||
|  | 
 | ||||||
|  | import trio | ||||||
|  | import tractor | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | _registry: Dict[str, Set[tractor.ReceiveMsgStream]] = { | ||||||
|  |     'even': set(), | ||||||
|  |     'odd': set(), | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | async def publisher( | ||||||
|  | 
 | ||||||
|  |     seed: int = 0, | ||||||
|  | 
 | ||||||
|  | ) -> None: | ||||||
|  | 
 | ||||||
|  |     global _registry | ||||||
|  | 
 | ||||||
|  |     def is_even(i): | ||||||
|  |         return i % 2 == 0 | ||||||
|  | 
 | ||||||
|  |     for val in itertools.count(seed): | ||||||
|  | 
 | ||||||
|  |         sub = 'even' if is_even(val) else 'odd' | ||||||
|  | 
 | ||||||
|  |         for sub_stream in _registry[sub]: | ||||||
|  |             await sub_stream.send(val) | ||||||
|  | 
 | ||||||
|  |         # throttle send rate to ~4Hz | ||||||
|  |         # making it readable to a human user | ||||||
|  |         await trio.sleep(1/4) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @tractor.context | ||||||
|  | async def subscribe( | ||||||
|  | 
 | ||||||
|  |     ctx: tractor.Context, | ||||||
|  | 
 | ||||||
|  | ) -> None: | ||||||
|  | 
 | ||||||
|  |     global _registry | ||||||
|  | 
 | ||||||
|  |     # syn caller | ||||||
|  |     await ctx.started(None) | ||||||
|  | 
 | ||||||
|  |     async with ctx.open_stream() as stream: | ||||||
|  | 
 | ||||||
|  |         # update subs list as consumer requests | ||||||
|  |         async for new_subs in stream: | ||||||
|  | 
 | ||||||
|  |             new_subs = set(new_subs) | ||||||
|  |             remove = new_subs - _registry.keys() | ||||||
|  | 
 | ||||||
|  |             print(f'setting sub to {new_subs} for {ctx.chan.uid}') | ||||||
|  | 
 | ||||||
|  |             # remove old subs | ||||||
|  |             for sub in remove: | ||||||
|  |                 _registry[sub].remove(stream) | ||||||
|  | 
 | ||||||
|  |             # add new subs for consumer | ||||||
|  |             for sub in new_subs: | ||||||
|  |                 _registry[sub].add(stream) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | async def consumer( | ||||||
|  | 
 | ||||||
|  |     subs: List[str], | ||||||
|  | 
 | ||||||
|  | ) -> None: | ||||||
|  | 
 | ||||||
|  |     uid = tractor.current_actor().uid | ||||||
|  | 
 | ||||||
|  |     async with tractor.wait_for_actor('publisher') as portal: | ||||||
|  |         async with portal.open_context(subscribe) as (ctx, first): | ||||||
|  |             async with ctx.open_stream() as stream: | ||||||
|  | 
 | ||||||
|  |                 # flip between the provided subs dynamically | ||||||
|  |                 if len(subs) > 1: | ||||||
|  | 
 | ||||||
|  |                     for sub in itertools.cycle(subs): | ||||||
|  |                         print(f'setting dynamic sub to {sub}') | ||||||
|  |                         await stream.send([sub]) | ||||||
|  | 
 | ||||||
|  |                         count = 0 | ||||||
|  |                         async for value in stream: | ||||||
|  |                             print(f'{uid} got: {value}') | ||||||
|  |                             if count > 5: | ||||||
|  |                                 break | ||||||
|  |                             count += 1 | ||||||
|  | 
 | ||||||
|  |                 else:  # static sub | ||||||
|  | 
 | ||||||
|  |                     await stream.send(subs) | ||||||
|  |                     async for value in stream: | ||||||
|  |                         print(f'{uid} got: {value}') | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_dynamic_pub_sub(): | ||||||
|  | 
 | ||||||
|  |     global _registry | ||||||
|  | 
 | ||||||
|  |     from multiprocessing import cpu_count | ||||||
|  |     cpus = cpu_count() | ||||||
|  | 
 | ||||||
|  |     async def main(): | ||||||
|  |         async with tractor.open_nursery() as n: | ||||||
|  | 
 | ||||||
|  |             # name of this actor will be same as target func | ||||||
|  |             await n.run_in_actor(publisher) | ||||||
|  | 
 | ||||||
|  |             for i, sub in zip( | ||||||
|  |                 range(cpus - 2), | ||||||
|  |                 itertools.cycle(_registry.keys()) | ||||||
|  |             ): | ||||||
|  |                 await n.run_in_actor( | ||||||
|  |                     consumer, | ||||||
|  |                     name=f'consumer_{sub}', | ||||||
|  |                     subs=[sub], | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|  |             # make one dynamic subscriber | ||||||
|  |             await n.run_in_actor( | ||||||
|  |                 consumer, | ||||||
|  |                 name='consumer_dynamic', | ||||||
|  |                 subs=list(_registry.keys()), | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             # block until cancelled by user | ||||||
|  |             with trio.fail_after(10): | ||||||
|  |                 await trio.sleep_forever() | ||||||
|  | 
 | ||||||
|  |     try: | ||||||
|  |         trio.run(main) | ||||||
|  |     except trio.TooSlowError: | ||||||
|  |         pass | ||||||
		Loading…
	
		Reference in New Issue