Add timeouts around some context test bodies

Since with my in-index runtime-port to our native msg-spec it seems
these ones are hanging B(

- `test_one_end_stream_not_opened()`
- `test_maybe_allow_overruns_stream()`

Tossing in some `trio.fail_after()`s seems to at least nab them as
failures B)
msg_codecs
Tyler Goodlet 2024-04-02 13:33:06 -04:00
parent 3aa964315a
commit f2ce4a3469
1 changed file with 74 additions and 60 deletions

View File

@ -6,6 +6,7 @@ sync-opening a ``tractor.Context`` beforehand.
''' '''
from itertools import count from itertools import count
import math
import platform import platform
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
@ -845,7 +846,10 @@ async def keep_sending_from_callee(
('caller', 1, never_open_stream), ('caller', 1, never_open_stream),
('callee', 0, keep_sending_from_callee), ('callee', 0, keep_sending_from_callee),
], ],
ids='overrun_condition={}'.format, ids=[
('caller_1buf_never_open_stream'),
('callee_0buf_keep_sending_from_callee'),
]
) )
def test_one_end_stream_not_opened( def test_one_end_stream_not_opened(
overrun_by: tuple[str, int, Callable], overrun_by: tuple[str, int, Callable],
@ -869,29 +873,30 @@ def test_one_end_stream_not_opened(
enable_modules=[__name__], enable_modules=[__name__],
) )
async with portal.open_context( with trio.fail_after(0.8):
entrypoint, async with portal.open_context(
) as (ctx, sent): entrypoint,
assert sent is None ) as (ctx, sent):
assert sent is None
if 'caller' in overrunner: if 'caller' in overrunner:
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
# itersend +1 msg more then the buffer size # itersend +1 msg more then the buffer size
# to cause the most basic overrun. # to cause the most basic overrun.
for i in range(buf_size): for i in range(buf_size):
print(f'sending {i}') print(f'sending {i}')
await stream.send(i) await stream.send(i)
else: else:
# expect overrun error to be relayed back # expect overrun error to be relayed back
# and this sleep interrupted # and this sleep interrupted
await trio.sleep_forever() await trio.sleep_forever()
else: else:
# callee overruns caller case so we do nothing here # callee overruns caller case so we do nothing here
await trio.sleep_forever() await trio.sleep_forever()
await portal.cancel_actor() await portal.cancel_actor()
@ -1055,54 +1060,63 @@ def test_maybe_allow_overruns_stream(
loglevel=loglevel, loglevel=loglevel,
debug_mode=debug_mode, debug_mode=debug_mode,
) )
seq = list(range(10))
async with portal.open_context(
echo_back_sequence,
seq=seq,
wait_for_cancel=cancel_ctx,
be_slow=(slow_side == 'child'),
allow_overruns_side=allow_overruns_side,
) as (ctx, sent): # stream-sequence batch info with send delay to determine
assert sent is None # approx timeout determining whether test has hung.
total_batches: int = 2
num_items: int = 10
seq = list(range(num_items))
parent_send_delay: float = 0.16
timeout: float = math.ceil(
total_batches * num_items * parent_send_delay
)
with trio.fail_after(timeout):
async with portal.open_context(
echo_back_sequence,
seq=seq,
wait_for_cancel=cancel_ctx,
be_slow=(slow_side == 'child'),
allow_overruns_side=allow_overruns_side,
async with ctx.open_stream( ) as (ctx, sent):
msg_buffer_size=1 if slow_side == 'parent' else None, assert sent is None
allow_overruns=(allow_overruns_side in {'parent', 'both'}),
) as stream:
total_batches: int = 2 async with ctx.open_stream(
for _ in range(total_batches): msg_buffer_size=1 if slow_side == 'parent' else None,
for msg in seq: allow_overruns=(allow_overruns_side in {'parent', 'both'}),
# print(f'root tx {msg}') ) as stream:
await stream.send(msg)
if slow_side == 'parent':
# NOTE: we make the parent slightly
# slower, when it is slow, to make sure
# that in the overruns everywhere case
await trio.sleep(0.16)
batch = [] for _ in range(total_batches):
async for msg in stream: for msg in seq:
print(f'root rx {msg}') # print(f'root tx {msg}')
batch.append(msg) await stream.send(msg)
if batch == seq: if slow_side == 'parent':
break # NOTE: we make the parent slightly
# slower, when it is slow, to make sure
# that in the overruns everywhere case
await trio.sleep(parent_send_delay)
batch = []
async for msg in stream:
print(f'root rx {msg}')
batch.append(msg)
if batch == seq:
break
if cancel_ctx:
# cancel the remote task
print('Requesting `ctx.cancel()` in parent!')
await ctx.cancel()
res: str|ContextCancelled = await ctx.result()
if cancel_ctx: if cancel_ctx:
# cancel the remote task assert isinstance(res, ContextCancelled)
print('Requesting `ctx.cancel()` in parent!') assert tuple(res.canceller) == current_actor().uid
await ctx.cancel()
res: str|ContextCancelled = await ctx.result() else:
print(f'RX ROOT SIDE RESULT {res}')
if cancel_ctx: assert res == 'yo'
assert isinstance(res, ContextCancelled)
assert tuple(res.canceller) == current_actor().uid
else:
print(f'RX ROOT SIDE RESULT {res}')
assert res == 'yo'
# cancel the daemon # cancel the daemon
await portal.cancel_actor() await portal.cancel_actor()