From efb8bec8285025c6e6fe600745b3237b6b67d54c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 30 Jan 2023 12:26:07 -0500 Subject: [PATCH] Add a basic no-raise-on lag test --- tests/test_task_broadcasting.py | 56 +++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index 636f92b..9f4a1fe 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -12,7 +12,10 @@ import pytest import trio from trio.lowlevel import current_task import tractor -from tractor.trionics import broadcast_receiver, Lagged +from tractor.trionics import ( + broadcast_receiver, + Lagged, +) @tractor.context @@ -211,7 +214,8 @@ def test_faster_task_to_recv_is_cancelled_by_slower( arb_addr, start_method, ): - '''Ensure that if a faster task consuming from a stream is cancelled + ''' + Ensure that if a faster task consuming from a stream is cancelled the slower task can continue to receive all expected values. ''' @@ -460,3 +464,51 @@ def test_first_recver_is_cancelled(): assert value == 1 trio.run(main) + + +def test_no_raise_on_lag(): + ''' + Run a simple 2-task broadcast where one task is slow but configured + so that it does not raise `Lagged` on overruns using + `raise_on_lasg=False` and verify that the task does not raise. + + ''' + size = 100 + tx, rx = trio.open_memory_channel(size) + brx = broadcast_receiver(rx, size) + + async def slow(): + async with brx.subscribe( + raise_on_lag=False, + ) as br: + async for msg in br: + print(f'slow task got: {msg}') + await trio.sleep(0.1) + + async def fast(): + async with brx.subscribe() as br: + async for msg in br: + print(f'fast task got: {msg}') + + async def main(): + async with ( + tractor.open_root_actor( + # NOTE: so we see the warning msg emitted by the bcaster + # internals when the no raise flag is set. + loglevel='warning', + ), + trio.open_nursery() as n, + ): + n.start_soon(slow) + n.start_soon(fast) + + for i in range(1000): + await tx.send(i) + + # simulate user nailing ctl-c after realizing + # there's a lag in the slow task. + await trio.sleep(1) + raise KeyboardInterrupt + + with pytest.raises(KeyboardInterrupt): + trio.run(main)