244 lines
6.4 KiB
Python
244 lines
6.4 KiB
Python
|
'''
|
||
|
``async with ():`` inlined context-stream cancellation testing.
|
||
|
|
||
|
Verify the we raise errors when streams are opened prior to sync-opening
|
||
|
a ``tractor.Context`` beforehand.
|
||
|
|
||
|
'''
|
||
|
from itertools import count
|
||
|
from typing import Optional
|
||
|
|
||
|
import pytest
|
||
|
import trio
|
||
|
import tractor
|
||
|
from tractor._exceptions import StreamOverrun
|
||
|
|
||
|
|
||
|
@tractor.context
|
||
|
async def really_started(
|
||
|
ctx: tractor.Context,
|
||
|
) -> None:
|
||
|
await ctx.started()
|
||
|
try:
|
||
|
await ctx.started()
|
||
|
except RuntimeError:
|
||
|
raise
|
||
|
|
||
|
|
||
|
def test_started_called_more_then_once():
|
||
|
|
||
|
async def main():
|
||
|
async with tractor.open_nursery() as n:
|
||
|
portal = await n.start_actor(
|
||
|
'too_much_starteds',
|
||
|
enable_modules=[__name__],
|
||
|
)
|
||
|
|
||
|
async with portal.open_context(really_started) as (ctx, sent):
|
||
|
await trio.sleep(1)
|
||
|
# pass
|
||
|
|
||
|
with pytest.raises(tractor.RemoteActorError):
|
||
|
trio.run(main)
|
||
|
|
||
|
|
||
|
@tractor.context
|
||
|
async def never_open_stream(
|
||
|
|
||
|
ctx: tractor.Context,
|
||
|
|
||
|
) -> None:
|
||
|
'''
|
||
|
Context which never opens a stream and blocks.
|
||
|
|
||
|
'''
|
||
|
await ctx.started()
|
||
|
await trio.sleep_forever()
|
||
|
|
||
|
|
||
|
@tractor.context
|
||
|
async def keep_sending_from_callee(
|
||
|
|
||
|
ctx: tractor.Context,
|
||
|
msg_buffer_size: Optional[int] = None,
|
||
|
|
||
|
) -> None:
|
||
|
'''
|
||
|
Send endlessly on the calleee stream.
|
||
|
|
||
|
'''
|
||
|
await ctx.started()
|
||
|
async with ctx.open_stream(
|
||
|
msg_buffer_size=msg_buffer_size,
|
||
|
) as stream:
|
||
|
for msg in count():
|
||
|
print(f'callee sending {msg}')
|
||
|
await stream.send(msg)
|
||
|
await trio.sleep(0.01)
|
||
|
|
||
|
|
||
|
@pytest.mark.parametrize(
|
||
|
'overrun_by',
|
||
|
[
|
||
|
('caller', 1, never_open_stream),
|
||
|
('cancel_caller_during_overrun', 1, never_open_stream),
|
||
|
('callee', 0, keep_sending_from_callee),
|
||
|
],
|
||
|
ids='overrun_condition={}'.format,
|
||
|
)
|
||
|
def test_one_end_stream_not_opened(overrun_by):
|
||
|
'''
|
||
|
This should exemplify the bug from:
|
||
|
https://github.com/goodboy/tractor/issues/265
|
||
|
|
||
|
'''
|
||
|
overrunner, buf_size_increase, entrypoint = overrun_by
|
||
|
from tractor._actor import Actor
|
||
|
buf_size = buf_size_increase + Actor.msg_buffer_size
|
||
|
|
||
|
async def main():
|
||
|
async with tractor.open_nursery() as n:
|
||
|
portal = await n.start_actor(
|
||
|
entrypoint.__name__,
|
||
|
enable_modules=[__name__],
|
||
|
)
|
||
|
|
||
|
async with portal.open_context(
|
||
|
entrypoint,
|
||
|
) as (ctx, sent):
|
||
|
assert sent is None
|
||
|
|
||
|
if 'caller' in overrunner:
|
||
|
|
||
|
async with ctx.open_stream() as stream:
|
||
|
for i in range(buf_size):
|
||
|
print(f'sending {i}')
|
||
|
await stream.send(i)
|
||
|
|
||
|
if 'cancel' in overrunner:
|
||
|
# without this we block waiting on the child side
|
||
|
await ctx.cancel()
|
||
|
|
||
|
else:
|
||
|
# expect overrun error to be relayed back
|
||
|
# and this sleep interrupted
|
||
|
await trio.sleep_forever()
|
||
|
|
||
|
else:
|
||
|
# callee overruns caller case so we do nothing here
|
||
|
await trio.sleep_forever()
|
||
|
|
||
|
await portal.cancel_actor()
|
||
|
|
||
|
# 2 overrun cases and the no overrun case (which pushes right up to
|
||
|
# the msg limit)
|
||
|
if overrunner == 'caller':
|
||
|
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||
|
trio.run(main)
|
||
|
|
||
|
assert excinfo.value.type == StreamOverrun
|
||
|
|
||
|
elif 'cancel' in overrunner:
|
||
|
with pytest.raises(trio.MultiError) as excinfo:
|
||
|
trio.run(main)
|
||
|
|
||
|
multierr = excinfo.value
|
||
|
|
||
|
for exc in multierr.exceptions:
|
||
|
etype = type(exc)
|
||
|
if etype == tractor.RemoteActorError:
|
||
|
assert exc.type == StreamOverrun
|
||
|
else:
|
||
|
assert etype == tractor.ContextCancelled
|
||
|
|
||
|
elif overrunner == 'callee':
|
||
|
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||
|
trio.run(main)
|
||
|
|
||
|
# TODO: embedded remote errors so that we can verify the source
|
||
|
# error?
|
||
|
# the callee delivers an error which is an overrun wrapped
|
||
|
# in a remote actor error.
|
||
|
assert excinfo.value.type == tractor.RemoteActorError
|
||
|
|
||
|
else:
|
||
|
trio.run(main)
|
||
|
|
||
|
@tractor.context
|
||
|
async def echo_back_sequence(
|
||
|
|
||
|
ctx: tractor.Context,
|
||
|
seq: list[int],
|
||
|
msg_buffer_size: Optional[int] = None,
|
||
|
|
||
|
) -> None:
|
||
|
'''
|
||
|
Send endlessly on the calleee stream.
|
||
|
|
||
|
'''
|
||
|
await ctx.started()
|
||
|
async with ctx.open_stream(
|
||
|
msg_buffer_size=msg_buffer_size,
|
||
|
) as stream:
|
||
|
|
||
|
count = 0
|
||
|
while count < 3:
|
||
|
batch = []
|
||
|
async for msg in stream:
|
||
|
batch.append(msg)
|
||
|
if batch == seq:
|
||
|
break
|
||
|
|
||
|
for msg in batch:
|
||
|
print(f'callee sending {msg}')
|
||
|
await stream.send(msg)
|
||
|
|
||
|
count += 1
|
||
|
|
||
|
return 'yo'
|
||
|
|
||
|
|
||
|
def test_stream_backpressure():
|
||
|
'''
|
||
|
Demonstrate small overruns of each task back and forth
|
||
|
on a stream not raising any errors by default.
|
||
|
|
||
|
'''
|
||
|
async def main():
|
||
|
async with tractor.open_nursery() as n:
|
||
|
portal = await n.start_actor(
|
||
|
'callee_sends_forever',
|
||
|
enable_modules=[__name__],
|
||
|
)
|
||
|
seq = list(range(3))
|
||
|
async with portal.open_context(
|
||
|
echo_back_sequence,
|
||
|
seq=seq,
|
||
|
msg_buffer_size=1,
|
||
|
) as (ctx, sent):
|
||
|
assert sent is None
|
||
|
|
||
|
async with ctx.open_stream(msg_buffer_size=1) as stream:
|
||
|
count = 0
|
||
|
while count < 3:
|
||
|
for msg in seq:
|
||
|
print(f'caller sending {msg}')
|
||
|
await stream.send(msg)
|
||
|
await trio.sleep(0.1)
|
||
|
|
||
|
batch = []
|
||
|
async for msg in stream:
|
||
|
batch.append(msg)
|
||
|
if batch == seq:
|
||
|
break
|
||
|
|
||
|
count += 1
|
||
|
|
||
|
# here the context should return
|
||
|
assert await ctx.result() == 'yo'
|
||
|
|
||
|
# cancel the daemon
|
||
|
await portal.cancel_actor()
|
||
|
|
||
|
trio.run(main)
|