Add a multi-task streaming test
							parent
							
								
									910df139ad
								
							
						
					
					
						commit
						b3437dacbe
					
				|  | @ -140,3 +140,81 @@ def test_dynamic_pub_sub(): | |||
|         trio.run(main) | ||||
|     except trio.TooSlowError: | ||||
|         pass | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def one_task_streams_and_one_handles_reqresp( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     await ctx.started() | ||||
| 
 | ||||
|     async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|         async def pingpong(): | ||||
|             '''Run a simple req/response service. | ||||
| 
 | ||||
|             ''' | ||||
|             async for msg in stream: | ||||
|                 print('rpc server ping') | ||||
|                 assert msg == 'ping' | ||||
|                 print('rpc server pong') | ||||
|                 await stream.send('pong') | ||||
| 
 | ||||
|         async with trio.open_nursery() as n: | ||||
|             n.start_soon(pingpong) | ||||
| 
 | ||||
|             for _ in itertools.count(): | ||||
|                 await stream.send('yo') | ||||
|                 await trio.sleep(0.01) | ||||
| 
 | ||||
| 
 | ||||
| def test_reqresp_ontopof_streaming(): | ||||
|     '''Test a subactor that both streams with one task and | ||||
|     spawns another which handles a small requests-response | ||||
|     dialogue over the same bidir-stream. | ||||
| 
 | ||||
|     ''' | ||||
|     async def main(): | ||||
| 
 | ||||
|         with trio.move_on_after(2): | ||||
|             async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|                 # name of this actor will be same as target func | ||||
|                 portal = await n.start_actor( | ||||
|                     'dual_tasks', | ||||
|                     enable_modules=[__name__] | ||||
|                 ) | ||||
| 
 | ||||
|                 # flat to make sure we get at least one pong | ||||
|                 got_pong: bool = False | ||||
| 
 | ||||
|                 async with portal.open_context( | ||||
|                     one_task_streams_and_one_handles_reqresp, | ||||
| 
 | ||||
|                 ) as (ctx, first): | ||||
| 
 | ||||
|                     assert first is None | ||||
| 
 | ||||
|                     async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                         await stream.send('ping') | ||||
| 
 | ||||
|                         async for msg in stream: | ||||
|                             print(f'client received: {msg}') | ||||
| 
 | ||||
|                             assert msg in {'pong', 'yo'} | ||||
| 
 | ||||
|                             if msg == 'pong': | ||||
|                                 got_pong = True | ||||
|                                 await stream.send('ping') | ||||
|                                 print('client sent ping') | ||||
| 
 | ||||
|         assert got_pong | ||||
| 
 | ||||
|     try: | ||||
|         trio.run(main) | ||||
|     except trio.TooSlowError: | ||||
|         pass | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue