forked from goodboy/tractor
Add context test that opens an inter-task-channel that errors
parent
f4973e90e9
commit
98de2fab31
|
@ -13,6 +13,7 @@ import tractor
|
|||
async def aio_echo_server(
|
||||
to_trio: trio.MemorySendChannel,
|
||||
from_trio: asyncio.Queue,
|
||||
|
||||
) -> None:
|
||||
|
||||
# a first message must be sent **from** this ``asyncio``
|
||||
|
|
|
@ -11,12 +11,26 @@ import importlib
|
|||
import pytest
|
||||
import trio
|
||||
import tractor
|
||||
from tractor import to_asyncio
|
||||
from tractor import RemoteActorError
|
||||
from tractor import (
|
||||
to_asyncio,
|
||||
RemoteActorError,
|
||||
ContextCancelled
|
||||
)
|
||||
from tractor.trionics import BroadcastReceiver
|
||||
|
||||
|
||||
async def sleep_and_err(sleep_for: float = 0.1):
|
||||
async def sleep_and_err(
|
||||
sleep_for: float = 0.1,
|
||||
|
||||
# just signature placeholders for compat with
|
||||
# ``to_asyncio.open_channel_from()``
|
||||
to_trio: Optional[trio.MemorySendChannel] = None,
|
||||
from_trio: Optional[asyncio.Queue] = None,
|
||||
|
||||
):
|
||||
if to_trio:
|
||||
to_trio.send_nowait('start')
|
||||
|
||||
await asyncio.sleep(sleep_for)
|
||||
assert 0
|
||||
|
||||
|
@ -146,6 +160,81 @@ def test_trio_cancels_aio(arb_addr):
|
|||
trio.run(main)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def trio_ctx(
|
||||
ctx: tractor.Context,
|
||||
):
|
||||
|
||||
await ctx.started('start')
|
||||
|
||||
# this will block until the ``asyncio`` task sends a "first"
|
||||
# message.
|
||||
with trio.fail_after(0.5):
|
||||
async with (
|
||||
tractor.to_asyncio.open_channel_from(
|
||||
sleep_and_err,
|
||||
) as (first, chan),
|
||||
|
||||
trio.open_nursery() as n,
|
||||
):
|
||||
|
||||
assert first == 'start'
|
||||
|
||||
# spawn another asyncio task for the cuck of it.
|
||||
n.start_soon(
|
||||
tractor.to_asyncio.run_task,
|
||||
sleep_forever,
|
||||
)
|
||||
# await trio.sleep_forever()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'parent_cancels', [False, True],
|
||||
ids='parent_actor_cancels_child={}'.format
|
||||
)
|
||||
def test_context_spawns_aio_task_that_errors(
|
||||
arb_addr,
|
||||
parent_cancels: bool,
|
||||
):
|
||||
'''
|
||||
Verify that spawning a task via an intertask channel ctx mngr that
|
||||
errors correctly propagates the error back from the `asyncio`-side
|
||||
taksk.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
|
||||
async with tractor.open_nursery() as n:
|
||||
p = await n.start_actor(
|
||||
'aio_daemon',
|
||||
enable_modules=[__name__],
|
||||
infect_asyncio=True,
|
||||
# debug_mode=True,
|
||||
loglevel='cancel',
|
||||
)
|
||||
async with p.open_context(
|
||||
trio_ctx,
|
||||
) as (ctx, first):
|
||||
|
||||
assert first == 'start'
|
||||
|
||||
if parent_cancels:
|
||||
await p.cancel_actor()
|
||||
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
with pytest.raises(RemoteActorError) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
err = excinfo.value
|
||||
assert isinstance(err, RemoteActorError)
|
||||
if parent_cancels:
|
||||
assert err.type == trio.Cancelled
|
||||
else:
|
||||
assert err.type == AssertionError
|
||||
|
||||
|
||||
async def aio_cancel():
|
||||
''''
|
||||
Cancel urself boi.
|
||||
|
@ -385,6 +474,8 @@ async def trio_to_aio_echo_server(
|
|||
print('breaking aio echo loop')
|
||||
break
|
||||
|
||||
print('exiting asyncio task')
|
||||
|
||||
async with to_asyncio.open_channel_from(
|
||||
aio_echo_server,
|
||||
) as (first, chan):
|
||||
|
|
Loading…
Reference in New Issue