Update all tests to new streaming API

stream_contexts
Tyler Goodlet 2021-04-28 11:55:37 -04:00
parent 5a5e6baad1
commit 2498a4963b
4 changed files with 178 additions and 152 deletions

View File

@ -49,7 +49,9 @@ def test_remote_error(arb_addr, args_err):
async def main(): async def main():
async with tractor.open_nursery() as nursery: async with tractor.open_nursery() as nursery:
portal = await nursery.run_in_actor(assert_err, name='errorer', **args) portal = await nursery.run_in_actor(
assert_err, name='errorer', **args
)
# get result(s) from main task # get result(s) from main task
try: try:
@ -168,13 +170,14 @@ async def test_cancel_infinite_streamer(start_method):
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.start_actor( portal = await n.start_actor(
'donny', 'donny',
rpc_module_paths=[__name__], enable_modules=[__name__],
) )
# this async for loop streams values from the above # this async for loop streams values from the above
# async generator running in a separate process # async generator running in a separate process
async for letter in await portal.run(stream_forever): async with portal.open_stream_from(stream_forever) as stream:
print(letter) async for letter in stream:
print(letter)
# we support trio's cancellation system # we support trio's cancellation system
assert cancel_scope.cancelled_caught assert cancel_scope.cancelled_caught
@ -430,7 +433,6 @@ def test_cancel_via_SIGINT_other_task(
tractor.run(main) tractor.run(main)
async def spin_for(period=3): async def spin_for(period=3):
"Sync sleep." "Sync sleep."
time.sleep(period) time.sleep(period)
@ -438,7 +440,7 @@ async def spin_for(period=3):
async def spawn(): async def spawn():
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
portal = await tn.run_in_actor( await tn.run_in_actor(
spin_for, spin_for,
name='sleeper', name='sleeper',
) )
@ -460,7 +462,7 @@ def test_cancel_while_childs_child_in_sync_sleep(
async def main(): async def main():
with trio.fail_after(2): with trio.fail_after(2):
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
portal = await tn.run_in_actor( await tn.run_in_actor(
spawn, spawn,
name='spawn', name='spawn',
) )

View File

@ -108,8 +108,9 @@ async def cancel(use_signal, delay=0):
async def stream_from(portal): async def stream_from(portal):
async for value in await portal.result(): async with portal.open_stream_from(stream_forever) as stream:
print(value) async for value in stream:
print(value)
async def spawn_and_check_registry( async def spawn_and_check_registry(
@ -139,18 +140,20 @@ async def spawn_and_check_registry(
registry = await get_reg() registry = await get_reg()
assert actor.uid in registry assert actor.uid in registry
if with_streaming: try:
to_run = stream_forever async with tractor.open_nursery() as n:
else: async with trio.open_nursery() as trion:
to_run = trio.sleep_forever
async with trio.open_nursery() as trion:
try:
async with tractor.open_nursery() as n:
portals = {} portals = {}
for i in range(3): for i in range(3):
name = f'a{i}' name = f'a{i}'
portals[name] = await n.run_in_actor(to_run, name=name) if with_streaming:
portals[name] = await n.start_actor(
name=name, enable_modules=[__name__])
else: # no streaming
portals[name] = await n.run_in_actor(
trio.sleep_forever, name=name)
# wait on last actor to come up # wait on last actor to come up
async with tractor.wait_for_actor(name): async with tractor.wait_for_actor(name):
@ -171,19 +174,19 @@ async def spawn_and_check_registry(
trion.start_soon(cancel, use_signal, 1) trion.start_soon(cancel, use_signal, 1)
last_p = pts[-1] last_p = pts[-1]
async for value in await last_p.result(): await stream_from(last_p)
print(value)
else: else:
await cancel(use_signal) await cancel(use_signal)
finally: finally:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await trio.sleep(0.5) await trio.sleep(0.5)
# all subactors should have de-registered # all subactors should have de-registered
registry = await get_reg() registry = await get_reg()
assert len(registry) == extra assert len(registry) == extra
assert actor.uid in registry assert actor.uid in registry
@pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('use_signal', [False, True])
@ -260,36 +263,36 @@ async def close_chans_before_nursery(
get_reg = partial(aportal.run_from_ns, 'self', 'get_registry') get_reg = partial(aportal.run_from_ns, 'self', 'get_registry')
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
portal1 = await tn.run_in_actor( portal1 = await tn.start_actor(
stream_forever, name='consumer1', enable_modules=[__name__])
name='consumer1', portal2 = await tn.start_actor(
) 'consumer2', enable_modules=[__name__])
agen1 = await portal1.result()
portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__]) async with (
agen2 = await portal2.run(stream_forever) portal1.open_stream_from(stream_forever) as agen1,
portal2.open_stream_from(stream_forever) as agen2,
):
async with trio.open_nursery() as n:
n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5)
try:
await streamer(agen2)
finally:
# Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()
async with trio.open_nursery() as n: # XXX: THIS IS THE KEY THING that happens
n.start_soon(streamer, agen1) # **before** exiting the actor nursery block
n.start_soon(cancel, use_signal, .5)
try:
await streamer(agen2)
finally:
# Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()
# XXX: THIS IS THE KEY THING that happens # also kill off channels cuz why not
# **before** exiting the actor nursery block await agen1.aclose()
await agen2.aclose()
# also kill off channels cuz why not
await agen1.aclose()
await agen2.aclose()
finally: finally:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await trio.sleep(.5) await trio.sleep(1)
# all subactors should have de-registered # all subactors should have de-registered
registry = await get_reg() registry = await get_reg()

View File

@ -61,44 +61,47 @@ async def subs(
return isinstance(i, int) return isinstance(i, int)
async with tractor.find_actor(pub_actor_name) as portal: async with tractor.find_actor(pub_actor_name) as portal:
stream = await portal.run( async with (
pubber, portal.open_stream_from(
topics=which, pubber,
seed=seed, topics=which,
) seed=seed,
task_status.started(stream) ) as stream
times = 10 ):
count = 0 task_status.started(stream)
await stream.__anext__() times = 10
async for pkt in stream: count = 0
for topic, value in pkt.items(): await stream.__anext__()
assert pred(value)
count += 1
if count >= times:
break
await stream.aclose()
stream = await portal.run(
pubber,
topics=['odd'],
seed=seed,
)
await stream.__anext__()
count = 0
# async with aclosing(stream) as stream:
try:
async for pkt in stream: async for pkt in stream:
for topic, value in pkt.items(): for topic, value in pkt.items():
pass assert pred(value)
# assert pred(value)
count += 1 count += 1
if count >= times: if count >= times:
break break
finally:
await stream.aclose() await stream.aclose()
async with (
portal.open_stream_from(
pubber,
topics=['odd'],
seed=seed,
) as stream
):
await stream.__anext__()
count = 0
# async with aclosing(stream) as stream:
try:
async for pkt in stream:
for topic, value in pkt.items():
pass
# assert pred(value)
count += 1
if count >= times:
break
finally:
await stream.aclose()
@tractor.msg.pub(tasks=['one', 'two']) @tractor.msg.pub(tasks=['one', 'two'])
async def multilock_pubber(get_topics): async def multilock_pubber(get_topics):
@ -128,11 +131,10 @@ async def test_required_args(callwith_expecterror):
await func(**kwargs) await func(**kwargs)
else: else:
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
# await func(**kwargs)
portal = await n.run_in_actor( portal = await n.start_actor(
multilock_pubber,
name='pubber', name='pubber',
**kwargs enable_modules=[__name__],
) )
async with tractor.wait_for_actor('pubber'): async with tractor.wait_for_actor('pubber'):
@ -140,8 +142,14 @@ async def test_required_args(callwith_expecterror):
await trio.sleep(0.5) await trio.sleep(0.5)
async for val in await portal.result(): async with portal.open_stream_from(
assert val == {'doggy': 10} multilock_pubber,
**kwargs
) as stream:
async for val in stream:
assert val == {'doggy': 10}
await portal.cancel_actor()
@pytest.mark.parametrize( @pytest.mark.parametrize(

View File

@ -61,37 +61,38 @@ async def stream_from_single_subactor(stream_func):
# no brokerd actor found # no brokerd actor found
portal = await nursery.start_actor( portal = await nursery.start_actor(
'streamerd', 'streamerd',
rpc_module_paths=[__name__], enable_modules=[__name__],
) )
seq = range(10) seq = range(10)
stream = await portal.run( async with portal.open_stream_from(
stream_func, # one of the funcs above stream_func,
sequence=list(seq), # has to be msgpack serializable sequence=list(seq), # has to be msgpack serializable
) ) as stream:
# it'd sure be nice to have an asyncitertools here...
iseq = iter(seq)
ival = next(iseq)
async for val in stream: # it'd sure be nice to have an asyncitertools here...
assert val == ival iseq = iter(seq)
ival = next(iseq)
async for val in stream:
assert val == ival
try:
ival = next(iseq)
except StopIteration:
# should cancel far end task which will be
# caught and no error is raised
await stream.aclose()
await trio.sleep(0.3)
try: try:
ival = next(iseq) await stream.__anext__()
except StopIteration: except StopAsyncIteration:
# should cancel far end task which will be # stop all spawned subactors
# caught and no error is raised await portal.cancel_actor()
await stream.aclose() # await nursery.cancel()
await trio.sleep(0.3)
try:
await stream.__anext__()
except StopAsyncIteration:
# stop all spawned subactors
await portal.cancel_actor()
# await nursery.cancel()
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -132,7 +133,7 @@ async def aggregate(seed):
# fork point # fork point
portal = await nursery.start_actor( portal = await nursery.start_actor(
name=f'streamer_{i}', name=f'streamer_{i}',
rpc_module_paths=[__name__], enable_modules=[__name__],
) )
portals.append(portal) portals.append(portal)
@ -141,11 +142,14 @@ async def aggregate(seed):
async def push_to_chan(portal, send_chan): async def push_to_chan(portal, send_chan):
async with send_chan: async with send_chan:
async for value in await portal.run(
__name__, 'stream_data', seed=seed async with portal.open_stream_from(
): stream_data, seed=seed,
# leverage trio's built-in backpressure ) as stream:
await send_chan.send(value)
async for value in stream:
# leverage trio's built-in backpressure
await send_chan.send(value)
print(f"FINISHED ITERATING {portal.channel.uid}") print(f"FINISHED ITERATING {portal.channel.uid}")
@ -183,22 +187,24 @@ async def a_quadruple_example():
seed = int(1e3) seed = int(1e3)
pre_start = time.time() pre_start = time.time()
portal = await nursery.run_in_actor( portal = await nursery.start_actor(
aggregate,
seed=seed,
name='aggregator', name='aggregator',
enable_modules=[__name__],
) )
start = time.time() start = time.time()
# the portal call returns exactly what you'd expect # the portal call returns exactly what you'd expect
# as if the remote "aggregate" function was called locally # as if the remote "aggregate" function was called locally
result_stream = [] result_stream = []
async for value in await portal.result():
result_stream.append(value) async with portal.open_stream_from(aggregate, seed=seed) as stream:
async for value in stream:
result_stream.append(value)
print(f"STREAM TIME = {time.time() - start}") print(f"STREAM TIME = {time.time() - start}")
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
assert result_stream == list(range(seed)) assert result_stream == list(range(seed))
await portal.cancel_actor()
return result_stream return result_stream
@ -272,48 +278,55 @@ async def test_respawn_consumer_task(
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
stream = await(await n.run_in_actor( portal = await n.start_actor(
name='streamer',
enable_modules=[__name__]
)
async with portal.open_stream_from(
stream_data, stream_data,
seed=11, seed=11,
name='streamer', ) as stream:
)).result()
expect = set(range(11)) expect = set(range(11))
received = [] received = []
# this is the re-spawn task routine # this is the re-spawn task routine
async def consume(task_status=trio.TASK_STATUS_IGNORED): async def consume(task_status=trio.TASK_STATUS_IGNORED):
print('starting consume task..') print('starting consume task..')
nonlocal stream nonlocal stream
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
task_status.started(cs) task_status.started(cs)
# shield stream's underlying channel from cancellation # shield stream's underlying channel from cancellation
with stream.shield(): with stream.shield():
async for v in stream: async for v in stream:
print(f'from stream: {v}') print(f'from stream: {v}')
expect.remove(v) expect.remove(v)
received.append(v) received.append(v)
print('exited consume') print('exited consume')
async with trio.open_nursery() as ln: async with trio.open_nursery() as ln:
cs = await ln.start(consume) cs = await ln.start(consume)
while True: while True:
await trio.sleep(0.1) await trio.sleep(0.1)
if received[-1] % 2 == 0: if received[-1] % 2 == 0:
print('cancelling consume task..') print('cancelling consume task..')
cs.cancel() cs.cancel()
# respawn # respawn
cs = await ln.start(consume) cs = await ln.start(consume)
if not expect: if not expect:
print("all values streamed, BREAKING") print("all values streamed, BREAKING")
break break
# TODO: this is justification for a
# ``ActorNursery.stream_from_actor()`` helper?
await portal.cancel_actor()