diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index 55f6d3f..8265197 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -416,3 +416,44 @@ def test_ensure_slow_consumers_lag_out( await brx.aclose() trio.run(main) + + +def test_first_recver_is_cancelled(): + + async def main(): + + # make sure it all works within the runtime + async with tractor.open_root_actor(): + + tx, rx = trio.open_memory_channel(1) + brx = broadcast_receiver(rx, 1) + cs = trio.CancelScope() + sequence = list(range(3)) + + async def sub_and_recv(): + with cs: + async with brx.subscribe() as bc: + async for value in bc: + print(value) + + async def cancel_and_send(): + await trio.sleep(0.2) + cs.cancel() + await tx.send(1) + + async with trio.open_nursery() as n: + + n.start_soon(sub_and_recv) + await trio.sleep(0.1) + assert brx._state.recv_ready + + n.start_soon(cancel_and_send) + + # ensure that we don't hang because no-task is now + # waiting on the underlying receive.. + with trio.fail_after(0.5): + value = await brx.receive() + print(f'parent: {value}') + assert value == 1 + + trio.run(main)