diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 9aba327..8d8169e 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -50,14 +50,24 @@ async def context_stream(ctx, sequence): assert cs.cancelled_caught -async def stream_from_single_subactor(stream_func): +async def stream_from_single_subactor( + arb_addr, + start_method, + stream_func, +): """Verify we can spawn a daemon actor and retrieve streamed data. """ - async with tractor.find_actor('streamerd') as portals: + # only one per host address, spawns an actor if None + + async with tractor.open_nursery( + arbiter_addr=arb_addr, + start_method=start_method, + ) as nursery: + + async with tractor.find_actor('streamerd') as portals: + + if not portals: - if not portals: - # only one per host address, spawns an actor if None - async with tractor.open_nursery() as nursery: # no brokerd actor found portal = await nursery.start_actor( 'streamerd', @@ -101,13 +111,13 @@ async def stream_from_single_subactor(stream_func): def test_stream_from_single_subactor(arb_addr, start_method, stream_func): """Verify streaming from a spawned async generator. """ - tractor.run( + trio.run( partial( stream_from_single_subactor, + arb_addr, + start_method, stream_func=stream_func, ), - arbiter_addr=arb_addr, - start_method=start_method, ) @@ -208,9 +218,10 @@ async def a_quadruple_example(): return result_stream -async def cancel_after(wait): - with trio.move_on_after(wait): - return await a_quadruple_example() +async def cancel_after(wait, arb_addr): + async with tractor.open_root_actor(arbiter_addr=arb_addr): + with trio.move_on_after(wait): + return await a_quadruple_example() @pytest.fixture(scope='module') @@ -222,7 +233,7 @@ def time_quad_ex(arb_addr, ci_env, spawn_backend): timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4 start = time.time() - results = tractor.run(cancel_after, timeout, arbiter_addr=arb_addr) + results = trio.run(cancel_after, timeout, arb_addr) diff = time.time() - start assert results return results, diff @@ -249,7 +260,7 @@ def test_not_fast_enough_quad( """ results, diff = time_quad_ex delay = max(diff - cancel_delay, 0) - results = tractor.run(cancel_after, delay, arbiter_addr=arb_addr) + results = trio.run(cancel_after, delay, arb_addr) system = platform.system() if system in ('Windows', 'Darwin') and results is not None: # In CI envoirments it seems later runs are quicker then the first