diff --git a/tests/test_legacy_one_way_streaming.py b/tests/test_legacy_one_way_streaming.py index a685f924..e7821661 100644 --- a/tests/test_legacy_one_way_streaming.py +++ b/tests/test_legacy_one_way_streaming.py @@ -1,9 +1,11 @@ """ -Streaming via async gen api +Streaming via the, now legacy, "async-gen API". + """ import time from functools import partial import platform +from typing import Callable import trio import tractor @@ -19,7 +21,11 @@ def test_must_define_ctx(): async def no_ctx(): pass - assert "no_ctx must be `ctx: tractor.Context" in str(err.value) + assert ( + "no_ctx must be `ctx: tractor.Context" + in + str(err.value) + ) @tractor.stream async def has_ctx(ctx): @@ -69,14 +75,14 @@ async def stream_from_single_subactor( async with tractor.open_nursery( registry_addrs=[reg_addr], start_method=start_method, - ) as nursery: + ) as an: async with tractor.find_actor('streamerd') as portals: if not portals: # no brokerd actor found - portal = await nursery.start_actor( + portal = await an.start_actor( 'streamerd', enable_modules=[__name__], ) @@ -116,11 +122,22 @@ async def stream_from_single_subactor( @pytest.mark.parametrize( - 'stream_func', [async_gen_stream, context_stream] + 'stream_func', + [ + async_gen_stream, + context_stream, + ], + ids='stream_func={}'.format ) -def test_stream_from_single_subactor(reg_addr, start_method, stream_func): - """Verify streaming from a spawned async generator. - """ +def test_stream_from_single_subactor( + reg_addr: tuple, + start_method: str, + stream_func: Callable, +): + ''' + Verify streaming from a spawned async generator. + + ''' trio.run( partial( stream_from_single_subactor, @@ -132,10 +149,9 @@ def test_stream_from_single_subactor(reg_addr, start_method, stream_func): # this is the first 2 actors, streamer_1 and streamer_2 -async def stream_data(seed): +async def stream_data(seed: int): for i in range(seed): - yield i # trigger scheduler to simulate practical usage @@ -143,15 +159,17 @@ async def stream_data(seed): # this is the third actor; the aggregator -async def aggregate(seed): - """Ensure that the two streams we receive match but only stream +async def aggregate(seed: int): + ''' + Ensure that the two streams we receive match but only stream a single set of values to the parent. - """ - async with tractor.open_nursery() as nursery: + + ''' + async with tractor.open_nursery() as an: portals = [] for i in range(1, 3): # fork point - portal = await nursery.start_actor( + portal = await an.start_actor( name=f'streamer_{i}', enable_modules=[__name__], ) @@ -164,7 +182,8 @@ async def aggregate(seed): async with send_chan: async with portal.open_stream_from( - stream_data, seed=seed, + stream_data, + seed=seed, ) as stream: async for value in stream: @@ -174,10 +193,14 @@ async def aggregate(seed): print(f"FINISHED ITERATING {portal.channel.uid}") # spawn 2 trio tasks to collect streams and push to a local queue - async with trio.open_nursery() as n: + async with trio.open_nursery() as tn: for portal in portals: - n.start_soon(push_to_chan, portal, send_chan.clone()) + tn.start_soon( + push_to_chan, + portal, + send_chan.clone(), + ) # close this local task's reference to send side await send_chan.aclose() @@ -194,20 +217,21 @@ async def aggregate(seed): print("FINISHED ITERATING in aggregator") - await nursery.cancel() + await an.cancel() print("WAITING on `ActorNursery` to finish") print("AGGREGATOR COMPLETE!") -# this is the main actor and *arbiter* -async def a_quadruple_example(): - # a nursery which spawns "actors" - async with tractor.open_nursery() as nursery: +async def a_quadruple_example() -> list[int]: + ''' + Open the root-actor which is also a "registrar". + ''' + async with tractor.open_nursery() as an: seed = int(1e3) pre_start = time.time() - portal = await nursery.start_actor( + portal = await an.start_actor( name='aggregator', enable_modules=[__name__], ) @@ -228,8 +252,14 @@ async def a_quadruple_example(): return result_stream -async def cancel_after(wait, reg_addr): - async with tractor.open_root_actor(registry_addrs=[reg_addr]): +async def cancel_after( + wait: float, + reg_addr: tuple, +) -> list[int]: + + async with tractor.open_root_actor( + registry_addrs=[reg_addr], + ): with trio.move_on_after(wait): return await a_quadruple_example() @@ -242,7 +272,7 @@ def time_quad_ex( ): non_linux: bool = (_sys := platform.system()) != 'Linux' if ci_env and non_linux: - pytest.skip("Test is too flaky on {_sys!r} in CI") + pytest.skip(f'Test is too flaky on {_sys!r} in CI') if spawn_backend == 'mp': ''' @@ -253,14 +283,18 @@ def time_quad_ex( timeout = 7 if non_linux else 4 start = time.time() - results = trio.run(cancel_after, timeout, reg_addr) - diff = time.time() - start + results: list[int] = trio.run( + cancel_after, + timeout, + reg_addr, + ) + diff: float = time.time() - start assert results return results, diff def test_a_quadruple_example( - time_quad_ex: tuple, + time_quad_ex: tuple[list[int], float], ci_env: bool, spawn_backend: str, ): @@ -284,19 +318,33 @@ def test_a_quadruple_example( list(map(lambda i: i/10, range(3, 9))) ) def test_not_fast_enough_quad( - reg_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend + reg_addr: tuple, + time_quad_ex: tuple[list[int], float], + cancel_delay: float, + ci_env: bool, + spawn_backend: str, ): - """Verify we can cancel midway through the quad example and all actors - cancel gracefully. - """ + ''' + Verify we can cancel midway through the quad example and all + actors cancel gracefully. + + ''' results, diff = time_quad_ex delay = max(diff - cancel_delay, 0) - results = trio.run(cancel_after, delay, reg_addr) - system = platform.system() - if system in ('Windows', 'Darwin') and results is not None: + results = trio.run( + cancel_after, + delay, + reg_addr, + ) + system: str = platform.system() + if ( + system in ('Windows', 'Darwin') + and + results is not None + ): # In CI envoirments it seems later runs are quicker then the first # so just ignore these - print(f"Woa there {system} caught your breath eh?") + print(f'Woa there {system} caught your breath eh?') else: # should be cancelled mid-streaming assert results is None @@ -304,23 +352,24 @@ def test_not_fast_enough_quad( @tractor_test async def test_respawn_consumer_task( - reg_addr, - spawn_backend, - loglevel, + reg_addr: tuple, + spawn_backend: str, + loglevel: str, ): - """Verify that ``._portal.ReceiveStream.shield()`` + ''' + Verify that ``._portal.ReceiveStream.shield()`` sucessfully protects the underlying IPC channel from being closed when cancelling and respawning a consumer task. This also serves to verify that all values from the stream can be received despite the respawns. - """ + ''' stream = None - async with tractor.open_nursery() as n: + async with tractor.open_nursery() as an: - portal = await n.start_actor( + portal = await an.start_actor( name='streamer', enable_modules=[__name__] )