diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index 1f5e3db..cc7f402 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -6,6 +6,7 @@ sync-opening a ``tractor.Context`` beforehand. ''' from itertools import count +import math import platform from pprint import pformat from typing import ( @@ -845,7 +846,10 @@ async def keep_sending_from_callee( ('caller', 1, never_open_stream), ('callee', 0, keep_sending_from_callee), ], - ids='overrun_condition={}'.format, + ids=[ + ('caller_1buf_never_open_stream'), + ('callee_0buf_keep_sending_from_callee'), + ] ) def test_one_end_stream_not_opened( overrun_by: tuple[str, int, Callable], @@ -869,29 +873,30 @@ def test_one_end_stream_not_opened( enable_modules=[__name__], ) - async with portal.open_context( - entrypoint, - ) as (ctx, sent): - assert sent is None + with trio.fail_after(0.8): + async with portal.open_context( + entrypoint, + ) as (ctx, sent): + assert sent is None - if 'caller' in overrunner: + if 'caller' in overrunner: - async with ctx.open_stream() as stream: + async with ctx.open_stream() as stream: - # itersend +1 msg more then the buffer size - # to cause the most basic overrun. - for i in range(buf_size): - print(f'sending {i}') - await stream.send(i) + # itersend +1 msg more then the buffer size + # to cause the most basic overrun. + for i in range(buf_size): + print(f'sending {i}') + await stream.send(i) - else: - # expect overrun error to be relayed back - # and this sleep interrupted - await trio.sleep_forever() + else: + # expect overrun error to be relayed back + # and this sleep interrupted + await trio.sleep_forever() - else: - # callee overruns caller case so we do nothing here - await trio.sleep_forever() + else: + # callee overruns caller case so we do nothing here + await trio.sleep_forever() await portal.cancel_actor() @@ -1055,54 +1060,63 @@ def test_maybe_allow_overruns_stream( loglevel=loglevel, debug_mode=debug_mode, ) - seq = list(range(10)) - async with portal.open_context( - echo_back_sequence, - seq=seq, - wait_for_cancel=cancel_ctx, - be_slow=(slow_side == 'child'), - allow_overruns_side=allow_overruns_side, - ) as (ctx, sent): - assert sent is None + # stream-sequence batch info with send delay to determine + # approx timeout determining whether test has hung. + total_batches: int = 2 + num_items: int = 10 + seq = list(range(num_items)) + parent_send_delay: float = 0.16 + timeout: float = math.ceil( + total_batches * num_items * parent_send_delay + ) + with trio.fail_after(timeout): + async with portal.open_context( + echo_back_sequence, + seq=seq, + wait_for_cancel=cancel_ctx, + be_slow=(slow_side == 'child'), + allow_overruns_side=allow_overruns_side, - async with ctx.open_stream( - msg_buffer_size=1 if slow_side == 'parent' else None, - allow_overruns=(allow_overruns_side in {'parent', 'both'}), - ) as stream: + ) as (ctx, sent): + assert sent is None - total_batches: int = 2 - for _ in range(total_batches): - for msg in seq: - # print(f'root tx {msg}') - await stream.send(msg) - if slow_side == 'parent': - # NOTE: we make the parent slightly - # slower, when it is slow, to make sure - # that in the overruns everywhere case - await trio.sleep(0.16) + async with ctx.open_stream( + msg_buffer_size=1 if slow_side == 'parent' else None, + allow_overruns=(allow_overruns_side in {'parent', 'both'}), + ) as stream: - batch = [] - async for msg in stream: - print(f'root rx {msg}') - batch.append(msg) - if batch == seq: - break + for _ in range(total_batches): + for msg in seq: + # print(f'root tx {msg}') + await stream.send(msg) + if slow_side == 'parent': + # NOTE: we make the parent slightly + # slower, when it is slow, to make sure + # that in the overruns everywhere case + await trio.sleep(parent_send_delay) + + batch = [] + async for msg in stream: + print(f'root rx {msg}') + batch.append(msg) + if batch == seq: + break + + if cancel_ctx: + # cancel the remote task + print('Requesting `ctx.cancel()` in parent!') + await ctx.cancel() + + res: str|ContextCancelled = await ctx.result() if cancel_ctx: - # cancel the remote task - print('Requesting `ctx.cancel()` in parent!') - await ctx.cancel() + assert isinstance(res, ContextCancelled) + assert tuple(res.canceller) == current_actor().uid - res: str|ContextCancelled = await ctx.result() - - if cancel_ctx: - assert isinstance(res, ContextCancelled) - assert tuple(res.canceller) == current_actor().uid - - else: - print(f'RX ROOT SIDE RESULT {res}') - assert res == 'yo' + else: + print(f'RX ROOT SIDE RESULT {res}') + assert res == 'yo' # cancel the daemon await portal.cancel_actor()