forked from goodboy/tractor
Add a basic `open_channel_from()` streaming test
parent
9bc94b5ccc
commit
d27ddb7bbb
|
@ -2,7 +2,7 @@
|
||||||
The most hipster way to force SC onto the stdlib's "async".
|
The most hipster way to force SC onto the stdlib's "async".
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from typing import Optional
|
from typing import Optional, Iterable
|
||||||
import asyncio
|
import asyncio
|
||||||
import builtins
|
import builtins
|
||||||
import importlib
|
import importlib
|
||||||
|
@ -10,6 +10,7 @@ import importlib
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
from tractor import to_asyncio
|
||||||
from tractor import RemoteActorError
|
from tractor import RemoteActorError
|
||||||
|
|
||||||
|
|
||||||
|
@ -176,17 +177,64 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr):
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
def test_trio_error_cancels_aio(arb_addr):
|
# TODO:
|
||||||
...
|
async def no_to_trio_in_args():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
async def push_from_aio_task(
|
||||||
|
|
||||||
|
sequence: Iterable,
|
||||||
|
to_trio: trio.abc.SendChannel,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
for i in range(100):
|
||||||
|
print(f'asyncio sending {i}')
|
||||||
|
to_trio.send_nowait(i)
|
||||||
|
await asyncio.sleep(0.001)
|
||||||
|
|
||||||
|
print(f'asyncio streamer complete!')
|
||||||
|
|
||||||
|
|
||||||
|
async def stream_from_aio():
|
||||||
|
seq = range(100)
|
||||||
|
expect = list(seq)
|
||||||
|
|
||||||
|
async with to_asyncio.open_channel_from(
|
||||||
|
push_from_aio_task,
|
||||||
|
sequence=seq,
|
||||||
|
) as (first, chan):
|
||||||
|
|
||||||
|
pulled = [first]
|
||||||
|
async for value in chan:
|
||||||
|
print(f'trio received {value}')
|
||||||
|
pulled.append(value)
|
||||||
|
|
||||||
|
assert pulled == expect
|
||||||
|
|
||||||
|
print('trio guest mode task completed!')
|
||||||
|
|
||||||
|
|
||||||
def test_basic_interloop_channel_stream(arb_addr):
|
def test_basic_interloop_channel_stream(arb_addr):
|
||||||
...
|
async def main():
|
||||||
|
async with tractor.open_nursery() as n:
|
||||||
|
portal = await n.run_in_actor(
|
||||||
|
stream_from_aio,
|
||||||
|
infect_asyncio=True,
|
||||||
|
)
|
||||||
|
await portal.result()
|
||||||
|
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
def test_trio_cancels_and_channel_exits(arb_addr):
|
|
||||||
...
|
# def test_trio_error_cancels_intertask_chan(arb_addr):
|
||||||
|
# ...
|
||||||
|
|
||||||
|
|
||||||
def test_aio_errors_and_channel_propagates(arb_addr):
|
# def test_trio_cancels_and_channel_exits(arb_addr):
|
||||||
...
|
# ...
|
||||||
|
|
||||||
|
|
||||||
|
# def test_aio_errors_and_channel_propagates(arb_addr):
|
||||||
|
# ...
|
||||||
|
|
Loading…
Reference in New Issue