Restyle `test_legacy_one_way_streaming` mod
- convert all doc-strings to `'''` multiline style. - rename `nursery` -> `an`, `n` -> `tn` to match project-wide conventions. - add type annotations to fn params (fixtures, test helpers). - break long lines into multiline style for fn calls, assertions, and `parametrize` decorator lists. - add `ids=` to `@pytest.mark.parametrize`. - use `'` over `"` for string literals. - add `from typing import Callable` import. - drop spurious blank lines inside generators. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-codens_aware
parent
fb94aa0095
commit
d135ce94af
|
|
@ -1,9 +1,11 @@
|
||||||
"""
|
"""
|
||||||
Streaming via async gen api
|
Streaming via the, now legacy, "async-gen API".
|
||||||
|
|
||||||
"""
|
"""
|
||||||
import time
|
import time
|
||||||
from functools import partial
|
from functools import partial
|
||||||
import platform
|
import platform
|
||||||
|
from typing import Callable
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
|
@ -19,7 +21,11 @@ def test_must_define_ctx():
|
||||||
async def no_ctx():
|
async def no_ctx():
|
||||||
pass
|
pass
|
||||||
|
|
||||||
assert "no_ctx must be `ctx: tractor.Context" in str(err.value)
|
assert (
|
||||||
|
"no_ctx must be `ctx: tractor.Context"
|
||||||
|
in
|
||||||
|
str(err.value)
|
||||||
|
)
|
||||||
|
|
||||||
@tractor.stream
|
@tractor.stream
|
||||||
async def has_ctx(ctx):
|
async def has_ctx(ctx):
|
||||||
|
|
@ -69,14 +75,14 @@ async def stream_from_single_subactor(
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
registry_addrs=[reg_addr],
|
registry_addrs=[reg_addr],
|
||||||
start_method=start_method,
|
start_method=start_method,
|
||||||
) as nursery:
|
) as an:
|
||||||
|
|
||||||
async with tractor.find_actor('streamerd') as portals:
|
async with tractor.find_actor('streamerd') as portals:
|
||||||
|
|
||||||
if not portals:
|
if not portals:
|
||||||
|
|
||||||
# no brokerd actor found
|
# no brokerd actor found
|
||||||
portal = await nursery.start_actor(
|
portal = await an.start_actor(
|
||||||
'streamerd',
|
'streamerd',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
|
@ -116,11 +122,22 @@ async def stream_from_single_subactor(
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'stream_func', [async_gen_stream, context_stream]
|
'stream_func',
|
||||||
|
[
|
||||||
|
async_gen_stream,
|
||||||
|
context_stream,
|
||||||
|
],
|
||||||
|
ids='stream_func={}'.format
|
||||||
)
|
)
|
||||||
def test_stream_from_single_subactor(reg_addr, start_method, stream_func):
|
def test_stream_from_single_subactor(
|
||||||
"""Verify streaming from a spawned async generator.
|
reg_addr: tuple,
|
||||||
"""
|
start_method: str,
|
||||||
|
stream_func: Callable,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify streaming from a spawned async generator.
|
||||||
|
|
||||||
|
'''
|
||||||
trio.run(
|
trio.run(
|
||||||
partial(
|
partial(
|
||||||
stream_from_single_subactor,
|
stream_from_single_subactor,
|
||||||
|
|
@ -132,10 +149,9 @@ def test_stream_from_single_subactor(reg_addr, start_method, stream_func):
|
||||||
|
|
||||||
|
|
||||||
# this is the first 2 actors, streamer_1 and streamer_2
|
# this is the first 2 actors, streamer_1 and streamer_2
|
||||||
async def stream_data(seed):
|
async def stream_data(seed: int):
|
||||||
|
|
||||||
for i in range(seed):
|
for i in range(seed):
|
||||||
|
|
||||||
yield i
|
yield i
|
||||||
|
|
||||||
# trigger scheduler to simulate practical usage
|
# trigger scheduler to simulate practical usage
|
||||||
|
|
@ -143,15 +159,17 @@ async def stream_data(seed):
|
||||||
|
|
||||||
|
|
||||||
# this is the third actor; the aggregator
|
# this is the third actor; the aggregator
|
||||||
async def aggregate(seed):
|
async def aggregate(seed: int):
|
||||||
"""Ensure that the two streams we receive match but only stream
|
'''
|
||||||
|
Ensure that the two streams we receive match but only stream
|
||||||
a single set of values to the parent.
|
a single set of values to the parent.
|
||||||
"""
|
|
||||||
async with tractor.open_nursery() as nursery:
|
'''
|
||||||
|
async with tractor.open_nursery() as an:
|
||||||
portals = []
|
portals = []
|
||||||
for i in range(1, 3):
|
for i in range(1, 3):
|
||||||
# fork point
|
# fork point
|
||||||
portal = await nursery.start_actor(
|
portal = await an.start_actor(
|
||||||
name=f'streamer_{i}',
|
name=f'streamer_{i}',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
|
@ -164,7 +182,8 @@ async def aggregate(seed):
|
||||||
async with send_chan:
|
async with send_chan:
|
||||||
|
|
||||||
async with portal.open_stream_from(
|
async with portal.open_stream_from(
|
||||||
stream_data, seed=seed,
|
stream_data,
|
||||||
|
seed=seed,
|
||||||
) as stream:
|
) as stream:
|
||||||
|
|
||||||
async for value in stream:
|
async for value in stream:
|
||||||
|
|
@ -174,10 +193,14 @@ async def aggregate(seed):
|
||||||
print(f"FINISHED ITERATING {portal.channel.uid}")
|
print(f"FINISHED ITERATING {portal.channel.uid}")
|
||||||
|
|
||||||
# spawn 2 trio tasks to collect streams and push to a local queue
|
# spawn 2 trio tasks to collect streams and push to a local queue
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as tn:
|
||||||
|
|
||||||
for portal in portals:
|
for portal in portals:
|
||||||
n.start_soon(push_to_chan, portal, send_chan.clone())
|
tn.start_soon(
|
||||||
|
push_to_chan,
|
||||||
|
portal,
|
||||||
|
send_chan.clone(),
|
||||||
|
)
|
||||||
|
|
||||||
# close this local task's reference to send side
|
# close this local task's reference to send side
|
||||||
await send_chan.aclose()
|
await send_chan.aclose()
|
||||||
|
|
@ -194,20 +217,21 @@ async def aggregate(seed):
|
||||||
|
|
||||||
print("FINISHED ITERATING in aggregator")
|
print("FINISHED ITERATING in aggregator")
|
||||||
|
|
||||||
await nursery.cancel()
|
await an.cancel()
|
||||||
print("WAITING on `ActorNursery` to finish")
|
print("WAITING on `ActorNursery` to finish")
|
||||||
print("AGGREGATOR COMPLETE!")
|
print("AGGREGATOR COMPLETE!")
|
||||||
|
|
||||||
|
|
||||||
# this is the main actor and *arbiter*
|
async def a_quadruple_example() -> list[int]:
|
||||||
async def a_quadruple_example():
|
'''
|
||||||
# a nursery which spawns "actors"
|
Open the root-actor which is also a "registrar".
|
||||||
async with tractor.open_nursery() as nursery:
|
|
||||||
|
|
||||||
|
'''
|
||||||
|
async with tractor.open_nursery() as an:
|
||||||
seed = int(1e3)
|
seed = int(1e3)
|
||||||
pre_start = time.time()
|
pre_start = time.time()
|
||||||
|
|
||||||
portal = await nursery.start_actor(
|
portal = await an.start_actor(
|
||||||
name='aggregator',
|
name='aggregator',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
|
@ -228,8 +252,14 @@ async def a_quadruple_example():
|
||||||
return result_stream
|
return result_stream
|
||||||
|
|
||||||
|
|
||||||
async def cancel_after(wait, reg_addr):
|
async def cancel_after(
|
||||||
async with tractor.open_root_actor(registry_addrs=[reg_addr]):
|
wait: float,
|
||||||
|
reg_addr: tuple,
|
||||||
|
) -> list[int]:
|
||||||
|
|
||||||
|
async with tractor.open_root_actor(
|
||||||
|
registry_addrs=[reg_addr],
|
||||||
|
):
|
||||||
with trio.move_on_after(wait):
|
with trio.move_on_after(wait):
|
||||||
return await a_quadruple_example()
|
return await a_quadruple_example()
|
||||||
|
|
||||||
|
|
@ -242,7 +272,7 @@ def time_quad_ex(
|
||||||
):
|
):
|
||||||
non_linux: bool = (_sys := platform.system()) != 'Linux'
|
non_linux: bool = (_sys := platform.system()) != 'Linux'
|
||||||
if ci_env and non_linux:
|
if ci_env and non_linux:
|
||||||
pytest.skip("Test is too flaky on {_sys!r} in CI")
|
pytest.skip(f'Test is too flaky on {_sys!r} in CI')
|
||||||
|
|
||||||
if spawn_backend == 'mp':
|
if spawn_backend == 'mp':
|
||||||
'''
|
'''
|
||||||
|
|
@ -253,14 +283,18 @@ def time_quad_ex(
|
||||||
|
|
||||||
timeout = 7 if non_linux else 4
|
timeout = 7 if non_linux else 4
|
||||||
start = time.time()
|
start = time.time()
|
||||||
results = trio.run(cancel_after, timeout, reg_addr)
|
results: list[int] = trio.run(
|
||||||
diff = time.time() - start
|
cancel_after,
|
||||||
|
timeout,
|
||||||
|
reg_addr,
|
||||||
|
)
|
||||||
|
diff: float = time.time() - start
|
||||||
assert results
|
assert results
|
||||||
return results, diff
|
return results, diff
|
||||||
|
|
||||||
|
|
||||||
def test_a_quadruple_example(
|
def test_a_quadruple_example(
|
||||||
time_quad_ex: tuple,
|
time_quad_ex: tuple[list[int], float],
|
||||||
ci_env: bool,
|
ci_env: bool,
|
||||||
spawn_backend: str,
|
spawn_backend: str,
|
||||||
):
|
):
|
||||||
|
|
@ -284,19 +318,33 @@ def test_a_quadruple_example(
|
||||||
list(map(lambda i: i/10, range(3, 9)))
|
list(map(lambda i: i/10, range(3, 9)))
|
||||||
)
|
)
|
||||||
def test_not_fast_enough_quad(
|
def test_not_fast_enough_quad(
|
||||||
reg_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend
|
reg_addr: tuple,
|
||||||
|
time_quad_ex: tuple[list[int], float],
|
||||||
|
cancel_delay: float,
|
||||||
|
ci_env: bool,
|
||||||
|
spawn_backend: str,
|
||||||
):
|
):
|
||||||
"""Verify we can cancel midway through the quad example and all actors
|
'''
|
||||||
cancel gracefully.
|
Verify we can cancel midway through the quad example and all
|
||||||
"""
|
actors cancel gracefully.
|
||||||
|
|
||||||
|
'''
|
||||||
results, diff = time_quad_ex
|
results, diff = time_quad_ex
|
||||||
delay = max(diff - cancel_delay, 0)
|
delay = max(diff - cancel_delay, 0)
|
||||||
results = trio.run(cancel_after, delay, reg_addr)
|
results = trio.run(
|
||||||
system = platform.system()
|
cancel_after,
|
||||||
if system in ('Windows', 'Darwin') and results is not None:
|
delay,
|
||||||
|
reg_addr,
|
||||||
|
)
|
||||||
|
system: str = platform.system()
|
||||||
|
if (
|
||||||
|
system in ('Windows', 'Darwin')
|
||||||
|
and
|
||||||
|
results is not None
|
||||||
|
):
|
||||||
# In CI envoirments it seems later runs are quicker then the first
|
# In CI envoirments it seems later runs are quicker then the first
|
||||||
# so just ignore these
|
# so just ignore these
|
||||||
print(f"Woa there {system} caught your breath eh?")
|
print(f'Woa there {system} caught your breath eh?')
|
||||||
else:
|
else:
|
||||||
# should be cancelled mid-streaming
|
# should be cancelled mid-streaming
|
||||||
assert results is None
|
assert results is None
|
||||||
|
|
@ -304,23 +352,24 @@ def test_not_fast_enough_quad(
|
||||||
|
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_respawn_consumer_task(
|
async def test_respawn_consumer_task(
|
||||||
reg_addr,
|
reg_addr: tuple,
|
||||||
spawn_backend,
|
spawn_backend: str,
|
||||||
loglevel,
|
loglevel: str,
|
||||||
):
|
):
|
||||||
"""Verify that ``._portal.ReceiveStream.shield()``
|
'''
|
||||||
|
Verify that ``._portal.ReceiveStream.shield()``
|
||||||
sucessfully protects the underlying IPC channel from being closed
|
sucessfully protects the underlying IPC channel from being closed
|
||||||
when cancelling and respawning a consumer task.
|
when cancelling and respawning a consumer task.
|
||||||
|
|
||||||
This also serves to verify that all values from the stream can be
|
This also serves to verify that all values from the stream can be
|
||||||
received despite the respawns.
|
received despite the respawns.
|
||||||
|
|
||||||
"""
|
'''
|
||||||
stream = None
|
stream = None
|
||||||
|
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as an:
|
||||||
|
|
||||||
portal = await n.start_actor(
|
portal = await an.start_actor(
|
||||||
name='streamer',
|
name='streamer',
|
||||||
enable_modules=[__name__]
|
enable_modules=[__name__]
|
||||||
)
|
)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue