Add a first receiver is cancelled test
parent
b7b489dd07
commit
5881a82d2a
|
@ -416,3 +416,44 @@ def test_ensure_slow_consumers_lag_out(
|
||||||
await brx.aclose()
|
await brx.aclose()
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
def test_first_recver_is_cancelled():
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
|
||||||
|
# make sure it all works within the runtime
|
||||||
|
async with tractor.open_root_actor():
|
||||||
|
|
||||||
|
tx, rx = trio.open_memory_channel(1)
|
||||||
|
brx = broadcast_receiver(rx, 1)
|
||||||
|
cs = trio.CancelScope()
|
||||||
|
sequence = list(range(3))
|
||||||
|
|
||||||
|
async def sub_and_recv():
|
||||||
|
with cs:
|
||||||
|
async with brx.subscribe() as bc:
|
||||||
|
async for value in bc:
|
||||||
|
print(value)
|
||||||
|
|
||||||
|
async def cancel_and_send():
|
||||||
|
await trio.sleep(0.2)
|
||||||
|
cs.cancel()
|
||||||
|
await tx.send(1)
|
||||||
|
|
||||||
|
async with trio.open_nursery() as n:
|
||||||
|
|
||||||
|
n.start_soon(sub_and_recv)
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
assert brx._state.recv_ready
|
||||||
|
|
||||||
|
n.start_soon(cancel_and_send)
|
||||||
|
|
||||||
|
# ensure that we don't hang because no-task is now
|
||||||
|
# waiting on the underlying receive..
|
||||||
|
with trio.fail_after(0.5):
|
||||||
|
value = await brx.receive()
|
||||||
|
print(f'parent: {value}')
|
||||||
|
assert value == 1
|
||||||
|
|
||||||
|
trio.run(main)
|
||||||
|
|
Loading…
Reference in New Issue