forked from goodboy/tractor
1
0
Fork 0
tractor/tests/test_2way.py

213 lines
4.7 KiB
Python
Raw Normal View History

2021-05-02 18:13:44 +00:00
"""
Bidirectional streaming and context API.

"""
import pytest
2021-05-02 18:13:44 +00:00
import trio
import tractor
# from conftest import tractor_test
# TODO: test end-of-channel semantics / cancellation / error cases.
# 3 possible outcomes:
# - normal termination: far end relays a stop message with the
#   final value, as in an async gen's ``return <val>``.
# possible outcomes:
# - normal termination: far end returns
# - premature close: far end relays a stop message to tear down the stream
# - cancel: far end raises `ContextCancelled`
# future possible outcomes:
# - restart request: far end raises `ContextRestart`
# Module-level flag: ``simple_setup_teardown`` sets it to ``True`` during
# startup and back to ``False`` in its ``finally`` clause; ``assert_state``
# compares it against an expected value.
_state: bool = False
@tractor.context
async def simple_setup_teardown(
    ctx: tractor.Context,
    data: int,
) -> None:
    """Flip the module flag on, report readiness, then idle until torn down.

    Sets the module-level ``_state`` to ``True``, hands ``data + 1`` to
    ``ctx.started()`` and then sleeps forever. Whenever the sleep is
    interrupted the ``finally`` clause resets ``_state`` to ``False``.

    """
    global _state

    # setup: mark this task as running
    _state = True

    # let the parent know we are up, passing back the first value
    first_value = data + 1
    await ctx.started(first_value)

    try:
        # park here until something interrupts the sleep
        await trio.sleep_forever()
    finally:
        # teardown: always clear the flag on the way out
        _state = False
async def assert_state(value: bool):
    """Assert that the module-level ``_state`` flag equals ``value``."""
    # read-only access to the module global, so no ``global`` statement needed
    assert _state == value
2021-06-10 17:57:16 +00:00
@pytest.mark.parametrize(
    'error_parent',
    [False, True],
)
def test_simple_context(error_parent):
    """Open a context on a spawned actor and check the flag around it.

    While the context block is open the remote ``_state`` flag must be
    ``True``; once the block has exited it must be ``False`` again. When
    ``error_parent`` is set, the parent task raises ``ValueError`` before
    the actor is explicitly cancelled.

    """
    async def main():
        async with tractor.open_nursery() as nursery:
            portal = await nursery.start_actor(
                'simple_context',
                enable_modules=[__name__],
            )

            async with portal.open_context(
                simple_setup_teardown,
                data=10,
            ) as (ctx, first):
                # callee reports ``data + 1`` as its started value
                assert first == 11
                await portal.run(assert_state, value=True)

            # after cancellation
            await portal.run(assert_state, value=False)

            if error_parent:
                raise ValueError

            # shut down daemon
            await portal.cancel_actor()

    if not error_parent:
        trio.run(main)
        return

    # error case: a ``ValueError`` coming out of the run is tolerated
    try:
        trio.run(main)
    except ValueError:
        pass
2021-05-02 18:13:44 +00:00
@tractor.context
async def simple_rpc(
    ctx: tractor.Context,
    data: int,
) -> None:
    """Test a small ping-pong server.

    Reports ``data + 1`` to the parent via ``ctx.started()``, then opens
    the context's stream and answers every received ``'ping'`` with a
    ``'pong'``. When the stream raises ``trio.EndOfChannel`` the loop ends,
    after asserting that exactly 10 pings were answered.

    """
    # signal to parent that we're up
    await ctx.started(data + 1)

    print('opening stream in callee')
    async with ctx.open_stream() as stream:
        count = 0
        while True:
            # keep the ``try`` body to the one call that can signal
            # end-of-channel
            try:
                msg = await stream.receive()
            except trio.EndOfChannel:
                assert count == 10
                break

            # actually verify the payload: the comparison result used to
            # be computed and silently discarded
            assert msg == 'ping'

            print('pong')
            await stream.send('pong')
            count += 1
2021-05-07 15:41:18 +00:00
@tractor.context
async def simple_rpc_with_forloop(
    ctx: tractor.Context,
    data: int,
) -> None:
    """Ping-pong server variant driven by ``async for`` over the stream.

    Reports ``data + 1`` to the parent via ``ctx.started()``, replies
    ``'pong'`` to each ``'ping'`` received, and once iteration over the
    stream finishes asserts that 10 replies were sent.

    """
    # signal to parent that we're up
    first_value = data + 1
    await ctx.started(first_value)

    print('opening stream in callee')
    async with ctx.open_stream() as stream:
        replies = 0

        async for msg in stream:
            assert msg == 'ping'
            print('pong')
            await stream.send('pong')
            replies += 1

        # the loop never ``break``s, so this runs exactly when the
        # iteration ends normally
        assert replies == 10
@pytest.mark.parametrize(
    'use_async_for',
    [True, False],
)
@pytest.mark.parametrize(
    'server_func',
    [simple_rpc, simple_rpc_with_forloop],
)
def test_simple_rpc(server_func, use_async_for):
    """The simplest request response pattern.

    Spawns an actor running ``server_func``, sends it 10 ``'ping'``
    messages over a stream and checks each reply is ``'pong'``, using
    either ``async for`` iteration or explicit ``receive()`` calls on the
    client side depending on ``use_async_for``.

    """
    async def send_ping(stream):
        # one request: log it, then push it down the stream
        print('ping')
        await stream.send('ping')

    async def main():
        async with tractor.open_nursery() as nursery:
            portal = await nursery.start_actor(
                'rpc_server',
                enable_modules=[__name__],
            )

            async with portal.open_context(
                server_func,  # taken from pytest parameterization
                data=10,
            ) as (ctx, first):
                assert first == 11

                async with ctx.open_stream() as stream:
                    if not use_async_for:
                        # classic send/receive style
                        for _ in range(10):
                            await send_ping(stream)
                            assert await stream.receive() == 'pong'
                    else:
                        # receive msgs using async for style
                        pongs = 0
                        await send_ping(stream)

                        async for reply in stream:
                            assert reply == 'pong'
                            await send_ping(stream)
                            pongs += 1

                            # 1 initial ping + 9 follow-ups == 10 pings sent
                            if pongs >= 9:
                                break

                # stream should terminate here

            await portal.cancel_actor()

    trio.run(main)