diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 745731f..745e5ec 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -57,35 +57,35 @@ async def subs( return isinstance(i, int) async with tractor.find_actor(pub_actor_name) as portal: - agen = await portal.run( + stream = await portal.run( __name__, 'pubber', topics=which, seed=seed, ) - task_status.started(agen) + task_status.started(stream) times = 10 count = 0 - await agen.__anext__() - async for pkt in agen: + await stream.__anext__() + async for pkt in stream: for topic, value in pkt.items(): assert pred(value) count += 1 if count >= times: break - await agen.aclose() + await stream.aclose() - agen = await portal.run( + stream = await portal.run( __name__, 'pubber', topics=['odd'], seed=seed, ) - await agen.__anext__() + await stream.__anext__() count = 0 - # async with aclosing(agen) as agen: + # async with aclosing(stream) as stream: try: - async for pkt in agen: + async for pkt in stream: for topic, value in pkt.items(): pass # assert pred(value) @@ -93,7 +93,7 @@ async def subs( if count >= times: break finally: - await agen.aclose() + await stream.aclose() @tractor.msg.pub(tasks=['one', 'two']) @@ -111,7 +111,7 @@ async def multilock_pubber(get_topics): (multilock_pubber, {'ctx': None, 'topics': ['topic1']}, TypeError), # should work (multilock_pubber, - {'ctx': None, 'topics': ['topic1'], 'task_name': 'one'}, + {'ctx': None, 'topics': ['doggy'], 'task_name': 'one'}, None), ], ) @@ -126,7 +126,12 @@ async def test_required_args(callwith_expecterror): async with tractor.open_nursery() as n: # await func(**kwargs) portal = await n.run_in_actor( - 'sub', multilock_pubber, **kwargs) + 'pubber', multilock_pubber, **kwargs) + + async with tractor.wait_for_actor('pubber'): + pass + + await trio.sleep(0.5) async for val in await portal.result(): assert val == {'doggy': 10} @@ -246,7 +251,8 @@ def test_single_subactor_pub_multitask_subs( # XXX this will trigger the python bug: # https://bugs.python.org/issue32526 - # await agen.aclose() + # if using async generators to wrap tractor channels + await agen.aclose() await trio.sleep(0.1) tn.start_soon(subs, ['even'], 'streamer')