forked from goodboy/tractor
Add a basic no-raise-on lag test
parent
8637778739
commit
efb8bec828
|
@ -12,7 +12,10 @@ import pytest
|
||||||
import trio
|
import trio
|
||||||
from trio.lowlevel import current_task
|
from trio.lowlevel import current_task
|
||||||
import tractor
|
import tractor
|
||||||
from tractor.trionics import broadcast_receiver, Lagged
|
from tractor.trionics import (
|
||||||
|
broadcast_receiver,
|
||||||
|
Lagged,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
@ -211,7 +214,8 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
|
||||||
arb_addr,
|
arb_addr,
|
||||||
start_method,
|
start_method,
|
||||||
):
|
):
|
||||||
'''Ensure that if a faster task consuming from a stream is cancelled
|
'''
|
||||||
|
Ensure that if a faster task consuming from a stream is cancelled
|
||||||
the slower task can continue to receive all expected values.
|
the slower task can continue to receive all expected values.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
@ -460,3 +464,51 @@ def test_first_recver_is_cancelled():
|
||||||
assert value == 1
|
assert value == 1
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
def test_no_raise_on_lag():
|
||||||
|
'''
|
||||||
|
Run a simple 2-task broadcast where one task is slow but configured
|
||||||
|
so that it does not raise `Lagged` on overruns using
|
||||||
|
`raise_on_lasg=False` and verify that the task does not raise.
|
||||||
|
|
||||||
|
'''
|
||||||
|
size = 100
|
||||||
|
tx, rx = trio.open_memory_channel(size)
|
||||||
|
brx = broadcast_receiver(rx, size)
|
||||||
|
|
||||||
|
async def slow():
|
||||||
|
async with brx.subscribe(
|
||||||
|
raise_on_lag=False,
|
||||||
|
) as br:
|
||||||
|
async for msg in br:
|
||||||
|
print(f'slow task got: {msg}')
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
|
async def fast():
|
||||||
|
async with brx.subscribe() as br:
|
||||||
|
async for msg in br:
|
||||||
|
print(f'fast task got: {msg}')
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with (
|
||||||
|
tractor.open_root_actor(
|
||||||
|
# NOTE: so we see the warning msg emitted by the bcaster
|
||||||
|
# internals when the no raise flag is set.
|
||||||
|
loglevel='warning',
|
||||||
|
),
|
||||||
|
trio.open_nursery() as n,
|
||||||
|
):
|
||||||
|
n.start_soon(slow)
|
||||||
|
n.start_soon(fast)
|
||||||
|
|
||||||
|
for i in range(1000):
|
||||||
|
await tx.send(i)
|
||||||
|
|
||||||
|
# simulate user nailing ctl-c after realizing
|
||||||
|
# there's a lag in the slow task.
|
||||||
|
await trio.sleep(1)
|
||||||
|
raise KeyboardInterrupt
|
||||||
|
|
||||||
|
with pytest.raises(KeyboardInterrupt):
|
||||||
|
trio.run(main)
|
||||||
|
|
Loading…
Reference in New Issue