diff --git a/tests/test_child_manages_service_nursery.py b/tests/test_child_manages_service_nursery.py new file mode 100644 index 0000000..983bb42 --- /dev/null +++ b/tests/test_child_manages_service_nursery.py @@ -0,0 +1,172 @@ +''' +Test a service style daemon that maintains a nursery for spawning +"remote async tasks" including both spawning other long living +sub-sub-actor daemons. + +''' +from typing import Optional +import asyncio +from contextlib import asynccontextmanager as acm + +import pytest +import trio +import tractor +from tractor import RemoteActorError +from async_generator import aclosing + + +async def aio_streamer( + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, +) -> trio.abc.ReceiveChannel: + + # required first msg to sync caller + to_trio.send_nowait(None) + + from itertools import cycle + for i in cycle(range(10)): + to_trio.send_nowait(i) + await asyncio.sleep(0.01) + + +async def trio_streamer(): + from itertools import cycle + for i in cycle(range(10)): + yield i + await trio.sleep(0.01) + + +async def trio_sleep_and_err(delay: float = 0.5): + await trio.sleep(delay) + # name error + doggy() # noqa + + +_cached_stream: Optional[ + trio.abc.ReceiveChannel +] = None + + +@acm +async def wrapper_mngr( +): + from tractor.trionics import broadcast_receiver + global _cached_stream + in_aio = tractor.current_actor().is_infected_aio() + + if in_aio: + if _cached_stream: + + from_aio = _cached_stream + + # if we already have a cached feed deliver a rx side clone + # to consumer + async with broadcast_receiver(from_aio, 6) as from_aio: + yield from_aio + return + else: + async with tractor.to_asyncio.open_channel_from( + aio_streamer, + ) as (first, from_aio): + assert not first + + # cache it so next task uses broadcast receiver + _cached_stream = from_aio + + yield from_aio + else: + async with aclosing(trio_streamer()) as stream: + # cache it so next task uses broadcast receiver + _cached_stream = stream + yield stream + + +_nursery: trio.Nursery = None + + +@tractor.context +async def trio_main( + ctx: tractor.Context, +): + # sync + await ctx.started() + + # stash a "service nursery" as "actor local" (aka a Python global) + global _nursery + n = _nursery + assert n + + async def consume_stream(): + async with wrapper_mngr() as stream: + async for msg in stream: + print(msg) + + # run 2 tasks to ensure broadcaster chan use + n.start_soon(consume_stream) + n.start_soon(consume_stream) + + n.start_soon(trio_sleep_and_err) + + await trio.sleep_forever() + + +@tractor.context +async def open_actor_local_nursery( + ctx: tractor.Context, +): + global _nursery + async with trio.open_nursery() as n: + _nursery = n + await ctx.started() + await trio.sleep(10) + # await trio.sleep(1) + + # XXX: this causes the hang since + # the caller does not unblock from its own + # ``trio.sleep_forever()``. + + # TODO: we need to test a simple ctx task starting remote tasks + # that error and then blocking on a ``Nursery.start()`` which + # never yields back.. aka a scenario where the + # ``tractor.context`` task IS NOT in the service n's cancel + # scope. + n.cancel_scope.cancel() + + +@pytest.mark.parametrize( + 'asyncio_mode', + [True, False], + ids='asyncio_mode={}'.format, +) +def test_actor_managed_trio_nursery_task_error_cancels_aio( + asyncio_mode: bool, + arb_addr +): + ''' + Verify that a ``trio`` nursery created managed in a child actor + correctly relays errors to the parent actor when one of its spawned + tasks errors even when running in infected asyncio mode and using + broadcast receivers for multi-task-per-actor subscription. + + ''' + async def main(): + + # cancel the nursery shortly after boot + async with tractor.open_nursery() as n: + p = await n.start_actor( + 'nursery_mngr', + infect_asyncio=asyncio_mode, + enable_modules=[__name__], + ) + async with ( + p.open_context(open_actor_local_nursery) as (ctx, first), + p.open_context(trio_main) as (ctx, first), + ): + await trio.sleep_forever() + + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + # verify boxed error + err = excinfo.value + assert isinstance(err.type(), NameError) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 9881be9..c0c33db 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -14,8 +14,8 @@ from tractor import to_asyncio from tractor import RemoteActorError -async def sleep_and_err(): - await asyncio.sleep(0.1) +async def sleep_and_err(sleep_for: float = 0.1): + await asyncio.sleep(sleep_for) assert 0