From 29e0b8f67db1d9db91751216d3570e6c8cb13764 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 20 Aug 2021 13:04:51 -0400 Subject: [PATCH] Add subscribe after close test --- tests/test_local_task_broadcast.py | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/tests/test_local_task_broadcast.py b/tests/test_local_task_broadcast.py index 3967016..fe30e33 100644 --- a/tests/test_local_task_broadcast.py +++ b/tests/test_local_task_broadcast.py @@ -100,6 +100,29 @@ def test_stream_fan_out_to_local_subscriptions( trio.run(main) +def test_subscribe_errors_after_close(): + + async def main(): + + size = 1 + tx, rx = trio.open_memory_channel(size) + async with broadcast_receiver(rx, size) as brx: + pass + + try: + # open and close + async with brx.subscribe(): + pass + + except trio.ClosedResourceError: + assert brx.key not in brx._state.subs + + else: + assert 0 + + trio.run(main) + + def test_ensure_slow_consumers_lag_out( arb_addr, start_method, @@ -129,7 +152,6 @@ def test_ensure_slow_consumers_lag_out( async with brx.subscribe() as lbrx: while True: - # await tractor.breakpoint() print(f'{task.name}: starting consume loop') try: async for value in lbrx: @@ -156,7 +178,7 @@ def test_ensure_slow_consumers_lag_out( lag_time = time.time() - start lags = laggers[task.name] print( - f'restarting slow ass {task.name} ' + f'restarting slow task {task.name} ' f'that bailed out on {lags}:{value} ' f'after {lag_time:.3f}') if lags <= retries: @@ -207,7 +229,8 @@ def test_ensure_slow_consumers_lag_out( await brx.receive() except Lagged: # expect tokio style index truncation - assert brx._state.subs[brx.key] == len(brx._state.queue) - 1 + seq = brx._state.subs[brx.key] + assert seq == len(brx._state.queue) - 1 # all backpressured entries in the underlying # channel should have been copied into the caster