forked from goodboy/tractor
Add context stream overrun tests
parent
185dbc7e3f
commit
2b05ffcc23
|
@ -5,9 +5,10 @@ Verify the we raise errors when streams are opened prior to sync-opening
|
||||||
a ``tractor.Context`` beforehand.
|
a ``tractor.Context`` beforehand.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
from itertools import count
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
from trio.lowlevel import current_task
|
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
|
|
||||||
|
@ -18,7 +19,7 @@ async def really_started(
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
try:
|
try:
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
except RuntimeError as err:
|
except RuntimeError:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
@ -32,9 +33,10 @@ def test_started_called_more_then_once():
|
||||||
)
|
)
|
||||||
|
|
||||||
async with portal.open_context(really_started) as (ctx, sent):
|
async with portal.open_context(really_started) as (ctx, sent):
|
||||||
pass
|
await trio.sleep(1)
|
||||||
|
# pass
|
||||||
|
|
||||||
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
with pytest.raises(tractor.RemoteActorError):
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@ -44,20 +46,50 @@ async def never_open_stream(
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''Bidir streaming endpoint which will stream
|
'''
|
||||||
back any sequence it is sent item-wise.
|
Context which never opens a stream and blocks.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
def test_no_far_end_stream_opened():
|
@tractor.context
|
||||||
|
async def keep_sending_from_callee(
|
||||||
|
|
||||||
|
ctx: tractor.Context,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Send endlessly on the calleee stream.
|
||||||
|
|
||||||
|
'''
|
||||||
|
await ctx.started()
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
for msg in count():
|
||||||
|
await stream.send(msg)
|
||||||
|
await trio.sleep(0.01)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'overrun_by',
|
||||||
|
[
|
||||||
|
(None, 0, never_open_stream), # use default settings
|
||||||
|
('caller', 1, never_open_stream),
|
||||||
|
('callee', 0, keep_sending_from_callee),
|
||||||
|
],
|
||||||
|
ids='overrun_condition_by={}'.format,
|
||||||
|
)
|
||||||
|
def test_one_end_stream_not_opened(overrun_by):
|
||||||
'''
|
'''
|
||||||
This should exemplify the bug from:
|
This should exemplify the bug from:
|
||||||
https://github.com/goodboy/tractor/issues/265
|
https://github.com/goodboy/tractor/issues/265
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
overrunner, buf_size_increase, entrypoint = overrun_by
|
||||||
|
from tractor._actor import Actor
|
||||||
|
buf_size = buf_size_increase + Actor.msg_buffer_size
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as n:
|
||||||
portal = await n.start_actor(
|
portal = await n.start_actor(
|
||||||
|
@ -65,23 +97,37 @@ def test_no_far_end_stream_opened():
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
|
||||||
async with (
|
async with portal.open_context(entrypoint) as (ctx, sent):
|
||||||
portal.open_context(
|
|
||||||
never_open_stream,) as (ctx, sent),
|
|
||||||
ctx.open_stream() as stream,
|
|
||||||
):
|
|
||||||
assert sent is None
|
assert sent is None
|
||||||
|
|
||||||
# XXX: so the question is whether
|
if overrunner in (None, 'caller'):
|
||||||
# this should error if the far end
|
|
||||||
# has not yet called `ctx.open_stream()`?
|
async with ctx.open_stream() as stream:
|
||||||
# If we decide to do that we need a synchronization
|
for i in range(buf_size - 1):
|
||||||
# message which is sent from that call?
|
await stream.send('yo')
|
||||||
await stream.send('yo')
|
|
||||||
|
else:
|
||||||
|
# callee overruns caller case so we do nothing here
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
# without this we block waiting on the child side
|
# without this we block waiting on the child side
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
trio.run(main)
|
# 2 overrun cases and the no overrun case (which pushes right up to
|
||||||
|
# the msg limit)
|
||||||
|
if overrunner == 'caller':
|
||||||
|
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
assert excinfo.value.type == tractor._exceptions.StreamOverrun
|
||||||
|
|
||||||
|
elif overrunner == 'callee':
|
||||||
|
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
assert excinfo.value.type == tractor.RemoteActorError
|
||||||
|
|
||||||
|
else:
|
||||||
|
trio.run(main)
|
||||||
|
|
Loading…
Reference in New Issue