forked from goodboy/tractor
Add a test of both stream styles side-by-side
Not sure we even have a test for this yet. The main issue discovered by a user project (https://github.com/adder46/wrath) was that a kbi raised inside a block like this (with both recv-only and send-recv streams) would not cancel on the first ctrl-c sent from console and instead SIGiNT had to be repeatedly sent as many times as there are subactors in the first level tree. This test catches that as well as just verifies the basic side-by-side functionality.fix_kbi_in_ctx_block
parent
a868196d13
commit
c1727ce05e
|
@ -218,3 +218,54 @@ def test_reqresp_ontopof_streaming():
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
except trio.TooSlowError:
|
except trio.TooSlowError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
async def async_gen_stream(sequence):
|
||||||
|
for i in sequence:
|
||||||
|
yield i
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def echo_ctx_stream(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
) -> None:
|
||||||
|
await ctx.started()
|
||||||
|
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
async for msg in stream:
|
||||||
|
await stream.send(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def test_sigint_both_stream_types():
|
||||||
|
'''Verify that running a bi-directional and recv only stream
|
||||||
|
side-by-side will cancel correctly from SIGINT.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async def main():
|
||||||
|
with trio.fail_after(2):
|
||||||
|
async with tractor.open_nursery() as n:
|
||||||
|
# name of this actor will be same as target func
|
||||||
|
portal = await n.start_actor(
|
||||||
|
'2_way',
|
||||||
|
enable_modules=[__name__]
|
||||||
|
)
|
||||||
|
|
||||||
|
async with portal.open_context(echo_ctx_stream) as (ctx, _):
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
async with portal.open_stream_from(
|
||||||
|
async_gen_stream,
|
||||||
|
sequence=list(range(1)),
|
||||||
|
) as gen_stream:
|
||||||
|
|
||||||
|
msg = await gen_stream.receive()
|
||||||
|
await stream.send(msg)
|
||||||
|
resp = await stream.receive()
|
||||||
|
assert resp == msg
|
||||||
|
raise KeyboardInterrupt
|
||||||
|
|
||||||
|
try:
|
||||||
|
trio.run(main)
|
||||||
|
assert 0, "Didn't receive KBI!?"
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
Loading…
Reference in New Issue