forked from goodboy/tractor
Add timeouts around some context test bodies
With my in-index runtime-port to our native msg-spec these tests seem to be hanging B(

- `test_one_end_stream_not_opened()`
- `test_maybe_allow_overruns_stream()`

Tossing in some `trio.fail_after()`s seems to at least nab them as failures B)

branch: msg_codecs
parent 3aa964315a
commit f2ce4a3469
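For context on the approach: `trio.fail_after()` opens a cancel scope with a deadline and raises `trio.TooSlowError` if the wrapped block is still running when the deadline hits, so a hung test body turns into a normal test failure instead of stalling the whole suite. A minimal sketch of the idea (the hanging coroutine here is invented purely for illustration):

import trio

async def hangs_forever():
    # stand-in for a test body that never completes
    await trio.sleep_forever()

async def main():
    try:
        # deadline in seconds; raises trio.TooSlowError on expiry
        with trio.fail_after(0.8):
            await hangs_forever()
    except trio.TooSlowError:
        print('hang surfaced as a failure instead of blocking forever')

trio.run(main)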
@@ -6,6 +6,7 @@ sync-opening a ``tractor.Context`` beforehand.
 
 '''
 from itertools import count
+import math
 import platform
 from pprint import pformat
 from typing import (
@@ -845,7 +846,10 @@ async def keep_sending_from_callee(
         ('caller', 1, never_open_stream),
         ('callee', 0, keep_sending_from_callee),
     ],
-    ids='overrun_condition={}'.format,
+    ids=[
+        ('caller_1buf_never_open_stream'),
+        ('callee_0buf_keep_sending_from_callee'),
+    ]
 )
 def test_one_end_stream_not_opened(
     overrun_by: tuple[str, int, Callable],
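Side note on the `ids` change above: an explicit list gives each parametrize case a stable, human-readable node id, whereas the old `'overrun_condition={}'.format` callable rendered the whole value tuple, function repr (and memory address) included, into the id. A rough sketch of how the new ids show up at collection time (a standalone toy test, not the real one):

import pytest

@pytest.mark.parametrize(
    'overrun_by',
    [
        ('caller', 1, 'never_open_stream'),
        ('callee', 0, 'keep_sending_from_callee'),
    ],
    ids=[
        'caller_1buf_never_open_stream',
        'callee_0buf_keep_sending_from_callee',
    ],
)
def test_ids_demo(overrun_by):
    # collected as:
    #   test_ids_demo[caller_1buf_never_open_stream]
    #   test_ids_demo[callee_0buf_keep_sending_from_callee]
    assert overrun_by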
@@ -869,29 +873,30 @@ def test_one_end_stream_not_opened(
                 enable_modules=[__name__],
             )
 
-            async with portal.open_context(
-                entrypoint,
-            ) as (ctx, sent):
-                assert sent is None
-
-                if 'caller' in overrunner:
-
-                    async with ctx.open_stream() as stream:
-
-                        # itersend +1 msg more then the buffer size
-                        # to cause the most basic overrun.
-                        for i in range(buf_size):
-                            print(f'sending {i}')
-                            await stream.send(i)
-
-                        else:
-                            # expect overrun error to be relayed back
-                            # and this sleep interrupted
-                            await trio.sleep_forever()
-
-                else:
-                    # callee overruns caller case so we do nothing here
-                    await trio.sleep_forever()
+            with trio.fail_after(0.8):
+                async with portal.open_context(
+                    entrypoint,
+                ) as (ctx, sent):
+                    assert sent is None
+
+                    if 'caller' in overrunner:
+
+                        async with ctx.open_stream() as stream:
+
+                            # itersend +1 msg more then the buffer size
+                            # to cause the most basic overrun.
+                            for i in range(buf_size):
+                                print(f'sending {i}')
+                                await stream.send(i)
+
+                            else:
+                                # expect overrun error to be relayed back
+                                # and this sleep interrupted
+                                await trio.sleep_forever()
+
+                    else:
+                        # callee overruns caller case so we do nothing here
+                        await trio.sleep_forever()
 
             await portal.cancel_actor()
 
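An easy-to-miss detail in the hunk above: the `else:` belongs to the `for` loop, not to an `if`, so once every `stream.send(i)` has completed without a `break` the caller parks in `trio.sleep_forever()` waiting for the overrun error to be relayed back, and the new 0.8s `fail_after` now bounds that wait. A tiny reminder of the `for ... else` semantics:

for i in range(3):
    print(f'sending {i}')
else:
    # runs only when the loop finished without hitting a `break`
    print('all sends done, now blocking until something interrupts us')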
@@ -1055,54 +1060,63 @@ def test_maybe_allow_overruns_stream(
                 loglevel=loglevel,
                 debug_mode=debug_mode,
             )
-            seq = list(range(10))
-            async with portal.open_context(
-                echo_back_sequence,
-                seq=seq,
-                wait_for_cancel=cancel_ctx,
-                be_slow=(slow_side == 'child'),
-                allow_overruns_side=allow_overruns_side,
-
-            ) as (ctx, sent):
-                assert sent is None
-
-                async with ctx.open_stream(
-                    msg_buffer_size=1 if slow_side == 'parent' else None,
-                    allow_overruns=(allow_overruns_side in {'parent', 'both'}),
-                ) as stream:
-
-                    total_batches: int = 2
-                    for _ in range(total_batches):
-                        for msg in seq:
-                            # print(f'root tx {msg}')
-                            await stream.send(msg)
-                            if slow_side == 'parent':
-                                # NOTE: we make the parent slightly
-                                # slower, when it is slow, to make sure
-                                # that in the overruns everywhere case
-                                await trio.sleep(0.16)
-
-                        batch = []
-                        async for msg in stream:
-                            print(f'root rx {msg}')
-                            batch.append(msg)
-                            if batch == seq:
-                                break
-
-                if cancel_ctx:
-                    # cancel the remote task
-                    print('Requesting `ctx.cancel()` in parent!')
-                    await ctx.cancel()
-
-                res: str|ContextCancelled = await ctx.result()
-
-                if cancel_ctx:
-                    assert isinstance(res, ContextCancelled)
-                    assert tuple(res.canceller) == current_actor().uid
-
-                else:
-                    print(f'RX ROOT SIDE RESULT {res}')
-                    assert res == 'yo'
+
+            # stream-sequence batch info with send delay to determine
+            # approx timeout determining whether test has hung.
+            total_batches: int = 2
+            num_items: int = 10
+            seq = list(range(num_items))
+            parent_send_delay: float = 0.16
+            timeout: float = math.ceil(
+                total_batches * num_items * parent_send_delay
+            )
+            with trio.fail_after(timeout):
+                async with portal.open_context(
+                    echo_back_sequence,
+                    seq=seq,
+                    wait_for_cancel=cancel_ctx,
+                    be_slow=(slow_side == 'child'),
+                    allow_overruns_side=allow_overruns_side,
+
+                ) as (ctx, sent):
+                    assert sent is None
+
+                    async with ctx.open_stream(
+                        msg_buffer_size=1 if slow_side == 'parent' else None,
+                        allow_overruns=(allow_overruns_side in {'parent', 'both'}),
+                    ) as stream:
+
+                        for _ in range(total_batches):
+                            for msg in seq:
+                                # print(f'root tx {msg}')
+                                await stream.send(msg)
+                                if slow_side == 'parent':
+                                    # NOTE: we make the parent slightly
+                                    # slower, when it is slow, to make sure
+                                    # that in the overruns everywhere case
+                                    await trio.sleep(parent_send_delay)
+
+                            batch = []
+                            async for msg in stream:
+                                print(f'root rx {msg}')
+                                batch.append(msg)
+                                if batch == seq:
+                                    break
+
+                    if cancel_ctx:
+                        # cancel the remote task
+                        print('Requesting `ctx.cancel()` in parent!')
+                        await ctx.cancel()
+
+                    res: str|ContextCancelled = await ctx.result()
+
+                    if cancel_ctx:
+                        assert isinstance(res, ContextCancelled)
+                        assert tuple(res.canceller) == current_actor().uid
+
+                    else:
+                        print(f'RX ROOT SIDE RESULT {res}')
+                        assert res == 'yo'
 
             # cancel the daemon
             await portal.cancel_actor()
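For the concrete values in this last hunk the computed deadline works out to 4 seconds (plain arithmetic, spelled out only to make the bound explicit):

import math

total_batches = 2
num_items = 10
parent_send_delay = 0.16  # seconds per parent-side send

# 2 batches * 10 items * 0.16s = 3.2s of budgeted send delay,
# rounded up to a whole-second deadline for trio.fail_after()
timeout = math.ceil(total_batches * num_items * parent_send_delay)
assert timeout == 4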