diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index f0c4beb..745731f 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -4,7 +4,6 @@ from itertools import cycle import pytest import trio import tractor -from async_generator import aclosing from tractor.testing import tractor_test @@ -30,10 +29,10 @@ def is_even(i): @tractor.msg.pub -async def pubber(get_topics): +async def pubber(get_topics, seed=10): ss = tractor.current_actor().statespace - for i in cycle(range(10)): + for i in cycle(range(seed)): # ensure topic subscriptions are as expected ss['get_topics'] = get_topics @@ -42,7 +41,11 @@ async def pubber(get_topics): await trio.sleep(0.1) -async def subs(which, pub_actor_name): +async def subs( + which, pub_actor_name, seed=10, + portal=None, + task_status=trio.TASK_STATUS_IGNORED, +): if len(which) == 1: if which[0] == 'even': pred = is_even @@ -54,11 +57,43 @@ async def subs(which, pub_actor_name): return isinstance(i, int) async with tractor.find_actor(pub_actor_name) as portal: - agen = await portal.run(__name__, 'pubber', topics=which) - async with aclosing(agen) as agen: + agen = await portal.run( + __name__, 'pubber', + topics=which, + seed=seed, + ) + task_status.started(agen) + times = 10 + count = 0 + await agen.__anext__() + async for pkt in agen: + for topic, value in pkt.items(): + assert pred(value) + count += 1 + if count >= times: + break + + await agen.aclose() + + agen = await portal.run( + __name__, 'pubber', + topics=['odd'], + seed=seed, + ) + + await agen.__anext__() + count = 0 + # async with aclosing(agen) as agen: + try: async for pkt in agen: for topic, value in pkt.items(): - assert pred(value) + pass + # assert pred(value) + count += 1 + if count >= times: + break + finally: + await agen.aclose() @tractor.msg.pub(tasks=['one', 'two']) @@ -101,7 +136,7 @@ async def test_required_args(callwith_expecterror): 'pub_actor', ['streamer', 'arbiter'] ) -def test_pubsub_multi_actor_subs( +def test_multi_actor_subs_arbiter_pub( loglevel, arb_addr, pub_actor, @@ -115,7 +150,7 @@ def test_pubsub_multi_actor_subs( name = 'arbiter' - if pub_actor is 'streamer': + if pub_actor == 'streamer': # start the publisher as a daemon master_portal = await n.start_actor( 'streamer', @@ -131,7 +166,7 @@ def test_pubsub_multi_actor_subs( # block until 2nd actor is initialized pass - if pub_actor is 'arbiter': + if pub_actor == 'arbiter': # wait for publisher task to be spawned in a local RPC task while not ss.get('get_topics'): await trio.sleep(0.1) @@ -144,7 +179,7 @@ def test_pubsub_multi_actor_subs( # block until 2nd actor is initialized pass - if pub_actor is 'arbiter': + if pub_actor == 'arbiter': start = time.time() while 'odd' not in get_topics(): await trio.sleep(0.1) @@ -166,13 +201,13 @@ def test_pubsub_multi_actor_subs( await even_portal.cancel_actor() await trio.sleep(0.5) - if pub_actor is 'arbiter': + if pub_actor == 'arbiter': assert 'even' not in get_topics() await odd_portal.cancel_actor() await trio.sleep(1) - if pub_actor is 'arbiter': + if pub_actor == 'arbiter': while get_topics(): await trio.sleep(0.1) if time.time() - start > 1: @@ -185,3 +220,43 @@ def test_pubsub_multi_actor_subs( arbiter_addr=arb_addr, rpc_module_paths=[__name__], ) + + +def test_single_subactor_pub_multitask_subs( + loglevel, + arb_addr, +): + async def main(): + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'streamer', + rpc_module_paths=[__name__], + ) + async with tractor.wait_for_actor('streamer'): + # block until 2nd actor is initialized + pass + + async with trio.open_nursery() as tn: + agen = await tn.start(subs, ['even'], 'streamer') + + await trio.sleep(0.1) + tn.start_soon(subs, ['even'], 'streamer') + + # XXX this will trigger the python bug: + # https://bugs.python.org/issue32526 + # await agen.aclose() + + await trio.sleep(0.1) + tn.start_soon(subs, ['even'], 'streamer') + await trio.sleep(0.1) + tn.start_soon(subs, ['even'], 'streamer') + + await portal.cancel_actor() + + tractor.run( + main, + arbiter_addr=arb_addr, + rpc_module_paths=[__name__], + )