forked from goodboy/tractor
1
0
Fork 0

Add stream func tests

stream_functions
Tyler Goodlet 2019-03-29 19:10:56 -04:00
parent f885b02c73
commit b965d20cba
1 changed files with 45 additions and 10 deletions

View File

@ -2,13 +2,29 @@
Streaming via async gen api Streaming via async gen api
""" """
import time import time
from functools import partial
import trio import trio
import tractor import tractor
import pytest import pytest
async def stream_seq(sequence):
def test_must_define_ctx():
with pytest.raises(TypeError) as err:
@tractor.stream
async def no_ctx():
pass
assert "no_ctx must be `ctx: tractor.Context" in str(err.value)
@tractor.stream
async def no_ctx(ctx):
pass
async def async_gen_stream(sequence):
for i in sequence: for i in sequence:
yield i yield i
await trio.sleep(0.1) await trio.sleep(0.1)
@ -20,10 +36,23 @@ async def stream_seq(sequence):
assert cs.cancelled_caught assert cs.cancelled_caught
async def stream_from_single_subactor(): @tractor.stream
async def context_stream(ctx, sequence):
for i in sequence:
await ctx.send_yield(i)
await trio.sleep(0.1)
# block indefinitely waiting to be cancelled by ``aclose()`` call
with trio.CancelScope() as cs:
await trio.sleep(float('inf'))
assert 0
assert cs.cancelled_caught
async def stream_from_single_subactor(stream_func_name):
"""Verify we can spawn a daemon actor and retrieve streamed data. """Verify we can spawn a daemon actor and retrieve streamed data.
""" """
async with tractor.find_actor('brokerd') as portals: async with tractor.find_actor('streamerd') as portals:
if not portals: if not portals:
# only one per host address, spawns an actor if None # only one per host address, spawns an actor if None
async with tractor.open_nursery() as nursery: async with tractor.open_nursery() as nursery:
@ -36,37 +65,43 @@ async def stream_from_single_subactor():
seq = range(10) seq = range(10)
agen = await portal.run( stream = await portal.run(
__name__, __name__,
'stream_seq', # the func above stream_func_name, # one of the funcs above
sequence=list(seq), # has to be msgpack serializable sequence=list(seq), # has to be msgpack serializable
) )
# it'd sure be nice to have an asyncitertools here... # it'd sure be nice to have an asyncitertools here...
iseq = iter(seq) iseq = iter(seq)
ival = next(iseq) ival = next(iseq)
async for val in agen: async for val in stream:
assert val == ival assert val == ival
try: try:
ival = next(iseq) ival = next(iseq)
except StopIteration: except StopIteration:
# should cancel far end task which will be # should cancel far end task which will be
# caught and no error is raised # caught and no error is raised
await agen.aclose() await stream.aclose()
await trio.sleep(0.3) await trio.sleep(0.3)
try: try:
await agen.__anext__() await stream.__anext__()
except StopAsyncIteration: except StopAsyncIteration:
# stop all spawned subactors # stop all spawned subactors
await portal.cancel_actor() await portal.cancel_actor()
# await nursery.cancel() # await nursery.cancel()
def test_stream_from_single_subactor(arb_addr, start_method): @pytest.mark.parametrize(
'stream_func', ['async_gen_stream', 'context_stream']
)
def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
"""Verify streaming from a spawned async generator. """Verify streaming from a spawned async generator.
""" """
tractor.run( tractor.run(
stream_from_single_subactor, partial(
stream_from_single_subactor,
stream_func_name=stream_func,
),
arbiter_addr=arb_addr, arbiter_addr=arb_addr,
start_method=start_method, start_method=start_method,
) )