diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index 4eb06e8..e0ffa87 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -8,6 +8,7 @@ sync-opening a ``tractor.Context`` beforehand. # from contextlib import asynccontextmanager as acm from itertools import count import platform +from pprint import pformat from typing import ( Callable, ) @@ -815,7 +816,10 @@ async def echo_back_sequence( # NOTE: ensure that if the caller is expecting to cancel this task # that we stay echoing much longer then they are so we don't # return early instead of receive the cancel msg. - total_batches: int = 1000 if wait_for_cancel else 6 + total_batches: int = ( + 1000 if wait_for_cancel + else 6 + ) await ctx.started() # await tractor.breakpoint() @@ -834,8 +838,23 @@ async def echo_back_sequence( ) seq = list(seq) # bleh, msgpack sometimes ain't decoded right - for _ in range(total_batches): + for i in range(total_batches): + print(f'starting new stream batch {i} iter in child') batch = [] + + # EoC case, delay a little instead of hot + # iter-stopping (since apparently py3.11+ can do that + # faster then a ctxc can be sent) on the async for + # loop when child was requested to ctxc. + if ( + stream.closed + or + ctx.cancel_called + ): + print('child stream already closed!?!') + await trio.sleep(0.05) + continue + async for msg in stream: batch.append(msg) if batch == seq: @@ -846,15 +865,18 @@ async def echo_back_sequence( print('callee waiting on next') + print(f'callee echoing back latest batch\n{batch}') for msg in batch: - print(f'callee sending {msg}') + print(f'callee sending msg\n{msg}') await stream.send(msg) - print( - 'EXITING CALLEEE:\n' - f'{ctx.canceller}' - ) - return 'yo' + try: + return 'yo' + finally: + print( + 'exiting callee with context:\n' + f'{pformat(ctx)}\n' + ) @pytest.mark.parametrize( @@ -916,8 +938,8 @@ def test_maybe_allow_overruns_stream( wait_for_cancel=cancel_ctx, be_slow=(slow_side == 'child'), allow_overruns_side=allow_overruns_side, - ) as (ctx, sent): + ) as (ctx, sent): assert sent is None async with ctx.open_stream( @@ -945,10 +967,10 @@ def test_maybe_allow_overruns_stream( if cancel_ctx: # cancel the remote task - print('sending root side cancel') + print('Requesting `ctx.cancel()` in parent!') await ctx.cancel() - res = await ctx.result() + res: str|ContextCancelled = await ctx.result() if cancel_ctx: assert isinstance(res, ContextCancelled)