From 2b1e8773bb4896b969e6e213dbf8fac54707ccee Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 23 Jan 2019 22:35:04 -0500 Subject: [PATCH] Verify subs topics at each step --- tests/test_local.py | 1 - tests/test_pubsub.py | 82 ++++++++++++++++++++++++++++++++++++-------- 2 files changed, 67 insertions(+), 16 deletions(-) diff --git a/tests/test_local.py b/tests/test_local.py index 2f9e5ee..eb0c676 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -7,7 +7,6 @@ import pytest import trio import tractor - from conftest import tractor_test diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index fef3c14..e5f37ef 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -1,3 +1,4 @@ +import time from functools import partial from itertools import cycle @@ -14,13 +15,18 @@ def is_even(i): @tractor.msg.pub async def pubber(get_topics): + ss = tractor.current_actor().statespace + for i in cycle(range(10)): - topics = get_topics() + + # ensure topic subscriptions are as expected + ss['get_topics'] = get_topics + yield {'even' if is_even(i) else 'odd': i} await trio.sleep(0.1) -async def subs(which): +async def subs(which, pub_actor_name): if len(which) == 1: if which[0] == 'even': pred = is_even @@ -29,7 +35,7 @@ async def subs(which): else: pred = lambda i: isinstance(i, int) - async with tractor.find_actor('streamer') as portal: + async with tractor.find_actor(pub_actor_name) as portal: agen = await portal.run(__name__, 'pubber', topics=which) async with aclosing(agen) as agen: async for pkt in agen: @@ -37,25 +43,60 @@ async def subs(which): assert pred(value) +@pytest.mark.parametrize( + 'pub_actor', + ['streamer', 'arbiter'] +) def test_pubsub_multi_actor_subs( loglevel, arb_addr, + pub_actor, ): + """Try out the neato @pub decorator system. + """ async def main(): - async with tractor.open_nursery() as n: - # start the publisher as a daemon - master_portal = await n.start_actor( - 'streamer', - rpc_module_paths=[__name__], - ) + ss = tractor.current_actor().statespace - even_portal = await n.run_in_actor('evens', subs, which=['even']) - odd_portal = await n.run_in_actor('odds', subs, which=['odd']) + async with tractor.open_nursery() as n: + + name = 'arbiter' + + if pub_actor is 'streamer': + # start the publisher as a daemon + master_portal = await n.start_actor( + 'streamer', + rpc_module_paths=[__name__], + ) + + even_portal = await n.run_in_actor( + 'evens', subs, which=['even'], pub_actor_name=name) + odd_portal = await n.run_in_actor( + 'odds', subs, which=['odd'], pub_actor_name=name) + + async with tractor.wait_for_actor('evens'): + # block until 2nd actor is initialized + pass + + if pub_actor is 'arbiter': + # wait for publisher task to be spawned in a local RPC task + while not ss.get('get_topics'): + await trio.sleep(0.1) + + get_topics = ss.get('get_topics') + + assert 'even' in get_topics() async with tractor.wait_for_actor('odds'): # block until 2nd actor is initialized pass + if pub_actor is 'arbiter': + start = time.time() + while 'odd' not in get_topics(): + await trio.sleep(0.1) + if time.time() - start > 1: + pytest.fail("odds subscription never arrived?") + # TODO: how to make this work when the arbiter gets # a portal to itself? Currently this causes a hang # when the channel server is torn down due to a lingering @@ -67,12 +108,23 @@ def test_pubsub_multi_actor_subs( # blocking due to actor never terminating loop # await even_portal.result() - await trio.sleep(1) + await trio.sleep(0.5) await even_portal.cancel_actor() - await trio.sleep(1) - await odd_portal.cancel_actor() + await trio.sleep(0.5) - await master_portal.cancel_actor() + if pub_actor is 'arbiter': + assert 'even' not in get_topics() + + await odd_portal.cancel_actor() + await trio.sleep(1) + + if pub_actor is 'arbiter': + while get_topics(): + await trio.sleep(0.1) + if time.time() - start > 1: + pytest.fail("odds subscription never dropped?") + else: + await master_portal.cancel_actor() tractor.run( main,