tractor/tests/test_context_streams.py

244 lines
6.4 KiB
Python
Raw Normal View History

'''
``async with ():`` inlined context-stream cancellation testing.
Verify the we raise errors when streams are opened prior to sync-opening
a ``tractor.Context`` beforehand.
'''
2021-12-06 00:50:39 +00:00
from itertools import count
from typing import Optional
2021-12-06 00:50:39 +00:00
import pytest
import trio
import tractor
2021-12-06 21:00:21 +00:00
from tractor._exceptions import StreamOverrun
@tractor.context
async def really_started(
ctx: tractor.Context,
) -> None:
await ctx.started()
try:
await ctx.started()
2021-12-06 00:50:39 +00:00
except RuntimeError:
raise
def test_started_called_more_then_once():
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'too_much_starteds',
enable_modules=[__name__],
)
async with portal.open_context(really_started) as (ctx, sent):
2021-12-06 00:50:39 +00:00
await trio.sleep(1)
# pass
2021-12-06 00:50:39 +00:00
with pytest.raises(tractor.RemoteActorError):
trio.run(main)
@tractor.context
async def never_open_stream(
ctx: tractor.Context,
) -> None:
2021-12-06 00:50:39 +00:00
'''
Context which never opens a stream and blocks.
'''
await ctx.started()
await trio.sleep_forever()
2021-12-06 00:50:39 +00:00
@tractor.context
async def keep_sending_from_callee(
ctx: tractor.Context,
msg_buffer_size: Optional[int] = None,
2021-12-06 00:50:39 +00:00
) -> None:
'''
Send endlessly on the calleee stream.
'''
await ctx.started()
async with ctx.open_stream(
msg_buffer_size=msg_buffer_size,
) as stream:
2021-12-06 00:50:39 +00:00
for msg in count():
print(f'callee sending {msg}')
2021-12-06 00:50:39 +00:00
await stream.send(msg)
await trio.sleep(0.01)
@pytest.mark.parametrize(
'overrun_by',
[
('caller', 1, never_open_stream),
2021-12-06 21:00:21 +00:00
('cancel_caller_during_overrun', 1, never_open_stream),
2021-12-06 00:50:39 +00:00
('callee', 0, keep_sending_from_callee),
],
2021-12-06 21:00:21 +00:00
ids='overrun_condition={}'.format,
2021-12-06 00:50:39 +00:00
)
def test_one_end_stream_not_opened(overrun_by):
'''
This should exemplify the bug from:
https://github.com/goodboy/tractor/issues/265
'''
2021-12-06 00:50:39 +00:00
overrunner, buf_size_increase, entrypoint = overrun_by
from tractor._actor import Actor
buf_size = buf_size_increase + Actor.msg_buffer_size
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
entrypoint.__name__,
enable_modules=[__name__],
)
async with portal.open_context(
entrypoint,
) as (ctx, sent):
assert sent is None
2021-12-06 21:00:21 +00:00
if 'caller' in overrunner:
2021-12-06 00:50:39 +00:00
async with ctx.open_stream() as stream:
2021-12-06 21:00:21 +00:00
for i in range(buf_size):
print(f'sending {i}')
await stream.send(i)
2021-12-06 21:00:21 +00:00
if 'cancel' in overrunner:
# without this we block waiting on the child side
await ctx.cancel()
else:
2021-12-06 21:00:21 +00:00
# expect overrun error to be relayed back
# and this sleep interrupted
await trio.sleep_forever()
2021-12-06 00:50:39 +00:00
else:
# callee overruns caller case so we do nothing here
await trio.sleep_forever()
2021-12-03 19:49:08 +00:00
await portal.cancel_actor()
2021-12-06 00:50:39 +00:00
# 2 overrun cases and the no overrun case (which pushes right up to
# the msg limit)
if overrunner == 'caller':
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
2021-12-06 21:00:21 +00:00
assert excinfo.value.type == StreamOverrun
elif 'cancel' in overrunner:
with pytest.raises(trio.MultiError) as excinfo:
trio.run(main)
multierr = excinfo.value
for exc in multierr.exceptions:
etype = type(exc)
if etype == tractor.RemoteActorError:
assert exc.type == StreamOverrun
else:
assert etype == tractor.ContextCancelled
2021-12-06 00:50:39 +00:00
elif overrunner == 'callee':
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
2021-12-06 21:00:21 +00:00
# TODO: embedded remote errors so that we can verify the source
# error?
# the callee delivers an error which is an overrun wrapped
# in a remote actor error.
2021-12-06 00:50:39 +00:00
assert excinfo.value.type == tractor.RemoteActorError
else:
trio.run(main)
@tractor.context
async def echo_back_sequence(
ctx: tractor.Context,
seq: list[int],
msg_buffer_size: Optional[int] = None,
) -> None:
'''
Send endlessly on the calleee stream.
'''
await ctx.started()
async with ctx.open_stream(
msg_buffer_size=msg_buffer_size,
) as stream:
count = 0
while count < 3:
batch = []
async for msg in stream:
batch.append(msg)
if batch == seq:
break
for msg in batch:
print(f'callee sending {msg}')
await stream.send(msg)
count += 1
return 'yo'
def test_stream_backpressure():
'''
Demonstrate small overruns of each task back and forth
on a stream not raising any errors by default.
'''
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'callee_sends_forever',
enable_modules=[__name__],
)
seq = list(range(3))
async with portal.open_context(
echo_back_sequence,
seq=seq,
msg_buffer_size=1,
) as (ctx, sent):
assert sent is None
async with ctx.open_stream(msg_buffer_size=1) as stream:
count = 0
while count < 3:
for msg in seq:
print(f'caller sending {msg}')
await stream.send(msg)
await trio.sleep(0.1)
batch = []
async for msg in stream:
batch.append(msg)
if batch == seq:
break
count += 1
# here the context should return
assert await ctx.result() == 'yo'
# cancel the daemon
await portal.cancel_actor()
trio.run(main)