forked from goodboy/tractor
1
0
Fork 0

Fix overruns test to avoid return-beats-ctxc race

Turns out that py3.11 might be so fast that iterating a EoC-ed
`MsgStream` 1k times is faster then a `Context.cancel()` msg
transmission from a parent actor to it's child (which i guess makes
sense). So tweak the test to delay 5ms between stream async-for iteration
attempts when the stream is detected to be `.closed: bool` (coming in
patch) or `ctx.cancel_called == true`.
modden_spawn_from_client_req
Tyler Goodlet 2024-02-21 13:24:33 -05:00
parent 28ba5e5435
commit ce1bcf6d36
1 changed files with 33 additions and 11 deletions

View File

@ -8,6 +8,7 @@ sync-opening a ``tractor.Context`` beforehand.
# from contextlib import asynccontextmanager as acm # from contextlib import asynccontextmanager as acm
from itertools import count from itertools import count
import platform import platform
from pprint import pformat
from typing import ( from typing import (
Callable, Callable,
) )
@ -815,7 +816,10 @@ async def echo_back_sequence(
# NOTE: ensure that if the caller is expecting to cancel this task # NOTE: ensure that if the caller is expecting to cancel this task
# that we stay echoing much longer then they are so we don't # that we stay echoing much longer then they are so we don't
# return early instead of receive the cancel msg. # return early instead of receive the cancel msg.
total_batches: int = 1000 if wait_for_cancel else 6 total_batches: int = (
1000 if wait_for_cancel
else 6
)
await ctx.started() await ctx.started()
# await tractor.breakpoint() # await tractor.breakpoint()
@ -834,8 +838,23 @@ async def echo_back_sequence(
) )
seq = list(seq) # bleh, msgpack sometimes ain't decoded right seq = list(seq) # bleh, msgpack sometimes ain't decoded right
for _ in range(total_batches): for i in range(total_batches):
print(f'starting new stream batch {i} iter in child')
batch = [] batch = []
# EoC case, delay a little instead of hot
# iter-stopping (since apparently py3.11+ can do that
# faster then a ctxc can be sent) on the async for
# loop when child was requested to ctxc.
if (
stream.closed
or
ctx.cancel_called
):
print('child stream already closed!?!')
await trio.sleep(0.05)
continue
async for msg in stream: async for msg in stream:
batch.append(msg) batch.append(msg)
if batch == seq: if batch == seq:
@ -846,15 +865,18 @@ async def echo_back_sequence(
print('callee waiting on next') print('callee waiting on next')
print(f'callee echoing back latest batch\n{batch}')
for msg in batch: for msg in batch:
print(f'callee sending {msg}') print(f'callee sending msg\n{msg}')
await stream.send(msg) await stream.send(msg)
print( try:
'EXITING CALLEEE:\n'
f'{ctx.canceller}'
)
return 'yo' return 'yo'
finally:
print(
'exiting callee with context:\n'
f'{pformat(ctx)}\n'
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -916,8 +938,8 @@ def test_maybe_allow_overruns_stream(
wait_for_cancel=cancel_ctx, wait_for_cancel=cancel_ctx,
be_slow=(slow_side == 'child'), be_slow=(slow_side == 'child'),
allow_overruns_side=allow_overruns_side, allow_overruns_side=allow_overruns_side,
) as (ctx, sent):
) as (ctx, sent):
assert sent is None assert sent is None
async with ctx.open_stream( async with ctx.open_stream(
@ -945,10 +967,10 @@ def test_maybe_allow_overruns_stream(
if cancel_ctx: if cancel_ctx:
# cancel the remote task # cancel the remote task
print('sending root side cancel') print('Requesting `ctx.cancel()` in parent!')
await ctx.cancel() await ctx.cancel()
res = await ctx.result() res: str|ContextCancelled = await ctx.result()
if cancel_ctx: if cancel_ctx:
assert isinstance(res, ContextCancelled) assert isinstance(res, ContextCancelled)