tractor/tests/test_2way.py

142 lines
3.3 KiB
Python
Raw Normal View History

'''
Audit the simplest inter-actor bidirectional (streaming)
msg patterns.
'''
from __future__ import annotations
from typing import (
Callable,
TYPE_CHECKING,
)
2021-05-07 15:41:18 +00:00
import pytest
2021-05-02 18:13:44 +00:00
import trio
import tractor
if TYPE_CHECKING:
from tractor import (
Portal,
)
2021-05-02 18:13:44 +00:00
@tractor.context
async def simple_rpc(
    ctx: tractor.Context,
    data: int,
) -> None:
    '''
    Test a small ping-pong server.

    Hands ``data + 1`` back through ``ctx.started()``, then opens
    the context's stream and answers each received ``'ping'`` with
    a ``'pong'``. Once the stream raises ``trio.EndOfChannel`` the
    endpoint verifies that exactly 10 round trips were served and
    returns.

    '''
    # signal to parent that we're up
    await ctx.started(data + 1)

    print('opening stream in callee')
    async with ctx.open_stream() as stream:

        count = 0
        while True:
            try:
                # keep only the call that can raise inside the `try`
                msg = await stream.receive()
            except trio.EndOfChannel:
                # stream ended: all expected pings must have been
                # answered by now.
                assert count == 10
                break
            else:
                # the comparison result used to be silently
                # discarded, so a wrong msg was never detected.
                assert msg == 'ping'
                print('pong')
                await stream.send('pong')
                count += 1
2021-05-07 15:41:18 +00:00
@tractor.context
async def simple_rpc_with_forloop(
    ctx: tractor.Context,
    data: int,
) -> None:
    '''
    Ping-pong server variant which consumes the stream with
    ``async for`` iteration rather than explicit ``receive()``
    calls.

    Hands ``data + 1`` back through ``ctx.started()``, replies
    ``'pong'`` to every ``'ping'`` and, once iteration finishes,
    checks that exactly 10 msgs were handled.

    '''
    # let the parent know we're up, passing back the bumped value
    first_reply = data + 1
    await ctx.started(first_reply)

    print('opening stream in callee')
    async with ctx.open_stream() as stream:

        served = 0
        async for ping in stream:
            assert ping == 'ping'
            print('pong')
            await stream.send('pong')
            served += 1

        # the loop body never `break`s, so this point is reached
        # only after the iteration has completed normally.
        assert served == 10
@pytest.mark.parametrize(
    'use_async_for',
    [True, False],
)
@pytest.mark.parametrize(
    'server_func',
    [simple_rpc, simple_rpc_with_forloop],
)
def test_simple_rpc(
    server_func: Callable,
    use_async_for: bool,
):
    '''
    The simplest request response pattern.

    Starts an actor named ``'rpc_server'`` with this module
    enabled, opens a context to `server_func` passing ``data=10``
    and checks the value handed back is ``11``. It then sends 10
    ``'ping'`` msgs over the stream, asserting every consumed reply
    is ``'pong'``, using either ``async for`` iteration or explicit
    send/receive calls depending on `use_async_for`.

    '''
    async def main():
        # bound the whole exchange so a hang fails the test
        with trio.fail_after(6):
            async with tractor.open_nursery() as an:
                portal: Portal = await an.start_actor(
                    'rpc_server',
                    enable_modules=[__name__],
                )

                async with portal.open_context(
                    server_func,  # taken from pytest parameterization
                    data=10,
                ) as (ctx, sent):

                    assert sent == 11

                    async with ctx.open_stream() as stream:

                        if use_async_for:

                            count = 0
                            # receive msgs using async for style
                            print('ping')
                            await stream.send('ping')

                            async for msg in stream:
                                assert msg == 'pong'
                                print('ping')
                                await stream.send('ping')
                                count += 1

                                # 1 initial ping + 9 follow-ups
                                # makes 10 pings sent in total.
                                if count >= 9:
                                    break

                        else:
                            # classic send/receive style
                            for _ in range(10):
                                print('ping')
                                await stream.send('ping')
                                assert await stream.receive() == 'pong'

                    # stream should terminate here

                # final context result(s) should be consumed here in __aexit__()

                await portal.cancel_actor()

    trio.run(main)