forked from goodboy/tractor
1
0
Fork 0

Add tests which verify the readme is correct

- steal from `trio` and add a `tractor_test` decorator
- use a random arbiter port to avoid conflicts with locally running
  systems
- add all the (obviously) hilarious readme tests
- add a complex cancellation test which works with
  `trio.move_on_after()`
asyncgen_closing_fix
Tyler Goodlet 2018-07-10 17:19:54 -04:00
parent 49573c9a03
commit 1854471992
1 changed files with 133 additions and 17 deletions

View File

@ -2,13 +2,34 @@
Actor model API testing
"""
import time
from functools import partial
from functools import partial, wraps
import random
import pytest
import trio
import tractor
_arb_addr = '127.0.0.1', random.randint(1000, 9999)
def tractor_test(fn):
"""
Use:
@tractor_test
async def test_whatever():
await ...
"""
@wraps(fn)
def wrapper(*args, **kwargs):
__tracebackhide__ = True
return tractor.run(
partial(fn, *args), arbiter_addr=_arb_addr, **kwargs)
return wrapper
@pytest.mark.trio
async def test_no_arbitter():
"""An arbitter must be established before any nurseries
@ -36,7 +57,7 @@ def test_local_actor_async_func():
await trio.sleep(0.1)
start = time.time()
tractor.run(print_loop)
tractor.run(print_loop, arbiter_addr=_arb_addr)
# ensure the sleeps were actually awaited
assert time.time() - start >= 1
@ -82,6 +103,7 @@ def test_local_arbiter_subactor_global_state():
True,
name='arbiter',
statespace=statespace,
arbiter_addr=_arb_addr,
)
assert result == 10
@ -135,7 +157,7 @@ async def stream_from_single_subactor():
def test_stream_from_single_subactor():
"""Verify streaming from a spawned async generator.
"""
tractor.run(stream_from_single_subactor)
tractor.run(stream_from_single_subactor, arbiter_addr=_arb_addr)
async def assert_err():
@ -162,7 +184,92 @@ def test_remote_error():
with pytest.raises(tractor.RemoteActorError):
# also raises
tractor.run(main)
tractor.run(main, arbiter_addr=_arb_addr)
the_line = 'Hi my name is {}'
async def hi():
return the_line.format(tractor.current_actor().name)
async def say_hello(other_actor):
await trio.sleep(0.4) # wait for other actor to spawn
async with tractor.find_actor(other_actor) as portal:
return await portal.run(__name__, 'hi')
@tractor_test
async def test_trynamic_trio():
"""Main tractor entry point, the "master" process (for now
acts as the "director").
"""
async with tractor.open_nursery() as n:
print("Alright... Action!")
donny = await n.start_actor(
'donny',
main=partial(say_hello, 'gretchen'),
rpc_module_paths=[__name__],
outlive_main=True
)
gretchen = await n.start_actor(
'gretchen',
main=partial(say_hello, 'donny'),
rpc_module_paths=[__name__],
# outlive_main=True
)
print(await gretchen.result())
print(await donny.result())
await donny.cancel_actor()
print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...")
def movie_theatre_question():
"""A question asked in a dark theatre, in a tangent
(errr, I mean different) process.
"""
return 'have you ever seen a portal?'
@tractor_test
async def test_movie_theatre_convo():
"""The main ``tractor`` routine.
"""
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'frank',
# enable the actor to run funcs from this current module
rpc_module_paths=[__name__],
outlive_main=True,
)
print(await portal.run(__name__, 'movie_theatre_question'))
# calls the subactor a 2nd time
print(await portal.run(__name__, 'movie_theatre_question'))
# the async with will block here indefinitely waiting
# for our actor "frank" to complete, but since it's an
# "outlive_main" actor it will never until cancelled
await portal.cancel_actor()
def cellar_door():
return "Dang that's beautiful"
@tractor_test
async def test_most_beautiful_word():
"""The main ``tractor`` routine.
"""
async with tractor.open_nursery() as n:
portal = await n.start_actor('some_linguist', main=cellar_door)
# The ``async with`` will unblock here since the 'some_linguist'
# actor has completed its main task ``cellar_door``.
print(await portal.result())
def do_nothing():
@ -178,18 +285,17 @@ def test_cancel_single_subactor():
portal = await nursery.start_actor(
'nothin', rpc_module_paths=[__name__],
)
assert not await portal.run(__name__, 'do_nothing')
assert (await portal.run(__name__, 'do_nothing')) is None
# would hang otherwise
await nursery.cancel()
tractor.run(main)
tractor.run(main, arbiter_addr=_arb_addr)
async def stream_data(seed):
for i in range(seed):
yield i
# await trio.sleep(1/10000.) # trigger scheduler
await trio.sleep(0) # trigger scheduler
@ -209,7 +315,7 @@ async def aggregate(seed):
portals.append(portal)
q = trio.Queue(int(1e3))
q = trio.Queue(500)
async def push_to_q(portal):
async for value in await portal.run(
@ -231,12 +337,12 @@ async def aggregate(seed):
unique_vals.add(value)
# yield upwards to the spawning parent actor
yield value
continue
assert value in unique_vals
if value is None:
break
assert value in unique_vals
print("FINISHED ITERATING in aggregator")
await nursery.cancel()
@ -244,13 +350,15 @@ async def aggregate(seed):
print("AGGREGATOR COMPLETE!")
async def main():
# @tractor_test
async def a_quadruple_example():
# a nursery which spawns "actors"
async with tractor.open_nursery() as nursery:
seed = int(10)
import time
pre_start = time.time()
portal = await nursery.start_actor(
name='aggregator',
# executed in the actor's "main task" immediately
@ -269,13 +377,21 @@ async def main():
assert result_stream == list(range(seed)) + [None]
def test_show_me_the_code():
async def cancel_after(wait):
with trio.move_on_after(wait):
await a_quadruple_example()
def test_a_quadruple_example():
"""Verify the *show me the code* readme example works.
"""
tractor.run(main, arbiter_addr=('127.0.0.1', 1616))
tractor.run(cancel_after, 2, arbiter_addr=_arb_addr)
def test_cancel_smtc():
"""Verify we can cancel midway through the smtc example gracefully.
def test_not_fast_enough_quad():
"""Verify we can cancel midway through the quad example and all actors
cancel gracefully.
This also serves as a kind of "we'd like to eventually be this fast test".
"""
pass
tractor.run(cancel_after, 1, arbiter_addr=_arb_addr)