Use `collapse_eg()` in broadcaster suite
Around the test embedded `trio.open_nursery()` calls as expected. Also tidy up the various nursery var names.py313_support
parent
5d1eab0025
commit
6ca15c8b4d
|
@ -2,7 +2,9 @@
|
||||||
Broadcast channels for fan-out to local tasks.
|
Broadcast channels for fan-out to local tasks.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import (
|
||||||
|
asynccontextmanager as acm,
|
||||||
|
)
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from itertools import cycle
|
from itertools import cycle
|
||||||
import time
|
import time
|
||||||
|
@ -15,6 +17,7 @@ import tractor
|
||||||
from tractor.trionics import (
|
from tractor.trionics import (
|
||||||
broadcast_receiver,
|
broadcast_receiver,
|
||||||
Lagged,
|
Lagged,
|
||||||
|
collapse_eg,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -62,7 +65,7 @@ async def ensure_sequence(
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@acm
|
||||||
async def open_sequence_streamer(
|
async def open_sequence_streamer(
|
||||||
|
|
||||||
sequence: list[int],
|
sequence: list[int],
|
||||||
|
@ -74,9 +77,9 @@ async def open_sequence_streamer(
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
arbiter_addr=reg_addr,
|
arbiter_addr=reg_addr,
|
||||||
start_method=start_method,
|
start_method=start_method,
|
||||||
) as tn:
|
) as an:
|
||||||
|
|
||||||
portal = await tn.start_actor(
|
portal = await an.start_actor(
|
||||||
'sequence_echoer',
|
'sequence_echoer',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
@ -155,9 +158,12 @@ def test_consumer_and_parent_maybe_lag(
|
||||||
) as stream:
|
) as stream:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with trio.open_nursery() as n:
|
async with (
|
||||||
|
collapse_eg(),
|
||||||
|
trio.open_nursery() as tn,
|
||||||
|
):
|
||||||
|
|
||||||
n.start_soon(
|
tn.start_soon(
|
||||||
ensure_sequence,
|
ensure_sequence,
|
||||||
stream,
|
stream,
|
||||||
sequence.copy(),
|
sequence.copy(),
|
||||||
|
@ -230,8 +236,8 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
|
||||||
|
|
||||||
) as stream:
|
) as stream:
|
||||||
|
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as tn:
|
||||||
n.start_soon(
|
tn.start_soon(
|
||||||
ensure_sequence,
|
ensure_sequence,
|
||||||
stream,
|
stream,
|
||||||
sequence.copy(),
|
sequence.copy(),
|
||||||
|
@ -253,7 +259,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
|
||||||
continue
|
continue
|
||||||
|
|
||||||
print('cancelling faster subtask')
|
print('cancelling faster subtask')
|
||||||
n.cancel_scope.cancel()
|
tn.cancel_scope.cancel()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
value = await stream.receive()
|
value = await stream.receive()
|
||||||
|
@ -371,13 +377,13 @@ def test_ensure_slow_consumers_lag_out(
|
||||||
f'on {lags}:{value}')
|
f'on {lags}:{value}')
|
||||||
return
|
return
|
||||||
|
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as tn:
|
||||||
|
|
||||||
for i in range(1, num_laggers):
|
for i in range(1, num_laggers):
|
||||||
|
|
||||||
task_name = f'sub_{i}'
|
task_name = f'sub_{i}'
|
||||||
laggers[task_name] = 0
|
laggers[task_name] = 0
|
||||||
nursery.start_soon(
|
tn.start_soon(
|
||||||
partial(
|
partial(
|
||||||
sub_and_print,
|
sub_and_print,
|
||||||
delay=i*0.001,
|
delay=i*0.001,
|
||||||
|
@ -497,6 +503,7 @@ def test_no_raise_on_lag():
|
||||||
# internals when the no raise flag is set.
|
# internals when the no raise flag is set.
|
||||||
loglevel='warning',
|
loglevel='warning',
|
||||||
),
|
),
|
||||||
|
collapse_eg(),
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
):
|
):
|
||||||
n.start_soon(slow)
|
n.start_soon(slow)
|
||||||
|
|
|
@ -20,7 +20,6 @@ first-class-`trio` from a historical perspective B)
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
# bontextmanager as cm,
|
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue