Add subscribe after close test

live_on_air_from_tokio
Tyler Goodlet 2021-08-20 13:04:51 -04:00
parent bec3f5999d
commit d7ad8982ff
1 changed files with 26 additions and 3 deletions

View File

@ -100,6 +100,29 @@ def test_stream_fan_out_to_local_subscriptions(
trio.run(main) trio.run(main)
def test_subscribe_errors_after_close():
async def main():
size = 1
tx, rx = trio.open_memory_channel(size)
async with broadcast_receiver(rx, size) as brx:
pass
try:
# open and close
async with brx.subscribe():
pass
except trio.ClosedResourceError:
assert brx.key not in brx._state.subs
else:
assert 0
trio.run(main)
def test_ensure_slow_consumers_lag_out( def test_ensure_slow_consumers_lag_out(
arb_addr, arb_addr,
start_method, start_method,
@ -129,7 +152,6 @@ def test_ensure_slow_consumers_lag_out(
async with brx.subscribe() as lbrx: async with brx.subscribe() as lbrx:
while True: while True:
# await tractor.breakpoint()
print(f'{task.name}: starting consume loop') print(f'{task.name}: starting consume loop')
try: try:
async for value in lbrx: async for value in lbrx:
@ -156,7 +178,7 @@ def test_ensure_slow_consumers_lag_out(
lag_time = time.time() - start lag_time = time.time() - start
lags = laggers[task.name] lags = laggers[task.name]
print( print(
f'restarting slow ass {task.name} ' f'restarting slow task {task.name} '
f'that bailed out on {lags}:{value} ' f'that bailed out on {lags}:{value} '
f'after {lag_time:.3f}') f'after {lag_time:.3f}')
if lags <= retries: if lags <= retries:
@ -207,7 +229,8 @@ def test_ensure_slow_consumers_lag_out(
await brx.receive() await brx.receive()
except Lagged: except Lagged:
# expect tokio style index truncation # expect tokio style index truncation
assert brx._state.subs[brx.key] == len(brx._state.queue) - 1 seq = brx._state.subs[brx.key]
assert seq == len(brx._state.queue) - 1
# all backpressured entries in the underlying # all backpressured entries in the underlying
# channel should have been copied into the caster # channel should have been copied into the caster