tractor/tests/test_tractor.py

402 lines
11 KiB
Python
Raw Permalink Normal View History

"""
Actor model API testing
"""
2018-06-12 19:23:58 +00:00
import time
2018-07-11 23:24:37 +00:00
from itertools import repeat
2018-06-12 19:23:58 +00:00
import pytest
2018-06-12 19:23:58 +00:00
import trio
import tractor
from conftest import tractor_test
2018-06-12 19:23:58 +00:00
@pytest.mark.trio
async def test_no_arbitter():
    """An arbitter must be established before any nurseries
    can be created.

    (In other words ``tractor.run`` must be used instead of ``trio.run`` as is
    done by the ``pytest-trio`` plugin.)
    """
    with pytest.raises(RuntimeError):
        # Enter the nursery with ``async with`` like every other nursery in
        # this module so the expected error comes from actually opening it.
        async with tractor.open_nursery():
            pass
def test_local_actor_async_func(arb_addr):
    """Verify a simple async function in-process.
    """
    nums = []

    async def print_loop():
        # arbiter is started in-proc if dne
        assert tractor.current_actor().is_arbiter

        for i in range(10):
            nums.append(i)
            await trio.sleep(0.1)

    start = time.time()
    tractor.run(print_loop, arbiter_addr=arb_addr)

    # ensure the sleeps were actually awaited
    # (10 iterations x 0.1s each must take at least one second)
    assert time.time() - start >= 1
    assert nums == list(range(10))
2018-07-11 23:24:37 +00:00
# Shared state passed to actors through the ``statespace`` keyword and
# compared against ``actor.statespace`` inside ``spawn``.
statespace = {'doggy': 10, 'kitty': 4}
# NOTE: this func must be defined at module level in order for the
# internal pickling infra of the forkserver to work
async def spawn(is_arbiter):
    """Check the current actor's state and, when it is the arbiter,
    run this same function again in a sub-actor.

    ``is_arbiter`` is the value ``actor.is_arbiter`` is expected to have.
    Returns ``10`` in both the arbiter and the sub-actor case.
    """
    namespaces = [__name__]

    await trio.sleep(0.1)
    actor = tractor.current_actor()
    assert actor.is_arbiter == is_arbiter
    assert actor.statespace == statespace

    if actor.is_arbiter:
        async with tractor.open_nursery() as nursery:
            # forks here
            portal = await nursery.run_in_actor(
                'sub-actor',
                spawn,
                is_arbiter=False,
                statespace=statespace,
                rpc_module_paths=namespaces,
            )

            assert len(nursery._children) == 1
            assert portal.channel.uid in tractor.current_actor()._peers
            # be sure we can still get the result
            result = await portal.result()
            assert result == 10
            return result
    else:
        return 10
def test_local_arbiter_subactor_global_state(arb_addr):
    """Run ``spawn`` as the arbiter and check the value it returns after
    it has received the sub-actor's result.
    """
    result = tractor.run(
        spawn,
        True,
        name='arbiter',
        statespace=statespace,
        arbiter_addr=arb_addr,
    )
    assert result == 10
2018-06-12 19:23:58 +00:00
async def stream_seq(sequence):
    """Yield each item of ``sequence`` in order, sleeping 0.1s after
    every item.
    """
    for i in sequence:
        yield i
        await trio.sleep(0.1)
2018-06-12 19:23:58 +00:00
async def stream_from_single_subactor():
    """Verify we can spawn a daemon actor and retrieve streamed data.
    """
    async with tractor.find_actor('brokerd') as portals:
        if not portals:
            # only one per host address, spawns an actor if None
            async with tractor.open_nursery() as nursery:
                # no brokerd actor found
                portal = await nursery.start_actor(
                    'streamerd',
                    rpc_module_paths=[__name__],
                    statespace={'global_dict': {}},
                )

                seq = range(10)

                agen = await portal.run(
                    __name__,
                    'stream_seq',  # the func above
                    sequence=list(seq),  # has to be msgpack serializable
                )
                # it'd sure be nice to have an asyncitertools here...
                iseq = iter(seq)
                async for val in agen:
                    assert val == next(iseq)
                    # TODO: test breaking the loop (should it kill the
                    # far end?)
                    # break
                    # terminate far-end async-gen
                    # await gen.asend(None)
                    # break

                # stop all spawned subactors
                await portal.cancel_actor()
                # await nursery.cancel()
def test_stream_from_single_subactor(arb_addr):
    """Verify streaming from a spawned async generator.
    """
    tractor.run(stream_from_single_subactor, arbiter_addr=arb_addr)
async def assert_err():
    """Fail immediately with an ``AssertionError``."""
    assert 0
def test_remote_error(arb_addr):
    """Verify an error raised in a subactor is propagated to the parent.
    """
    async def main():
        async with tractor.open_nursery() as nursery:

            portal = await nursery.run_in_actor('errorer', assert_err)

            # get result(s) from main task
            try:
                return await portal.result()
            except tractor.RemoteActorError:
                print("Look Maa that actor failed hard, hehh")
                raise
            except Exception:
                # any other error type falls through to the failing
                # assert below
                pass
            assert 0, "Remote error was not raised?"

    with pytest.raises(tractor.RemoteActorError):
        # also raises
        tractor.run(main, arbiter_addr=arb_addr)
2018-07-11 23:24:37 +00:00
async def stream_forever():
    """Yield the same string endlessly, sleeping 0.01s after each item."""
    for i in repeat("I can see these little future bubble things"):
        # each yielded value is sent over the ``Channel`` to the
        # parent actor
        yield i
        await trio.sleep(0.01)
@tractor_test
async def test_cancel_infinite_streamer():
    """Stream from a never-ending remote generator and check that the
    surrounding timeout cancels both the scope and the nursery.
    """
    # stream for at most 1 second
    with trio.move_on_after(1) as cancel_scope:
        async with tractor.open_nursery() as n:
            portal = await n.start_actor(
                'donny',
                rpc_module_paths=[__name__],
            )

            # this async for loop streams values from the above
            # async generator running in a separate process
            async for letter in await portal.run(__name__, 'stream_forever'):
                print(letter)

    # we support trio's cancellation system
    assert cancel_scope.cancelled_caught
    assert n.cancelled
2018-07-11 20:56:22 +00:00
@tractor_test
async def test_one_cancels_all():
    """Verify one failed actor causes all others in the nursery
    to be cancelled just like in trio.

    This is the first and only supervisory strategy at the moment.
    """
    try:
        async with tractor.open_nursery() as n:
            # start three actors that just sit there
            for i in range(3):
                await n.start_actor(
                    f'actor_{i}',
                    rpc_module_paths=[__name__],
                )

            # start one actor that will fail immediately
            await n.run_in_actor('extra', assert_err)

        # should error here with a ``RemoteActorError`` containing
        # an ``AssertionError``

    except tractor.RemoteActorError:
        assert n.cancelled is True
        assert not n._children
    else:
        pytest.fail("Should have gotten a remote assertion error?")
def movie_theatre_question():
    """A question asked in a dark theatre, in a tangent
    (errr, I mean different) process.
    """
    return 'have you ever seen a portal?'
@tractor_test
async def test_movie_theatre_convo():
    """Start a long-lived actor, call a function in it twice, then
    cancel it.
    """
    async with tractor.open_nursery() as n:
        portal = await n.start_actor(
            'frank',
            # enable the actor to run funcs from this current module
            rpc_module_paths=[__name__],
        )

        print(await portal.run(__name__, 'movie_theatre_question'))
        # call the subactor a 2nd time
        print(await portal.run(__name__, 'movie_theatre_question'))

        # the async with will block here indefinitely waiting
        # for our actor "frank" to complete, we cancel 'frank'
        # to avoid blocking indefinitely
        await portal.cancel_actor()
def cellar_door():
    """Return a fixed phrase; used as the main task of a one-shot actor."""
    return "Dang that's beautiful"
@tractor_test
async def test_most_beautiful_word():
    """Run ``cellar_door`` as the main task of a one-shot actor and print
    its result.
    """
    async with tractor.open_nursery() as n:

        portal = await n.run_in_actor('some_linguist', cellar_door)

    # The ``async with`` will unblock here since the 'some_linguist'
    # actor has completed its main task ``cellar_door``.

    print(await portal.result())
def do_nothing():
    """Do nothing and return ``None``."""
    pass
def test_cancel_single_subactor(arb_addr):
    """Start one actor, call ``do_nothing`` in it, then cancel the
    whole nursery.
    """
    async def main():

        async with tractor.open_nursery() as nursery:

            portal = await nursery.start_actor(
                'nothin', rpc_module_paths=[__name__],
            )
            assert (await portal.run(__name__, 'do_nothing')) is None

            # would hang otherwise
            await nursery.cancel()

    tractor.run(main, arbiter_addr=arb_addr)
2018-07-12 20:20:38 +00:00
# this is the first 2 actors, streamer_1 and streamer_2
async def stream_data(seed):
    """Yield the integers ``0 .. seed - 1``, hitting a checkpoint after
    each one.
    """
    for i in range(seed):
        yield i
        await trio.sleep(0)  # trigger scheduler
2018-07-12 20:20:38 +00:00
# this is the third actor; the aggregator
async def aggregate(seed):
    """Ensure that the two streams we receive match but only stream
    a single set of values to the parent.
    """
    async with tractor.open_nursery() as nursery:
        portals = []
        for i in range(1, 3):
            # fork point
            portal = await nursery.start_actor(
                name=f'streamer_{i}',
                rpc_module_paths=[__name__],
            )

            portals.append(portal)

        q = trio.Queue(500)

        async def push_to_q(portal):
            async for value in await portal.run(
                __name__, 'stream_data', seed=seed
            ):
                # leverage trio's built-in backpressure
                await q.put(value)

            # ``None`` marks the end of this portal's stream
            await q.put(None)
            print(f"FINISHED ITERATING {portal.channel.uid}")

        # spawn 2 trio tasks to collect streams and push to a local queue
        async with trio.open_nursery() as n:
            for portal in portals:
                n.start_soon(push_to_q, portal)

            unique_vals = set()
            async for value in q:
                if value not in unique_vals:
                    unique_vals.add(value)
                    # yield upwards to the spawning parent actor
                    yield value

                    # stop at the first end-of-stream marker
                    if value is None:
                        break

                assert value in unique_vals

            print("FINISHED ITERATING in aggregator")

        await nursery.cancel()
        print("WAITING on `ActorNursery` to finish")
    print("AGGREGATOR COMPLETE!")
2018-07-12 20:20:38 +00:00
# this is the main actor and *arbiter*
async def a_quadruple_example():
    """Run ``aggregate`` in a sub-actor, collect everything it streams
    back and return it as a list.
    """
    # a nursery which spawns "actors"
    async with tractor.open_nursery() as nursery:

        seed = int(1e3)
        pre_start = time.time()

        portal = await nursery.run_in_actor(
            'aggregator',
            aggregate,
            seed=seed,
        )

        start = time.time()
        # the portal call returns exactly what you'd expect
        # as if the remote "aggregate" function was called locally
        result_stream = []
        async for value in await portal.result():
            result_stream.append(value)

        print(f"STREAM TIME = {time.time() - start}")
        print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
        # every value once, followed by the ``None`` end marker
        assert result_stream == list(range(seed)) + [None]
        return result_stream
async def cancel_after(wait):
    """Run ``a_quadruple_example`` with a ``wait`` second timeout.

    Returns the example's result, or ``None`` (implicitly) when the
    timeout fires first.
    """
    with trio.move_on_after(wait):
        return await a_quadruple_example()
def test_a_quadruple_example(arb_addr):
    """This also serves as a kind of "we'd like to eventually be this
    fast test".
    """
    # a truthy result means the example finished within the 2.2s budget
    results = tractor.run(cancel_after, 2.2, arbiter_addr=arb_addr)
    assert results
2018-08-02 20:33:42 +00:00
@pytest.mark.parametrize('cancel_delay', list(range(1, 7)))
def test_not_fast_enough_quad(arb_addr, cancel_delay):
    """Verify we can cancel midway through the quad example and all actors
    cancel gracefully.
    """
    # timeouts of 1.1s .. 1.6s
    delay = 1 + cancel_delay/10
    results = tractor.run(cancel_after, delay, arbiter_addr=arb_addr)
    assert results is None