forked from goodboy/tractor
Seriously cover all overrun cases
This actually caught further runtime bugs so it's gud i tried.. Add overrun-ignore enabled / disabled cases and error catching for all of them. More or less this should cover every possible outcome when it comes to setting `allow_overruns: bool` i hope XDproper_breakpoint_hooking
parent
8bd4db150b
commit
1183276653
|
@ -722,7 +722,9 @@ async def echo_back_sequence(
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
seq: list[int],
|
seq: list[int],
|
||||||
wait_for_cancel: bool,
|
wait_for_cancel: bool,
|
||||||
msg_buffer_size: int | None = None,
|
allow_overruns_side: str,
|
||||||
|
be_slow: bool = False,
|
||||||
|
msg_buffer_size: int = 1,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -737,12 +739,22 @@ async def echo_back_sequence(
|
||||||
total_batches: int = 1000 if wait_for_cancel else 6
|
total_batches: int = 1000 if wait_for_cancel else 6
|
||||||
|
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
|
# await tractor.breakpoint()
|
||||||
async with ctx.open_stream(
|
async with ctx.open_stream(
|
||||||
msg_buffer_size=msg_buffer_size,
|
msg_buffer_size=msg_buffer_size,
|
||||||
allow_overruns=True,
|
|
||||||
|
# literally the point of this test XD
|
||||||
|
allow_overruns=(allow_overruns_side in {'child', 'both'}),
|
||||||
) as stream:
|
) as stream:
|
||||||
|
|
||||||
seq = list(seq) # bleh, `msgpack`...
|
# ensure mem chan settings are correct
|
||||||
|
assert (
|
||||||
|
ctx._send_chan._state.max_buffer_size
|
||||||
|
==
|
||||||
|
msg_buffer_size
|
||||||
|
)
|
||||||
|
|
||||||
|
seq = list(seq) # bleh, msgpack sometimes ain't decoded right
|
||||||
for _ in range(total_batches):
|
for _ in range(total_batches):
|
||||||
batch = []
|
batch = []
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
|
@ -750,6 +762,9 @@ async def echo_back_sequence(
|
||||||
if batch == seq:
|
if batch == seq:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
if be_slow:
|
||||||
|
await trio.sleep(0.05)
|
||||||
|
|
||||||
print('callee waiting on next')
|
print('callee waiting on next')
|
||||||
|
|
||||||
for msg in batch:
|
for msg in batch:
|
||||||
|
@ -763,13 +778,29 @@ async def echo_back_sequence(
|
||||||
return 'yo'
|
return 'yo'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
# aka the side that will / should raise
|
||||||
|
# and overrun under normal conditions.
|
||||||
|
'allow_overruns_side',
|
||||||
|
['parent', 'child', 'none', 'both'],
|
||||||
|
ids=lambda item: f'allow_overruns_side={item}'
|
||||||
|
)
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
# aka the side that will / should raise
|
||||||
|
# and overrun under normal conditions.
|
||||||
|
'slow_side',
|
||||||
|
['parent', 'child'],
|
||||||
|
ids=lambda item: f'slow_side={item}'
|
||||||
|
)
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'cancel_ctx',
|
'cancel_ctx',
|
||||||
[True, False],
|
[True, False],
|
||||||
ids=lambda item: f'cancel_ctx={item}'
|
ids=lambda item: f'cancel_ctx={item}'
|
||||||
)
|
)
|
||||||
def test_allow_overruns_stream(
|
def test_maybe_allow_overruns_stream(
|
||||||
cancel_ctx: bool,
|
cancel_ctx: bool,
|
||||||
|
slow_side: str,
|
||||||
|
allow_overruns_side: str,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
|
@ -794,26 +825,35 @@ def test_allow_overruns_stream(
|
||||||
'callee_sends_forever',
|
'callee_sends_forever',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
|
|
||||||
|
# debug_mode=True,
|
||||||
)
|
)
|
||||||
seq = list(range(3))
|
seq = list(range(10))
|
||||||
async with portal.open_context(
|
async with portal.open_context(
|
||||||
echo_back_sequence,
|
echo_back_sequence,
|
||||||
seq=seq,
|
seq=seq,
|
||||||
wait_for_cancel=cancel_ctx,
|
wait_for_cancel=cancel_ctx,
|
||||||
|
be_slow=(slow_side == 'child'),
|
||||||
|
allow_overruns_side=allow_overruns_side,
|
||||||
) as (ctx, sent):
|
) as (ctx, sent):
|
||||||
|
|
||||||
assert sent is None
|
assert sent is None
|
||||||
|
|
||||||
async with ctx.open_stream(
|
async with ctx.open_stream(
|
||||||
msg_buffer_size=1,
|
msg_buffer_size=1 if slow_side == 'parent' else None,
|
||||||
allow_overruns=True,
|
allow_overruns=(allow_overruns_side in {'parent', 'both'}),
|
||||||
) as stream:
|
) as stream:
|
||||||
count = 0
|
|
||||||
while count < 3:
|
total_batches: int = 2
|
||||||
|
for _ in range(total_batches):
|
||||||
for msg in seq:
|
for msg in seq:
|
||||||
print(f'root tx {msg}')
|
# print(f'root tx {msg}')
|
||||||
await stream.send(msg)
|
await stream.send(msg)
|
||||||
await trio.sleep(0.1)
|
if slow_side == 'parent':
|
||||||
|
# NOTE: we make the parent slightly
|
||||||
|
# slower, when it is slow, to make sure
|
||||||
|
# that in the overruns everywhere case
|
||||||
|
await trio.sleep(0.16)
|
||||||
|
|
||||||
batch = []
|
batch = []
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
|
@ -822,8 +862,6 @@ def test_allow_overruns_stream(
|
||||||
if batch == seq:
|
if batch == seq:
|
||||||
break
|
break
|
||||||
|
|
||||||
count += 1
|
|
||||||
|
|
||||||
if cancel_ctx:
|
if cancel_ctx:
|
||||||
# cancel the remote task
|
# cancel the remote task
|
||||||
print('sending root side cancel')
|
print('sending root side cancel')
|
||||||
|
@ -842,7 +880,48 @@ def test_allow_overruns_stream(
|
||||||
# cancel the daemon
|
# cancel the daemon
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
trio.run(main)
|
if (
|
||||||
|
allow_overruns_side == 'both'
|
||||||
|
or slow_side == allow_overruns_side
|
||||||
|
):
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
elif (
|
||||||
|
slow_side != allow_overruns_side
|
||||||
|
):
|
||||||
|
|
||||||
|
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
err = excinfo.value
|
||||||
|
|
||||||
|
if (
|
||||||
|
allow_overruns_side == 'none'
|
||||||
|
):
|
||||||
|
# depends on timing is is racy which side will
|
||||||
|
# overrun first :sadkitty:
|
||||||
|
|
||||||
|
# NOTE: i tried to isolate to a deterministic case here
|
||||||
|
# based on timeing, but i was kinda wasted, and i don't
|
||||||
|
# think it's sane to catch them..
|
||||||
|
assert err.type in (
|
||||||
|
tractor.RemoteActorError,
|
||||||
|
StreamOverrun,
|
||||||
|
)
|
||||||
|
|
||||||
|
elif (
|
||||||
|
slow_side == 'child'
|
||||||
|
):
|
||||||
|
assert err.type == StreamOverrun
|
||||||
|
|
||||||
|
elif slow_side == 'parent':
|
||||||
|
assert err.type == tractor.RemoteActorError
|
||||||
|
assert 'StreamOverrun' in err.msgdata['tb_str']
|
||||||
|
|
||||||
|
else:
|
||||||
|
# if this hits the logic blocks from above are not
|
||||||
|
# exhaustive..
|
||||||
|
pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
|
|
@ -426,6 +426,10 @@ class Context:
|
||||||
if remote_uid:
|
if remote_uid:
|
||||||
return tuple(remote_uid)
|
return tuple(remote_uid)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cancelled_caught(self) -> bool:
|
||||||
|
return self._scope.cancelled_caught
|
||||||
|
|
||||||
# init and streaming state
|
# init and streaming state
|
||||||
_started_called: bool = False
|
_started_called: bool = False
|
||||||
_started_received: bool = False
|
_started_received: bool = False
|
||||||
|
@ -743,7 +747,7 @@ class Context:
|
||||||
):
|
):
|
||||||
return err
|
return err
|
||||||
|
|
||||||
raise err from None
|
raise err # from None
|
||||||
|
|
||||||
async def result(self) -> Any | Exception:
|
async def result(self) -> Any | Exception:
|
||||||
'''
|
'''
|
||||||
|
|
Loading…
Reference in New Issue