forked from goodboy/tractor
1
0
Fork 0

Add timeout to streaming test

end_of_channel_fixes
Tyler Goodlet 2021-12-15 17:18:30 -05:00
parent f2ba961e81
commit 1652716574
1 changed files with 26 additions and 23 deletions

View File

@ -79,33 +79,36 @@ async def stream_from_single_subactor(
seq = range(10) seq = range(10)
async with portal.open_stream_from( with trio.fail_after(5):
stream_func, async with portal.open_stream_from(
sequence=list(seq), # has to be msgpack serializable stream_func,
) as stream: sequence=list(seq), # has to be msgpack serializable
) as stream:
# it'd sure be nice to have an asyncitertools here... # it'd sure be nice to have an asyncitertools here...
iseq = iter(seq) iseq = iter(seq)
ival = next(iseq) ival = next(iseq)
async for val in stream: async for val in stream:
assert val == ival assert val == ival
try:
ival = next(iseq)
except StopIteration:
# should cancel far end task which will be
# caught and no error is raised
await stream.aclose()
await trio.sleep(0.3)
# ensure EOC signalled-state translates
# XXX: not really sure this is correct,
# shouldn't it be a `ClosedResourceError`?
try: try:
ival = next(iseq) await stream.__anext__()
except StopIteration: except StopAsyncIteration:
# should cancel far end task which will be # stop all spawned subactors
# caught and no error is raised await portal.cancel_actor()
await stream.aclose()
await trio.sleep(0.3)
try:
await stream.__anext__()
except StopAsyncIteration:
# stop all spawned subactors
await portal.cancel_actor()
# await nursery.cancel()
@pytest.mark.parametrize( @pytest.mark.parametrize(