import time from itertools import cycle import pytest import trio import tractor from tractor.testing import tractor_test def test_type_checks(): with pytest.raises(TypeError) as err: @tractor.msg.pub async def no_get_topics(yo): yield assert "must define a `get_topics`" in str(err.value) with pytest.raises(TypeError) as err: @tractor.msg.pub def not_async_gen(yo): pass assert "must be an async generator function" in str(err.value) def is_even(i): return i % 2 == 0 @tractor.msg.pub async def pubber(get_topics, seed=10): ss = tractor.current_actor().statespace for i in cycle(range(seed)): # ensure topic subscriptions are as expected ss['get_topics'] = get_topics yield {'even' if is_even(i) else 'odd': i} await trio.sleep(0.1) async def subs( which, pub_actor_name, seed=10, portal=None, task_status=trio.TASK_STATUS_IGNORED, ): if len(which) == 1: if which[0] == 'even': pred = is_even else: def pred(i): return not is_even(i) else: def pred(i): return isinstance(i, int) async with tractor.find_actor(pub_actor_name) as portal: stream = await portal.run( __name__, 'pubber', topics=which, seed=seed, ) task_status.started(stream) times = 10 count = 0 await stream.__anext__() async for pkt in stream: for topic, value in pkt.items(): assert pred(value) count += 1 if count >= times: break await stream.aclose() stream = await portal.run( __name__, 'pubber', topics=['odd'], seed=seed, ) await stream.__anext__() count = 0 # async with aclosing(stream) as stream: try: async for pkt in stream: for topic, value in pkt.items(): pass # assert pred(value) count += 1 if count >= times: break finally: await stream.aclose() @tractor.msg.pub(tasks=['one', 'two']) async def multilock_pubber(get_topics): yield {'doggy': 10} @pytest.mark.parametrize( 'callwith_expecterror', [ (pubber, {}, TypeError), # missing a `topics` (multilock_pubber, {'ctx': None}, TypeError), # missing a `task_name` (multilock_pubber, {'ctx': None, 'topics': ['topic1']}, TypeError), # should work (multilock_pubber, {'ctx': None, 'topics': ['doggy'], 'task_name': 'one'}, None), ], ) @tractor_test async def test_required_args(callwith_expecterror): func, kwargs, err = callwith_expecterror if err is not None: with pytest.raises(err): await func(**kwargs) else: async with tractor.open_nursery() as n: # await func(**kwargs) portal = await n.run_in_actor( 'pubber', multilock_pubber, **kwargs) async with tractor.wait_for_actor('pubber'): pass await trio.sleep(0.5) async for val in await portal.result(): assert val == {'doggy': 10} @pytest.mark.parametrize( 'pub_actor', ['streamer', 'arbiter'] ) def test_multi_actor_subs_arbiter_pub( loglevel, arb_addr, pub_actor, ): """Try out the neato @pub decorator system. """ async def main(): ss = tractor.current_actor().statespace async with tractor.open_nursery() as n: name = 'arbiter' if pub_actor == 'streamer': # start the publisher as a daemon master_portal = await n.start_actor( 'streamer', rpc_module_paths=[__name__], ) even_portal = await n.run_in_actor( 'evens', subs, which=['even'], pub_actor_name=name) odd_portal = await n.run_in_actor( 'odds', subs, which=['odd'], pub_actor_name=name) async with tractor.wait_for_actor('evens'): # block until 2nd actor is initialized pass if pub_actor == 'arbiter': # wait for publisher task to be spawned in a local RPC task while not ss.get('get_topics'): await trio.sleep(0.1) get_topics = ss.get('get_topics') assert 'even' in get_topics() async with tractor.wait_for_actor('odds'): # block until 2nd actor is initialized pass if pub_actor == 'arbiter': start = time.time() while 'odd' not in get_topics(): await trio.sleep(0.1) if time.time() - start > 1: pytest.fail("odds subscription never arrived?") # TODO: how to make this work when the arbiter gets # a portal to itself? Currently this causes a hang # when the channel server is torn down due to a lingering # loopback channel # with trio.move_on_after(1): # await subs(['even', 'odd']) # XXX: this would cause infinite # blocking due to actor never terminating loop # await even_portal.result() await trio.sleep(0.5) await even_portal.cancel_actor() await trio.sleep(0.5) if pub_actor == 'arbiter': assert 'even' not in get_topics() await odd_portal.cancel_actor() await trio.sleep(1) if pub_actor == 'arbiter': while get_topics(): await trio.sleep(0.1) if time.time() - start > 1: pytest.fail("odds subscription never dropped?") else: await master_portal.cancel_actor() tractor.run( main, arbiter_addr=arb_addr, rpc_module_paths=[__name__], ) def test_single_subactor_pub_multitask_subs( loglevel, arb_addr, ): async def main(): async with tractor.open_nursery() as n: portal = await n.start_actor( 'streamer', rpc_module_paths=[__name__], ) async with tractor.wait_for_actor('streamer'): # block until 2nd actor is initialized pass async with trio.open_nursery() as tn: agen = await tn.start(subs, ['even'], 'streamer') await trio.sleep(0.1) tn.start_soon(subs, ['even'], 'streamer') # XXX this will trigger the python bug: # https://bugs.python.org/issue32526 # if using async generators to wrap tractor channels await agen.aclose() await trio.sleep(0.1) tn.start_soon(subs, ['even'], 'streamer') await trio.sleep(0.1) tn.start_soon(subs, ['even'], 'streamer') await portal.cancel_actor() tractor.run( main, arbiter_addr=arb_addr, rpc_module_paths=[__name__], )