Verify subs topics at each step
parent
7675b01722
commit
2b1e8773bb
|
@ -7,7 +7,6 @@ import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
|
|
||||||
from conftest import tractor_test
|
from conftest import tractor_test
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import time
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from itertools import cycle
|
from itertools import cycle
|
||||||
|
|
||||||
|
@ -14,13 +15,18 @@ def is_even(i):
|
||||||
|
|
||||||
@tractor.msg.pub
|
@tractor.msg.pub
|
||||||
async def pubber(get_topics):
|
async def pubber(get_topics):
|
||||||
|
ss = tractor.current_actor().statespace
|
||||||
|
|
||||||
for i in cycle(range(10)):
|
for i in cycle(range(10)):
|
||||||
topics = get_topics()
|
|
||||||
|
# ensure topic subscriptions are as expected
|
||||||
|
ss['get_topics'] = get_topics
|
||||||
|
|
||||||
yield {'even' if is_even(i) else 'odd': i}
|
yield {'even' if is_even(i) else 'odd': i}
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
|
|
||||||
async def subs(which):
|
async def subs(which, pub_actor_name):
|
||||||
if len(which) == 1:
|
if len(which) == 1:
|
||||||
if which[0] == 'even':
|
if which[0] == 'even':
|
||||||
pred = is_even
|
pred = is_even
|
||||||
|
@ -29,7 +35,7 @@ async def subs(which):
|
||||||
else:
|
else:
|
||||||
pred = lambda i: isinstance(i, int)
|
pred = lambda i: isinstance(i, int)
|
||||||
|
|
||||||
async with tractor.find_actor('streamer') as portal:
|
async with tractor.find_actor(pub_actor_name) as portal:
|
||||||
agen = await portal.run(__name__, 'pubber', topics=which)
|
agen = await portal.run(__name__, 'pubber', topics=which)
|
||||||
async with aclosing(agen) as agen:
|
async with aclosing(agen) as agen:
|
||||||
async for pkt in agen:
|
async for pkt in agen:
|
||||||
|
@ -37,25 +43,60 @@ async def subs(which):
|
||||||
assert pred(value)
|
assert pred(value)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'pub_actor',
|
||||||
|
['streamer', 'arbiter']
|
||||||
|
)
|
||||||
def test_pubsub_multi_actor_subs(
|
def test_pubsub_multi_actor_subs(
|
||||||
loglevel,
|
loglevel,
|
||||||
arb_addr,
|
arb_addr,
|
||||||
|
pub_actor,
|
||||||
):
|
):
|
||||||
|
"""Try out the neato @pub decorator system.
|
||||||
|
"""
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
ss = tractor.current_actor().statespace
|
||||||
# start the publisher as a daemon
|
|
||||||
master_portal = await n.start_actor(
|
|
||||||
'streamer',
|
|
||||||
rpc_module_paths=[__name__],
|
|
||||||
)
|
|
||||||
|
|
||||||
even_portal = await n.run_in_actor('evens', subs, which=['even'])
|
async with tractor.open_nursery() as n:
|
||||||
odd_portal = await n.run_in_actor('odds', subs, which=['odd'])
|
|
||||||
|
name = 'arbiter'
|
||||||
|
|
||||||
|
if pub_actor is 'streamer':
|
||||||
|
# start the publisher as a daemon
|
||||||
|
master_portal = await n.start_actor(
|
||||||
|
'streamer',
|
||||||
|
rpc_module_paths=[__name__],
|
||||||
|
)
|
||||||
|
|
||||||
|
even_portal = await n.run_in_actor(
|
||||||
|
'evens', subs, which=['even'], pub_actor_name=name)
|
||||||
|
odd_portal = await n.run_in_actor(
|
||||||
|
'odds', subs, which=['odd'], pub_actor_name=name)
|
||||||
|
|
||||||
|
async with tractor.wait_for_actor('evens'):
|
||||||
|
# block until 2nd actor is initialized
|
||||||
|
pass
|
||||||
|
|
||||||
|
if pub_actor is 'arbiter':
|
||||||
|
# wait for publisher task to be spawned in a local RPC task
|
||||||
|
while not ss.get('get_topics'):
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
|
get_topics = ss.get('get_topics')
|
||||||
|
|
||||||
|
assert 'even' in get_topics()
|
||||||
|
|
||||||
async with tractor.wait_for_actor('odds'):
|
async with tractor.wait_for_actor('odds'):
|
||||||
# block until 2nd actor is initialized
|
# block until 2nd actor is initialized
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
if pub_actor is 'arbiter':
|
||||||
|
start = time.time()
|
||||||
|
while 'odd' not in get_topics():
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
if time.time() - start > 1:
|
||||||
|
pytest.fail("odds subscription never arrived?")
|
||||||
|
|
||||||
# TODO: how to make this work when the arbiter gets
|
# TODO: how to make this work when the arbiter gets
|
||||||
# a portal to itself? Currently this causes a hang
|
# a portal to itself? Currently this causes a hang
|
||||||
# when the channel server is torn down due to a lingering
|
# when the channel server is torn down due to a lingering
|
||||||
|
@ -67,12 +108,23 @@ def test_pubsub_multi_actor_subs(
|
||||||
# blocking due to actor never terminating loop
|
# blocking due to actor never terminating loop
|
||||||
# await even_portal.result()
|
# await even_portal.result()
|
||||||
|
|
||||||
await trio.sleep(1)
|
await trio.sleep(0.5)
|
||||||
await even_portal.cancel_actor()
|
await even_portal.cancel_actor()
|
||||||
await trio.sleep(1)
|
await trio.sleep(0.5)
|
||||||
await odd_portal.cancel_actor()
|
|
||||||
|
|
||||||
await master_portal.cancel_actor()
|
if pub_actor is 'arbiter':
|
||||||
|
assert 'even' not in get_topics()
|
||||||
|
|
||||||
|
await odd_portal.cancel_actor()
|
||||||
|
await trio.sleep(1)
|
||||||
|
|
||||||
|
if pub_actor is 'arbiter':
|
||||||
|
while get_topics():
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
if time.time() - start > 1:
|
||||||
|
pytest.fail("odds subscription never dropped?")
|
||||||
|
else:
|
||||||
|
await master_portal.cancel_actor()
|
||||||
|
|
||||||
tractor.run(
|
tractor.run(
|
||||||
main,
|
main,
|
||||||
|
|
Loading…
Reference in New Issue