forked from goodboy/tractor
280 lines
7.4 KiB
Python
280 lines
7.4 KiB
Python
import time
|
|
from itertools import cycle
|
|
|
|
import pytest
|
|
import trio
|
|
import tractor
|
|
from tractor.testing import tractor_test
|
|
|
|
|
|
def test_type_checks():
    """Verify that ``tractor.msg.pub`` rejects unsuitable functions.

    Two candidates are handed to the decorator and each must raise a
    ``TypeError`` carrying a specific message:

    - an async generator whose signature has no ``get_topics`` parameter
    - a plain (synchronous, non-generator) function
    """
    async def no_get_topics(yo):
        yield

    def not_async_gen(yo):
        pass

    # Applying the decorator by hand is the same as using ``@`` syntax.
    with pytest.raises(TypeError) as excinfo:
        tractor.msg.pub(no_get_topics)

    assert "must define a `get_topics`" in str(excinfo.value)

    with pytest.raises(TypeError) as excinfo:
        tractor.msg.pub(not_async_gen)

    assert "must be an async generator function" in str(excinfo.value)
|
|
|
|
|
|
def is_even(i):
    """Return ``True`` when ``i`` leaves no remainder on division by 2."""
    remainder = i % 2
    return remainder == 0
|
|
|
|
|
|
@tractor.msg.pub
async def pubber(get_topics, seed=10):
    """Endlessly publish the integers ``0..seed-1`` keyed by parity.

    Each yielded packet is a single-item dict whose key is ``'even'`` or
    ``'odd'`` and whose value is the current integer. After every packet
    the generator sleeps for 0.1 seconds.

    The ``get_topics`` callable received from the decorator is stored in
    the current actor's ``statespace`` under ``'get_topics'`` on every
    iteration so that code holding the statespace can inspect it.
    """
    statespace = tractor.current_actor().statespace

    for number in cycle(range(seed)):

        # ensure topic subscriptions are as expected
        statespace['get_topics'] = get_topics

        topic = 'even' if is_even(number) else 'odd'
        yield {topic: number}
        await trio.sleep(0.1)
|
|
|
|
|
|
async def subs(
    which, pub_actor_name, seed=10,
    portal=None,
    task_status=trio.TASK_STATUS_IGNORED,
):
    """Subscribe twice to ``pubber`` in the actor named ``pub_actor_name``.

    First subscription: topics ``which``. The first packet is discarded,
    then 10 packets are consumed and every value is checked against a
    predicate derived from ``which`` (even-only, odd-only, or "any int"
    when more than one topic is requested).

    Second subscription: topic ``'odd'``. The first packet is discarded
    and 10 more packets are consumed without checking their values.

    Both streams are closed in a ``finally`` clause so that a failed
    assertion or an error while iterating does not leave a stream open.

    Args:
        which: list of topic names for the first subscription.
        pub_actor_name: name of the actor to look up with
            ``tractor.find_actor``.
        seed: forwarded to ``pubber``.
        portal: never read; the name is rebound by the ``async with``
            below. Kept for interface compatibility.
        task_status: trio task-status object; ``started()`` is called
            with the first stream so a parent using ``nursery.start()``
            receives it.
    """
    if len(which) == 1:
        if which[0] == 'even':
            pred = is_even
        else:
            def pred(i):
                return not is_even(i)
    else:
        def pred(i):
            return isinstance(i, int)

    times = 10

    async with tractor.find_actor(pub_actor_name) as portal:
        stream = await portal.run(
            pubber,
            topics=which,
            seed=seed,
        )
        task_status.started(stream)
        try:
            # drop the first packet before counting
            await stream.__anext__()
            count = 0
            async for pkt in stream:
                for value in pkt.values():
                    assert pred(value)
                count += 1
                if count >= times:
                    break
        finally:
            await stream.aclose()

        stream = await portal.run(
            pubber,
            topics=['odd'],
            seed=seed,
        )
        # async with aclosing(stream) as stream:
        try:
            # drop the first packet before counting
            await stream.__anext__()
            count = 0
            # values are deliberately not checked against ``pred`` here:
            # ``pred`` was derived from ``which``, not from ['odd']
            async for pkt in stream:
                count += 1
                if count >= times:
                    break
        finally:
            await stream.aclose()
|
|
|
|
|
|
@tractor.msg.pub(tasks=['one', 'two'])
async def multilock_pubber(get_topics):
    """Publish a single ``{'doggy': 10}`` packet and finish.

    Registered with ``tractor.msg.pub`` using the task names ``'one'``
    and ``'two'``.
    """
    packet = {'doggy': 10}
    yield packet
|
|
|
|
|
|
@pytest.mark.parametrize(
    'callwith_expecterror',
    [
        (pubber, {}, TypeError),
        # missing a `topics`
        (multilock_pubber, {'ctx': None}, TypeError),
        # missing a `task_name`
        (multilock_pubber, {'ctx': None, 'topics': ['topic1']}, TypeError),
        # should work
        (multilock_pubber,
         {'ctx': None, 'topics': ['doggy'], 'task_name': 'one'},
         None),
    ],
)
@tractor_test
async def test_required_args(callwith_expecterror):
    """Check the keyword arguments a ``pub``-decorated function demands.

    Each parameter is a ``(function, kwargs, expected_error)`` triple.
    When an error type is given, awaiting the call must raise it.
    Otherwise ``multilock_pubber`` is run in a subactor named 'pubber'
    with the kwargs and every value it streams back must equal
    ``{'doggy': 10}``.
    """
    target, call_kwargs, expected_exc = callwith_expecterror

    if expected_exc is not None:
        with pytest.raises(expected_exc):
            await target(**call_kwargs)
        return

    async with tractor.open_nursery() as nursery:
        # await func(**kwargs)
        portal = await nursery.run_in_actor(
            multilock_pubber,
            name='pubber',
            **call_kwargs
        )

        async with tractor.wait_for_actor('pubber'):
            pass

        await trio.sleep(0.5)

        result_stream = await portal.result()
        async for val in result_stream:
            assert val == {'doggy': 10}
|
|
|
|
|
|
@pytest.mark.parametrize(
    'pub_actor',
    ['streamer', 'arbiter']
)
def test_multi_actor_subs_arbiter_pub(
    loglevel,
    arb_addr,
    pub_actor,
):
    """Try out the neato @pub decorator system.

    Two subactors, 'evens' and 'odds', each run ``subs`` against a
    publisher. In the 'arbiter' parametrization the test additionally
    reads the ``get_topics`` callable that ``pubber`` stores in the
    local actor's statespace, and checks that the 'even' and 'odd'
    subscriptions appear and then disappear as the subscriber actors
    are started and cancelled.
    """
    async def main():
        ss = tractor.current_actor().statespace

        async with tractor.open_nursery() as n:

            # NOTE(review): subscribers are always pointed at the actor
            # named 'arbiter', even when ``pub_actor == 'streamer'``, so
            # the 'streamer' daemon below is started but never targeted
            # by ``subs`` - confirm whether that is intended.
            name = 'arbiter'

            if pub_actor == 'streamer':
                # start the publisher as a daemon
                master_portal = await n.start_actor(
                    'streamer',
                    rpc_module_paths=[__name__],
                )

            even_portal = await n.run_in_actor(
                subs,
                which=['even'],
                name='evens',
                pub_actor_name=name
            )
            odd_portal = await n.run_in_actor(
                subs,
                which=['odd'],
                name='odds',
                pub_actor_name=name
            )

            async with tractor.wait_for_actor('evens'):
                # block until 2nd actor is initialized
                pass

            if pub_actor == 'arbiter':
                # wait for publisher task to be spawned in a local RPC task
                while not ss.get('get_topics'):
                    await trio.sleep(0.1)

                get_topics = ss.get('get_topics')

                assert 'even' in get_topics()

            async with tractor.wait_for_actor('odds'):
                # block until 2nd actor is initialized
                pass

            if pub_actor == 'arbiter':
                start = time.time()
                while 'odd' not in get_topics():
                    await trio.sleep(0.1)
                    if time.time() - start > 1:
                        pytest.fail("odds subscription never arrived?")

            # TODO: how to make this work when the arbiter gets
            # a portal to itself? Currently this causes a hang
            # when the channel server is torn down due to a lingering
            # loopback channel
            #     with trio.move_on_after(1):
            #         await subs(['even', 'odd'])

            # XXX: this would cause infinite
            # blocking due to actor never terminating loop
            # await even_portal.result()

            await trio.sleep(0.5)
            await even_portal.cancel_actor()
            await trio.sleep(1)

            if pub_actor == 'arbiter':
                assert 'even' not in get_topics()

            await odd_portal.cancel_actor()
            await trio.sleep(2)

            if pub_actor == 'arbiter':
                # Restart the timer: the previous ``start`` was taken
                # several seconds ago (before the sleeps above), so
                # reusing it would make this 2 second timeout expire on
                # the very first iteration.
                start = time.time()
                while get_topics():
                    await trio.sleep(0.1)
                    if time.time() - start > 2:
                        pytest.fail("odds subscription never dropped?")
            else:
                await master_portal.cancel_actor()

    tractor.run(
        main,
        arbiter_addr=arb_addr,
        rpc_module_paths=[__name__],
    )
|
|
|
|
|
|
def test_single_subactor_pub_multitask_subs(
    loglevel,
    arb_addr,
):
    """Run several ``subs`` tasks locally against one 'streamer' subactor.

    Four ``subs`` tasks subscribing to 'even' are started 0.1 seconds
    apart in a trio nursery. The stream handed back by the first task is
    closed from this (parent) task after the second one has been
    scheduled. Once every task has finished the 'streamer' actor is
    cancelled.
    """
    async def main():

        async with tractor.open_nursery() as actor_nursery:

            streamer_portal = await actor_nursery.start_actor(
                'streamer',
                rpc_module_paths=[__name__],
            )
            async with tractor.wait_for_actor('streamer'):
                # block until 2nd actor is initialized
                pass

            async with trio.open_nursery() as task_nursery:
                first_stream = await task_nursery.start(
                    subs, ['even'], 'streamer'
                )

                await trio.sleep(0.1)
                task_nursery.start_soon(subs, ['even'], 'streamer')

                # XXX this will trigger the python bug:
                # https://bugs.python.org/issue32526
                # if using async generators to wrap tractor channels
                await first_stream.aclose()

                # two more staggered subscribers
                for _ in range(2):
                    await trio.sleep(0.1)
                    task_nursery.start_soon(subs, ['even'], 'streamer')

            await streamer_portal.cancel_actor()

    tractor.run(
        main,
        arbiter_addr=arb_addr,
        rpc_module_paths=[__name__],
    )
|