forked from goodboy/tractor
				
			Add first set of interloop streaming tests
							parent
							
								
									44d0e9fc32
								
							
						
					
					
						commit
						ad2567dd73
					
				| 
						 | 
				
			
			@ -134,8 +134,6 @@ def test_trio_cancels_aio(arb_addr):
 | 
			
		|||
            # cancel the nursery shortly after boot
 | 
			
		||||
 | 
			
		||||
            async with tractor.open_nursery() as n:
 | 
			
		||||
                # debug_mode=True
 | 
			
		||||
            # ) as n:
 | 
			
		||||
                portal = await n.run_in_actor(
 | 
			
		||||
                    asyncio_actor,
 | 
			
		||||
                    target='sleep_forever',
 | 
			
		||||
| 
						 | 
				
			
			@ -177,7 +175,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr):
 | 
			
		|||
        trio.run(main)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO:
 | 
			
		||||
# TODO: verify open_channel_from will fail on this..
 | 
			
		||||
async def no_to_trio_in_args():
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -186,33 +184,77 @@ async def push_from_aio_task(
 | 
			
		|||
 | 
			
		||||
    sequence: Iterable,
 | 
			
		||||
    to_trio: trio.abc.SendChannel,
 | 
			
		||||
    expect_cancel: False,
 | 
			
		||||
    fail_early: bool,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    for i in range(100):
 | 
			
		||||
        print(f'asyncio sending {i}')
 | 
			
		||||
        to_trio.send_nowait(i)
 | 
			
		||||
        await asyncio.sleep(0.001)
 | 
			
		||||
 | 
			
		||||
    print(f'asyncio streamer complete!')
 | 
			
		||||
    try:
 | 
			
		||||
        # sync caller ctx manager
 | 
			
		||||
        to_trio.send_nowait(True)
 | 
			
		||||
 | 
			
		||||
        for i in sequence:
 | 
			
		||||
            print(f'asyncio sending {i}')
 | 
			
		||||
            to_trio.send_nowait(i)
 | 
			
		||||
            await asyncio.sleep(0.001)
 | 
			
		||||
 | 
			
		||||
            if i == 50 and fail_early:
 | 
			
		||||
                raise Exception
 | 
			
		||||
 | 
			
		||||
        print(f'asyncio streamer complete!')
 | 
			
		||||
 | 
			
		||||
    except asyncio.CancelledError:
 | 
			
		||||
        if not expect_cancel:
 | 
			
		||||
            pytest.fail("aio task was cancelled unexpectedly")
 | 
			
		||||
        raise
 | 
			
		||||
    else:
 | 
			
		||||
        if expect_cancel:
 | 
			
		||||
            pytest.fail("aio task wasn't cancelled as expected!?")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def stream_from_aio():
 | 
			
		||||
async def stream_from_aio(
 | 
			
		||||
 | 
			
		||||
    exit_early: bool = False,
 | 
			
		||||
    raise_err: bool = False,
 | 
			
		||||
    aio_raise_err: bool = False,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    seq = range(100)
 | 
			
		||||
    expect = list(seq)
 | 
			
		||||
 | 
			
		||||
    async with to_asyncio.open_channel_from(
 | 
			
		||||
        push_from_aio_task,
 | 
			
		||||
        sequence=seq,
 | 
			
		||||
    ) as (first, chan):
 | 
			
		||||
    try:
 | 
			
		||||
        pulled = []
 | 
			
		||||
 | 
			
		||||
        pulled = [first]
 | 
			
		||||
        async for value in chan:
 | 
			
		||||
            print(f'trio received {value}')
 | 
			
		||||
            pulled.append(value)
 | 
			
		||||
        async with to_asyncio.open_channel_from(
 | 
			
		||||
            push_from_aio_task,
 | 
			
		||||
            sequence=seq,
 | 
			
		||||
            expect_cancel=raise_err or exit_early,
 | 
			
		||||
            fail_early=aio_raise_err,
 | 
			
		||||
        ) as (first, chan):
 | 
			
		||||
 | 
			
		||||
        assert pulled == expect
 | 
			
		||||
            assert first is True
 | 
			
		||||
 | 
			
		||||
    print('trio guest mode task completed!')
 | 
			
		||||
            async for value in chan:
 | 
			
		||||
                print(f'trio received {value}')
 | 
			
		||||
                pulled.append(value)
 | 
			
		||||
 | 
			
		||||
                if value == 50:
 | 
			
		||||
                    if raise_err:
 | 
			
		||||
                        raise Exception
 | 
			
		||||
                    elif exit_early:
 | 
			
		||||
                        break
 | 
			
		||||
    finally:
 | 
			
		||||
 | 
			
		||||
        if (
 | 
			
		||||
            not raise_err and
 | 
			
		||||
            not exit_early and
 | 
			
		||||
            not aio_raise_err
 | 
			
		||||
        ):
 | 
			
		||||
            assert pulled == expect
 | 
			
		||||
        else:
 | 
			
		||||
            assert pulled == expect[:51]
 | 
			
		||||
 | 
			
		||||
        print('trio guest mode task completed!')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_basic_interloop_channel_stream(arb_addr):
 | 
			
		||||
| 
						 | 
				
			
			@ -227,14 +269,57 @@ def test_basic_interloop_channel_stream(arb_addr):
 | 
			
		|||
    trio.run(main)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO: parametrize the above test and avoid the duplication here?
 | 
			
		||||
def test_trio_error_cancels_intertask_chan(arb_addr):
 | 
			
		||||
    async def main():
 | 
			
		||||
        async with tractor.open_nursery() as n:
 | 
			
		||||
            portal = await n.run_in_actor(
 | 
			
		||||
                stream_from_aio,
 | 
			
		||||
                raise_err=True,
 | 
			
		||||
                infect_asyncio=True,
 | 
			
		||||
            )
 | 
			
		||||
            # should trigger remote actor error
 | 
			
		||||
            await portal.result()
 | 
			
		||||
 | 
			
		||||
# def test_trio_error_cancels_intertask_chan(arb_addr):
 | 
			
		||||
#     ...
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# def test_trio_cancels_and_channel_exits(arb_addr):
 | 
			
		||||
#     ...
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# def test_aio_errors_and_channel_propagates(arb_addr):
 | 
			
		||||
    with pytest.raises(RemoteActorError) as excinfo:
 | 
			
		||||
        trio.run(main)
 | 
			
		||||
 | 
			
		||||
    # ensure boxed error is correct
 | 
			
		||||
    assert excinfo.value.type == Exception
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_trio_closes_early_and_channel_exits(arb_addr):
 | 
			
		||||
    async def main():
 | 
			
		||||
        async with tractor.open_nursery() as n:
 | 
			
		||||
            portal = await n.run_in_actor(
 | 
			
		||||
                stream_from_aio,
 | 
			
		||||
                exit_early=True,
 | 
			
		||||
                infect_asyncio=True,
 | 
			
		||||
            )
 | 
			
		||||
            # should trigger remote actor error
 | 
			
		||||
            await portal.result()
 | 
			
		||||
 | 
			
		||||
    # should be a quiet exit on a simple channel exit
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
 | 
			
		||||
    async def main():
 | 
			
		||||
        async with tractor.open_nursery() as n:
 | 
			
		||||
            portal = await n.run_in_actor(
 | 
			
		||||
                stream_from_aio,
 | 
			
		||||
                aio_raise_err=True,
 | 
			
		||||
                infect_asyncio=True,
 | 
			
		||||
            )
 | 
			
		||||
            # should trigger remote actor error
 | 
			
		||||
            await portal.result()
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(RemoteActorError) as excinfo:
 | 
			
		||||
        trio.run(main)
 | 
			
		||||
 | 
			
		||||
    # ensure boxed error is correct
 | 
			
		||||
    assert excinfo.value.type == Exception
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# def test_2way_reqresp(arb_addr):
 | 
			
		||||
#     ...
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue