2021-05-02 18:13:44 +00:00
|
|
|
"""
|
|
|
|
Bidirectional streaming and context API.
|
|
|
|
|
2021-05-07 15:41:18 +00:00
|
|
|
"""
|
|
|
|
import pytest
|
2021-05-02 18:13:44 +00:00
|
|
|
import trio
|
|
|
|
import tractor
|
|
|
|
|
|
|
|
# from conftest import tractor_test
|
|
|
|
|
|
|
|
# TODO: test endofchannel semantics / cancellation / error cases:
|
|
|
|
# 3 possible outcomes:
|
|
|
|
# - normal termination: far end relays a stop message with
|
|
|
|
# final value as in async gen from ``return <val>``.
|
|
|
|
|
|
|
|
# possible outcomes:
|
|
|
|
# - normal termination: far end returns
|
|
|
|
# - premature close: far end relays a stop message to tear down stream
|
|
|
|
# - cancel: far end raises `ContextCancelled`
|
|
|
|
|
|
|
|
# future possible outcomes
|
|
|
|
# - restart request: far end raises `ContextRestart`
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level flag: set to ``True`` by ``simple_setup_teardown`` once it
# starts and reset to ``False`` in its ``finally`` clause; ``assert_state``
# compares it against an expected value.
_state: bool = False
|
|
|
|
|
|
|
|
|
|
|
|
@tractor.context
async def simple_setup_teardown(

    ctx: tractor.Context,
    data: int,

) -> None:
    """Hold the module-level ``_state`` flag high while this task lives.

    The flag is set to ``True``, ``data + 1`` is handed to
    ``ctx.started()``, and the task then sleeps forever. Whatever
    interrupts that sleep, the ``finally`` clause resets the flag to
    ``False``.

    """
    global _state

    # startup phase
    _state = True
    first_value = data + 1

    # signal to parent that we're up
    await ctx.started(first_value)

    try:
        # block until cancelled
        await trio.sleep_forever()

    finally:
        # teardown phase: runs no matter how the sleep was interrupted
        _state = False
|
|
|
|
|
|
|
|
|
|
|
|
async def assert_state(value: bool):
    """Assert that the module-level ``_state`` flag equals ``value``.

    Raises ``AssertionError`` when the flag and ``value`` differ.

    """
    # the flag is only read here, so no ``global`` declaration is required
    current = _state
    assert current == value
|
|
|
|
|
|
|
|
|
2021-06-10 17:57:16 +00:00
|
|
|
@pytest.mark.parametrize(
    'error_parent',
    [False, True],
)
def test_simple_context(error_parent):
    """Open a context in a subactor and check its setup/teardown flag.

    A daemon actor is started and ``simple_setup_teardown`` is opened in
    it with ``data=10``; the value handed back on entry must be ``11``.
    ``assert_state`` is then run through the same portal: the flag must
    be ``True`` while the context block is open and ``False`` once it
    has been exited.

    When ``error_parent`` is true a ``ValueError`` is raised on the
    parent side before the daemon is cancelled, and the test requires
    that error to propagate out of ``trio.run()``.

    """
    async def main():

        async with tractor.open_nursery() as n:

            portal = await n.start_actor(
                'simple_context',
                enable_modules=[__name__],
            )

            async with portal.open_context(
                simple_setup_teardown,
                data=10,
            ) as (ctx, sent):

                assert sent == 11

                await portal.run(assert_state, value=True)

            # after cancellation
            await portal.run(assert_state, value=False)

            if error_parent:
                raise ValueError

            # shut down daemon
            await portal.cancel_actor()

    if error_parent:
        # ``pytest.raises`` fails the test if the error never surfaces,
        # which a bare ``try/except ValueError: pass`` would let through.
        with pytest.raises(ValueError):
            trio.run(main)
    else:
        trio.run(main)
|
2021-05-02 18:13:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
@tractor.context
async def simple_rpc(

    ctx: tractor.Context,
    data: int,

) -> None:
    """Test a small ping-pong server.

    Hands ``data + 1`` to ``ctx.started()``, opens the context's stream
    and answers every received ``'ping'`` with a ``'pong'``. When the
    stream raises ``trio.EndOfChannel`` the loop stops after asserting
    that exactly 10 pings were answered.

    """
    # signal to parent that we're up
    await ctx.started(data + 1)

    print('opening stream in callee')
    async with ctx.open_stream() as stream:

        count = 0
        while True:
            try:
                # verify the payload; without the ``assert`` the
                # comparison result would simply be thrown away
                assert await stream.receive() == 'ping'
            except trio.EndOfChannel:
                # far end closed its side: every ping must be accounted for
                assert count == 10
                break
            else:
                print('pong')
                await stream.send('pong')
                count += 1
|
|
|
|
|
|
|
|
|
2021-05-07 15:41:18 +00:00
|
|
|
@tractor.context
async def simple_rpc_with_forloop(

    ctx: tractor.Context,
    data: int,

) -> None:
    """Ping-pong responder written with ``async for`` over the stream.

    Hands ``data + 1`` to ``ctx.started()``, then replies ``'pong'`` to
    each ``'ping'`` read from the stream. Once iteration finishes,
    exactly 10 pings must have been handled.

    """
    first_value = data + 1

    # signal to parent that we're up
    await ctx.started(first_value)

    print('opening stream in callee')
    async with ctx.open_stream() as stream:

        handled = 0
        async for ping in stream:
            assert ping == 'ping'
            print('pong')
            await stream.send('pong')
            handled += 1

        # the loop never breaks, so this check runs whenever iteration
        # ends normally
        assert handled == 10
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize(
    'use_async_for',
    [True, False],
)
@pytest.mark.parametrize(
    'server_func',
    [simple_rpc, simple_rpc_with_forloop],
)
def test_simple_rpc(server_func, use_async_for):
    """The simplest request response pattern.

    Starts an ``'rpc_server'`` actor, opens ``server_func`` in it with
    ``data=10`` (expecting ``11`` back on entry) and sends ten
    ``'ping'`` messages over the context's stream, checking that each
    reply read is ``'pong'``. ``use_async_for`` selects whether replies
    are read with ``async for`` or with explicit ``receive()`` calls.

    """
    async def main():
        async with tractor.open_nursery() as nursery:

            portal = await nursery.start_actor(
                'rpc_server',
                enable_modules=[__name__],
            )

            async with portal.open_context(
                server_func,  # taken from pytest parameterization
                data=10,
            ) as (ctx, sent):

                assert sent == 11

                async with ctx.open_stream() as stream:

                    if not use_async_for:
                        # classic send/receive style
                        for _ in range(10):
                            print('ping')
                            await stream.send('ping')
                            assert await stream.receive() == 'pong'

                    else:
                        # receive msgs using async for style
                        replies = 0
                        print('ping')
                        await stream.send('ping')

                        async for reply in stream:
                            assert reply == 'pong'
                            print('ping')
                            await stream.send('ping')
                            replies += 1

                            # one initial ping plus nine follow-ups
                            # makes ten pings sent in total
                            if replies >= 9:
                                break

                # stream should terminate here

            await portal.cancel_actor()

    trio.run(main)
|