Add a multi-task streaming test
							parent
							
								
									b6dd58b1cf
								
							
						
					
					
						commit
						1c7c9da99c
					
				|  | @ -140,3 +140,81 @@ def test_dynamic_pub_sub(): | ||||||
|         trio.run(main) |         trio.run(main) | ||||||
|     except trio.TooSlowError: |     except trio.TooSlowError: | ||||||
|         pass |         pass | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @tractor.context | ||||||
|  | async def one_task_streams_and_one_handles_reqresp( | ||||||
|  | 
 | ||||||
|  |     ctx: tractor.Context, | ||||||
|  | 
 | ||||||
|  | ) -> None: | ||||||
|  | 
 | ||||||
|  |     await ctx.started() | ||||||
|  | 
 | ||||||
|  |     async with ctx.open_stream() as stream: | ||||||
|  | 
 | ||||||
|  |         async def pingpong(): | ||||||
|  |             '''Run a simple req/response service. | ||||||
|  | 
 | ||||||
|  |             ''' | ||||||
|  |             async for msg in stream: | ||||||
|  |                 print('rpc server ping') | ||||||
|  |                 assert msg == 'ping' | ||||||
|  |                 print('rpc server pong') | ||||||
|  |                 await stream.send('pong') | ||||||
|  | 
 | ||||||
|  |         async with trio.open_nursery() as n: | ||||||
|  |             n.start_soon(pingpong) | ||||||
|  | 
 | ||||||
|  |             for _ in itertools.count(): | ||||||
|  |                 await stream.send('yo') | ||||||
|  |                 await trio.sleep(0.01) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_reqresp_ontopof_streaming(): | ||||||
|  |     '''Test a subactor that both streams with one task and | ||||||
|  |     spawns another which handles a small requests-response | ||||||
|  |     dialogue over the same bidir-stream. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     async def main(): | ||||||
|  | 
 | ||||||
|  |         with trio.move_on_after(2): | ||||||
|  |             async with tractor.open_nursery() as n: | ||||||
|  | 
 | ||||||
|  |                 # name of this actor will be same as target func | ||||||
|  |                 portal = await n.start_actor( | ||||||
|  |                     'dual_tasks', | ||||||
|  |                     enable_modules=[__name__] | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|  |                 # flat to make sure we get at least one pong | ||||||
|  |                 got_pong: bool = False | ||||||
|  | 
 | ||||||
|  |                 async with portal.open_context( | ||||||
|  |                     one_task_streams_and_one_handles_reqresp, | ||||||
|  | 
 | ||||||
|  |                 ) as (ctx, first): | ||||||
|  | 
 | ||||||
|  |                     assert first is None | ||||||
|  | 
 | ||||||
|  |                     async with ctx.open_stream() as stream: | ||||||
|  | 
 | ||||||
|  |                         await stream.send('ping') | ||||||
|  | 
 | ||||||
|  |                         async for msg in stream: | ||||||
|  |                             print(f'client received: {msg}') | ||||||
|  | 
 | ||||||
|  |                             assert msg in {'pong', 'yo'} | ||||||
|  | 
 | ||||||
|  |                             if msg == 'pong': | ||||||
|  |                                 got_pong = True | ||||||
|  |                                 await stream.send('ping') | ||||||
|  |                                 print('client sent ping') | ||||||
|  | 
 | ||||||
|  |         assert got_pong | ||||||
|  | 
 | ||||||
|  |     try: | ||||||
|  |         trio.run(main) | ||||||
|  |     except trio.TooSlowError: | ||||||
|  |         pass | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue