forked from goodboy/tractor
				
			Add a multi-task streaming test
							parent
							
								
									8f25f2d2fa
								
							
						
					
					
						commit
						2be69bb9fb
					
				| 
						 | 
				
			
			@ -140,3 +140,81 @@ def test_dynamic_pub_sub():
 | 
			
		|||
        trio.run(main)
 | 
			
		||||
    except trio.TooSlowError:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def one_task_streams_and_one_handles_reqresp(
 | 
			
		||||
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
 | 
			
		||||
    await ctx.started()
 | 
			
		||||
 | 
			
		||||
    async with ctx.open_stream() as stream:
 | 
			
		||||
 | 
			
		||||
        async def pingpong():
 | 
			
		||||
            '''Run a simple req/response service.
 | 
			
		||||
 | 
			
		||||
            '''
 | 
			
		||||
            async for msg in stream:
 | 
			
		||||
                print('rpc server ping')
 | 
			
		||||
                assert msg == 'ping'
 | 
			
		||||
                print('rpc server pong')
 | 
			
		||||
                await stream.send('pong')
 | 
			
		||||
 | 
			
		||||
        async with trio.open_nursery() as n:
 | 
			
		||||
            n.start_soon(pingpong)
 | 
			
		||||
 | 
			
		||||
            for _ in itertools.count():
 | 
			
		||||
                await stream.send('yo')
 | 
			
		||||
                await trio.sleep(0.01)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_reqresp_ontopof_streaming():
 | 
			
		||||
    '''Test a subactor that both streams with one task and
 | 
			
		||||
    spawns another which handles a small requests-response
 | 
			
		||||
    dialogue over the same bidir-stream.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    async def main():
 | 
			
		||||
 | 
			
		||||
        with trio.move_on_after(2):
 | 
			
		||||
            async with tractor.open_nursery() as n:
 | 
			
		||||
 | 
			
		||||
                # name of this actor will be same as target func
 | 
			
		||||
                portal = await n.start_actor(
 | 
			
		||||
                    'dual_tasks',
 | 
			
		||||
                    enable_modules=[__name__]
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                # flat to make sure we get at least one pong
 | 
			
		||||
                got_pong: bool = False
 | 
			
		||||
 | 
			
		||||
                async with portal.open_context(
 | 
			
		||||
                    one_task_streams_and_one_handles_reqresp,
 | 
			
		||||
 | 
			
		||||
                ) as (ctx, first):
 | 
			
		||||
 | 
			
		||||
                    assert first is None
 | 
			
		||||
 | 
			
		||||
                    async with ctx.open_stream() as stream:
 | 
			
		||||
 | 
			
		||||
                        await stream.send('ping')
 | 
			
		||||
 | 
			
		||||
                        async for msg in stream:
 | 
			
		||||
                            print(f'client received: {msg}')
 | 
			
		||||
 | 
			
		||||
                            assert msg in {'pong', 'yo'}
 | 
			
		||||
 | 
			
		||||
                            if msg == 'pong':
 | 
			
		||||
                                got_pong = True
 | 
			
		||||
                                await stream.send('ping')
 | 
			
		||||
                                print('client sent ping')
 | 
			
		||||
 | 
			
		||||
        assert got_pong
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        trio.run(main)
 | 
			
		||||
    except trio.TooSlowError:
 | 
			
		||||
        pass
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue