forked from goodboy/tractor
1
0
Fork 0

Add tests for all docs examples

Parametrize our docs example test to include all (now fixed) examples
from the `READ.rst`. The examples themselves have been fixed/corrected
to run but they haven't yet been updated in the actual docs. Once 
lands these example scripts will be directly included in our
documentation so there will be no possibility of presenting incorrect
examples to our users! This technically fixes  even though the new
example aren't going to be included directly in our docs until 
lands.
example_tests
Tyler Goodlet 2020-02-09 02:01:39 -05:00
parent 30f8dd8be4
commit 7880934505
5 changed files with 184 additions and 8 deletions

View File

@ -0,0 +1,22 @@
import tractor
def cellar_door():
return "Dang that's beautiful"
async def main():
"""The main ``tractor`` routine.
"""
async with tractor.open_nursery() as n:
portal = await n.run_in_actor('some_linguist', cellar_door)
# The ``async with`` will unblock here since the 'some_linguist'
# actor has completed its main task ``cellar_door``.
print(await portal.result())
if __name__ == '__main__':
tractor.run(main)

View File

@ -0,0 +1,36 @@
from itertools import repeat
import trio
import tractor
tractor.log.get_console_log("INFO")
async def stream_forever():
for i in repeat("I can see these little future bubble things"):
# each yielded value is sent over the ``Channel`` to the
# parent actor
yield i
await trio.sleep(0.01)
async def main():
# stream for at most 1 seconds
with trio.move_on_after(1) as cancel_scope:
async with tractor.open_nursery() as n:
portal = await n.start_actor(
f'donny',
rpc_module_paths=[__name__],
)
# this async for loop streams values from the above
# async generator running in a separate process
async for letter in await portal.run(__name__, 'stream_forever'):
print(letter)
# we support trio's cancellation system
assert cancel_scope.cancelled_caught
assert n.cancelled
if __name__ == '__main__':
tractor.run(main, start_method='forkserver')

View File

@ -0,0 +1,96 @@
import time
import trio
import tractor
# this is the first 2 actors, streamer_1 and streamer_2
async def stream_data(seed):
for i in range(seed):
yield i
await trio.sleep(0) # trigger scheduler
# this is the third actor; the aggregator
async def aggregate(seed):
"""Ensure that the two streams we receive match but only stream
a single set of values to the parent.
"""
async with tractor.open_nursery() as nursery:
portals = []
for i in range(1, 3):
# fork point
portal = await nursery.start_actor(
name=f'streamer_{i}',
rpc_module_paths=[__name__],
)
portals.append(portal)
send_chan, recv_chan = trio.open_memory_channel(500)
async def push_to_chan(portal, send_chan):
async with send_chan:
async for value in await portal.run(
__name__, 'stream_data', seed=seed
):
# leverage trio's built-in backpressure
await send_chan.send(value)
print(f"FINISHED ITERATING {portal.channel.uid}")
# spawn 2 trio tasks to collect streams and push to a local queue
async with trio.open_nursery() as n:
for portal in portals:
n.start_soon(push_to_chan, portal, send_chan.clone())
# close this local task's reference to send side
await send_chan.aclose()
unique_vals = set()
async with recv_chan:
async for value in recv_chan:
if value not in unique_vals:
unique_vals.add(value)
# yield upwards to the spawning parent actor
yield value
assert value in unique_vals
print("FINISHED ITERATING in aggregator")
await nursery.cancel()
print("WAITING on `ActorNursery` to finish")
print("AGGREGATOR COMPLETE!")
# this is the main actor and *arbiter*
async def main():
# a nursery which spawns "actors"
async with tractor.open_nursery() as nursery:
seed = int(1e3)
import time
pre_start = time.time()
portal = await nursery.run_in_actor(
'aggregator',
aggregate,
seed=seed,
)
start = time.time()
# the portal call returns exactly what you'd expect
# as if the remote "aggregate" function was called locally
result_stream = []
async for value in await portal.result():
result_stream.append(value)
print(f"STREAM TIME = {time.time() - start}")
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
assert result_stream == list(range(seed))
return result_stream
if __name__ == '__main__':
final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616))

View File

@ -0,0 +1,20 @@
import tractor
tractor.log.get_console_log("INFO")
async def main(service_name):
async with tractor.open_nursery() as an:
await an.start_actor(service_name)
async with tractor.get_arbiter('127.0.0.1', 1616) as portal:
print(f"Arbiter is listening on {portal.channel}")
async with tractor.wait_for_actor(service_name) as sockaddr:
print(f"my_service is found at {sockaddr}")
await an.cancel()
if __name__ == '__main__':
tractor.run(main, 'some_actor_name')

View File

@ -11,7 +11,6 @@ import shutil
import pytest
@pytest.fixture(scope='session')
def confdir():
dirname = os.path.dirname
dirpath = os.path.abspath(
@ -20,13 +19,12 @@ def confdir():
return dirpath
@pytest.fixture
def examples_dir(confdir):
return os.path.join(confdir, 'examples')
def examples_dir():
return os.path.join(confdir(), 'examples')
@pytest.fixture
def run_example_in_subproc(loglevel, testdir, arb_addr, examples_dir):
def run_example_in_subproc(loglevel, testdir, arb_addr):
@contextmanager
def run(script_code):
@ -36,7 +34,7 @@ def run_example_in_subproc(loglevel, testdir, arb_addr, examples_dir):
# on windows we need to create a special __main__.py which will
# be executed with ``python -m __main__.py`` on windows..
shutil.copyfile(
os.path.join(examples_dir, '__main__.py'),
os.path.join(examples_dir(), '__main__.py'),
os.path.join(str(testdir), '__main__.py')
)
@ -75,8 +73,12 @@ def run_example_in_subproc(loglevel, testdir, arb_addr, examples_dir):
yield run
def test_example(examples_dir, run_example_in_subproc):
ex_file = os.path.join(examples_dir, 'a_trynamic_first_scene.py')
@pytest.mark.parametrize(
'example_script',
[f for f in os.listdir(examples_dir()) if '__' not in f],
)
def test_example(run_example_in_subproc, example_script):
ex_file = os.path.join(examples_dir(), example_script)
with open(ex_file, 'r') as ex:
code = ex.read()