diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py index 4429d25..74c06ca 100644 --- a/tests/test_advanced_streaming.py +++ b/tests/test_advanced_streaming.py @@ -218,3 +218,54 @@ def test_reqresp_ontopof_streaming(): trio.run(main) except trio.TooSlowError: pass + + +async def async_gen_stream(sequence): + for i in sequence: + yield i + await trio.sleep(0.1) + + +@tractor.context +async def echo_ctx_stream( + ctx: tractor.Context, +) -> None: + await ctx.started() + + async with ctx.open_stream() as stream: + async for msg in stream: + await stream.send(msg) + + +def test_sigint_both_stream_types(): + '''Verify that running a bi-directional and recv only stream + side-by-side will cancel correctly from SIGINT. + + ''' + async def main(): + with trio.fail_after(2): + async with tractor.open_nursery() as n: + # name of this actor will be same as target func + portal = await n.start_actor( + '2_way', + enable_modules=[__name__] + ) + + async with portal.open_context(echo_ctx_stream) as (ctx, _): + async with ctx.open_stream() as stream: + async with portal.open_stream_from( + async_gen_stream, + sequence=list(range(1)), + ) as gen_stream: + + msg = await gen_stream.receive() + await stream.send(msg) + resp = await stream.receive() + assert resp == msg + raise KeyboardInterrupt + + try: + trio.run(main) + assert 0, "Didn't receive KBI!?" + except KeyboardInterrupt: + pass