From 13c8300226e6135930dfcf2e918be1a1d51f9d76 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Feb 2022 13:27:47 -0500 Subject: [PATCH 01/15] Add a sub-actor managed service nursery test scenario --- tests/test_child_manages_service_nursery.py | 172 ++++++++++++++++++++ tests/test_infected_asyncio.py | 4 +- 2 files changed, 174 insertions(+), 2 deletions(-) create mode 100644 tests/test_child_manages_service_nursery.py diff --git a/tests/test_child_manages_service_nursery.py b/tests/test_child_manages_service_nursery.py new file mode 100644 index 0000000..983bb42 --- /dev/null +++ b/tests/test_child_manages_service_nursery.py @@ -0,0 +1,172 @@ +''' +Test a service style daemon that maintains a nursery for spawning +"remote async tasks" including both spawning other long living +sub-sub-actor daemons. + +''' +from typing import Optional +import asyncio +from contextlib import asynccontextmanager as acm + +import pytest +import trio +import tractor +from tractor import RemoteActorError +from async_generator import aclosing + + +async def aio_streamer( + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, +) -> trio.abc.ReceiveChannel: + + # required first msg to sync caller + to_trio.send_nowait(None) + + from itertools import cycle + for i in cycle(range(10)): + to_trio.send_nowait(i) + await asyncio.sleep(0.01) + + +async def trio_streamer(): + from itertools import cycle + for i in cycle(range(10)): + yield i + await trio.sleep(0.01) + + +async def trio_sleep_and_err(delay: float = 0.5): + await trio.sleep(delay) + # name error + doggy() # noqa + + +_cached_stream: Optional[ + trio.abc.ReceiveChannel +] = None + + +@acm +async def wrapper_mngr( +): + from tractor.trionics import broadcast_receiver + global _cached_stream + in_aio = tractor.current_actor().is_infected_aio() + + if in_aio: + if _cached_stream: + + from_aio = _cached_stream + + # if we already have a cached feed deliver a rx side clone + # to consumer + async with broadcast_receiver(from_aio, 6) as from_aio: + yield from_aio + return + else: + async with tractor.to_asyncio.open_channel_from( + aio_streamer, + ) as (first, from_aio): + assert not first + + # cache it so next task uses broadcast receiver + _cached_stream = from_aio + + yield from_aio + else: + async with aclosing(trio_streamer()) as stream: + # cache it so next task uses broadcast receiver + _cached_stream = stream + yield stream + + +_nursery: trio.Nursery = None + + +@tractor.context +async def trio_main( + ctx: tractor.Context, +): + # sync + await ctx.started() + + # stash a "service nursery" as "actor local" (aka a Python global) + global _nursery + n = _nursery + assert n + + async def consume_stream(): + async with wrapper_mngr() as stream: + async for msg in stream: + print(msg) + + # run 2 tasks to ensure broadcaster chan use + n.start_soon(consume_stream) + n.start_soon(consume_stream) + + n.start_soon(trio_sleep_and_err) + + await trio.sleep_forever() + + +@tractor.context +async def open_actor_local_nursery( + ctx: tractor.Context, +): + global _nursery + async with trio.open_nursery() as n: + _nursery = n + await ctx.started() + await trio.sleep(10) + # await trio.sleep(1) + + # XXX: this causes the hang since + # the caller does not unblock from its own + # ``trio.sleep_forever()``. + + # TODO: we need to test a simple ctx task starting remote tasks + # that error and then blocking on a ``Nursery.start()`` which + # never yields back.. aka a scenario where the + # ``tractor.context`` task IS NOT in the service n's cancel + # scope. + n.cancel_scope.cancel() + + +@pytest.mark.parametrize( + 'asyncio_mode', + [True, False], + ids='asyncio_mode={}'.format, +) +def test_actor_managed_trio_nursery_task_error_cancels_aio( + asyncio_mode: bool, + arb_addr +): + ''' + Verify that a ``trio`` nursery created managed in a child actor + correctly relays errors to the parent actor when one of its spawned + tasks errors even when running in infected asyncio mode and using + broadcast receivers for multi-task-per-actor subscription. + + ''' + async def main(): + + # cancel the nursery shortly after boot + async with tractor.open_nursery() as n: + p = await n.start_actor( + 'nursery_mngr', + infect_asyncio=asyncio_mode, + enable_modules=[__name__], + ) + async with ( + p.open_context(open_actor_local_nursery) as (ctx, first), + p.open_context(trio_main) as (ctx, first), + ): + await trio.sleep_forever() + + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + # verify boxed error + err = excinfo.value + assert isinstance(err.type(), NameError) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 9881be9..c0c33db 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -14,8 +14,8 @@ from tractor import to_asyncio from tractor import RemoteActorError -async def sleep_and_err(): - await asyncio.sleep(0.1) +async def sleep_and_err(sleep_for: float = 0.1): + await asyncio.sleep(sleep_for) assert 0 From 9b77b8c9eea7ff4c3dfa957dd5d3884cfe87d9cf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Feb 2022 13:43:11 -0500 Subject: [PATCH 02/15] Add more explicit `asyncio` task error logging When an `asyncio` side task errors or is cancelled we now explicitly report the traceback and task name if possible as well as the source reason for the error (some come from the `trio` side). Further, properly set any `trio` side exception (after unwrapping it from the `outcome.Error`) on the future that runs the `trio` guest run. --- tractor/to_asyncio.py | 75 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 65 insertions(+), 10 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 9b18a87..371a936 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -23,6 +23,7 @@ from asyncio.exceptions import CancelledError from contextlib import asynccontextmanager as acm from dataclasses import dataclass import inspect +import traceback from typing import ( Any, Callable, @@ -32,6 +33,7 @@ from typing import ( ) import trio +from outcome import Error from .log import get_logger from ._state import current_actor @@ -67,6 +69,14 @@ class LinkedTaskChannel(trio.abc.Channel): async def receive(self) -> Any: async with translate_aio_errors(self): + + # TODO: do we need this to guarantee asyncio code get's + # cancelled in the case where the trio side somehow creates + # a state where the asyncio cycle-task isn't getting the + # cancel request sent by (in theory) the last checkpoint + # cycle on the trio side? + # await trio.lowlevel.checkpoint() + return await self._from_aio.receive() async def wait_ayncio_complete(self) -> None: @@ -202,20 +212,20 @@ def _run_asyncio_task( ''' nonlocal chan aio_err = chan._aio_err + task_err: Optional[BaseException] = None # only to avoid ``asyncio`` complaining about uncaptured # task exceptions try: task.exception() except BaseException as terr: + task_err = terr + log.exception(f'`asyncio` task: {task.get_name()} errored') assert type(terr) is type(aio_err), 'Asyncio task error mismatch?' if aio_err is not None: - if type(aio_err) is CancelledError: - log.cancel("infected task was cancelled") - else: - aio_err.with_traceback(aio_err.__traceback__) - log.exception("infected task errorred:") + # XXX: uhh is this true? + # assert task_err, f'Asyncio task {task.get_name()} discrepancy!?' # NOTE: currently mem chan closure may act as a form # of error relay (at least in the ``asyncio.CancelledError`` @@ -224,8 +234,25 @@ def _run_asyncio_task( # We might want to change this in the future though. from_aio.close() - task.add_done_callback(cancel_trio) + if type(aio_err) is CancelledError: + log.cancel("infected task was cancelled") + # TODO: show that the cancellation originated + # from the ``trio`` side? right? + # if cancel_scope.cancelled: + # raise aio_err from err + + elif task_err is None: + aio_err.with_traceback(aio_err.__traceback__) + msg = ''.join(traceback.format_exception(aio_err)) + log.error( + f'infected task errorred:\n{msg}' + ) + + # raise any ``asyncio`` side error. + raise aio_err + + task.add_done_callback(cancel_trio) return chan @@ -240,6 +267,8 @@ async def translate_aio_errors( appropriately translates errors and cancels into ``trio`` land. ''' + trio_task = trio.lowlevel.current_task() + aio_err: Optional[BaseException] = None def maybe_raise_aio_err( @@ -260,10 +289,21 @@ async def translate_aio_errors( assert task try: yield + + except ( + trio.Cancelled, + ): + # relay cancel through to called ``asyncio`` task + chan._aio_task.cancel( + msg=f'the `trio` caller task was cancelled:\n{trio_task.name}' + ) + raise + except ( # NOTE: see the note in the ``cancel_trio()`` asyncio task # termination callback trio.ClosedResourceError, + # trio.BrokenResourceError, ): aio_err = chan._aio_err if ( @@ -277,6 +317,7 @@ async def translate_aio_errors( else: raise + finally: # always cancel the ``asyncio`` task if we've made it this far # and it's not done. @@ -289,6 +330,7 @@ async def translate_aio_errors( maybe_raise_aio_err() + async def run_task( func: Callable, *, @@ -309,7 +351,6 @@ async def run_task( **kwargs, ) with chan._from_aio: - # try: async with translate_aio_errors(chan): # return single value that is the output from the # ``asyncio`` function-as-task. Expect the mem chan api to @@ -343,7 +384,7 @@ async def open_channel_from( # ``asyncio`` task. first = await chan.receive() - # stream values upward + # deliver stream handle upward yield first, chan @@ -380,9 +421,22 @@ def run_as_asyncio_guest( trio_done_fut = asyncio.Future() def trio_done_callback(main_outcome): + actor = current_actor() - print(f"trio_main finished: {main_outcome!r}") - trio_done_fut.set_result(main_outcome) + if isinstance(main_outcome, Error): + error = main_outcome.error + trio_done_fut.set_exception(error) + + # TODO: explicit asyncio tb? + # traceback.print_exception(error) + + # XXX: do we need this? + # actor.cancel_soon() + + main_outcome.unwrap() + else: + trio_done_fut.set_result(main_outcome) + print(f"trio_main finished: {main_outcome!r}") # start the infection: run trio on the asyncio loop in "guest mode" log.info(f"Infecting asyncio process with {trio_main}") @@ -392,6 +446,7 @@ def run_as_asyncio_guest( run_sync_soon_threadsafe=loop.call_soon_threadsafe, done_callback=trio_done_callback, ) + # ``.unwrap()`` will raise here on error return (await trio_done_fut).unwrap() # might as well if it's installed. From 46963c2e6351ad9ccef77186df4abed676d3ab94 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Mar 2022 16:25:48 -0500 Subject: [PATCH 03/15] Don't handle `GeneratorExit` on `asyncio` tasks --- tractor/to_asyncio.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 371a936..ca15b00 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -109,6 +109,7 @@ def _run_asyncio_task( or stream the result back to ``trio``. ''' + __tracebackhide__ = True if not current_actor().is_infected_aio(): raise RuntimeError("`infect_asyncio` mode is not enabled!?") @@ -167,6 +168,9 @@ def _run_asyncio_task( orig = result = id(coro) try: result = await coro + except GeneratorExit: + # no need to relay error + raise except BaseException as aio_err: chan._aio_err = aio_err raise @@ -295,7 +299,7 @@ async def translate_aio_errors( ): # relay cancel through to called ``asyncio`` task chan._aio_task.cancel( - msg=f'the `trio` caller task was cancelled:\n{trio_task.name}' + msg=f'the `trio` caller task was cancelled: {trio_task.name}' ) raise From c322a193f2609f75be3db3f623be8a5f099f256e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 11 Apr 2022 17:07:06 -0400 Subject: [PATCH 04/15] Make `LinkedTaskChannel` trio-task-broadcastable with `.subscribe()` --- tractor/to_asyncio.py | 46 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index ca15b00..ab4ff34 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -38,6 +38,10 @@ from outcome import Error from .log import get_logger from ._state import current_actor from ._exceptions import AsyncioCancelled +from .trionics._broadcast import ( + broadcast_receiver, + BroadcastReceiver, +) log = get_logger(__name__) @@ -63,6 +67,7 @@ class LinkedTaskChannel(trio.abc.Channel): # set after ``asyncio.create_task()`` _aio_task: Optional[asyncio.Task] = None _aio_err: Optional[BaseException] = None + _broadcaster: Optional[BroadcastReceiver] = None async def aclose(self) -> None: await self._from_aio.aclose() @@ -79,7 +84,7 @@ class LinkedTaskChannel(trio.abc.Channel): return await self._from_aio.receive() - async def wait_ayncio_complete(self) -> None: + async def wait_asyncio_complete(self) -> None: await self._aio_task_complete.wait() # def cancel_asyncio_task(self) -> None: @@ -94,6 +99,43 @@ class LinkedTaskChannel(trio.abc.Channel): ''' self._to_aio.put_nowait(item) + def closed(self) -> bool: + return self._from_aio._closed + + # TODO: shoud we consider some kind of "decorator" system + # that checks for structural-typing compatibliity and then + # automatically adds this ctx-mngr-as-method machinery? + @acm + async def subscribe( + self, + + ) -> AsyncIterator[BroadcastReceiver]: + ''' + Allocate and return a ``BroadcastReceiver`` which delegates + to this inter-task channel. + + This allows multiple local tasks to receive each their own copy + of this message stream. + + See ``tractor._streaming.MsgStream.subscribe()`` for further + similar details. + ''' + if self._broadcaster is None: + + bcast = self._broadcaster = broadcast_receiver( + self, + # use memory channel size by default + self._from_aio._state.max_buffer_size, # type: ignore + receive_afunc=self.receive, + ) + + self.receive = bcast.receive # type: ignore + + async with self._broadcaster.subscribe() as bstream: + assert bstream.key != self._broadcaster.key + assert bstream._recv == self._broadcaster._recv + yield bstream + def _run_asyncio_task( @@ -334,7 +376,6 @@ async def translate_aio_errors( maybe_raise_aio_err() - async def run_task( func: Callable, *, @@ -425,7 +466,6 @@ def run_as_asyncio_guest( trio_done_fut = asyncio.Future() def trio_done_callback(main_outcome): - actor = current_actor() if isinstance(main_outcome, Error): error = main_outcome.error From 032e14e326784214da6ddf9b8c2612751eb7310d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Apr 2022 11:42:41 -0400 Subject: [PATCH 05/15] Update new license info in setup script --- setup.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/setup.py b/setup.py index dd00715..3aa1b96 100755 --- a/setup.py +++ b/setup.py @@ -1,21 +1,22 @@ #!/usr/bin/env python # -# tractor: a trionic actor model built on `multiprocessing` and `trio` +# tractor: structured concurrent "actors". # -# Copyright (C) 2018-2020 Tyler Goodlet +# Copyright 2018-eternity Tyler Goodlet. # This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by +# it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . from setuptools import setup with open('docs/README.rst', encoding='utf-8') as f: @@ -27,7 +28,7 @@ setup( version='0.1.0a5.dev', # alpha zone description='structured concurrrent "actors"', long_description=readme, - license='GPLv3', + license='AGPLv3', author='Tyler Goodlet', maintainer='Tyler Goodlet', maintainer_email='jgbt@protonmail.com', @@ -80,7 +81,7 @@ setup( "Operating System :: POSIX :: Linux", "Operating System :: Microsoft :: Windows", "Framework :: Trio", - "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", + "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: 3 :: Only", "Programming Language :: Python :: 3.8", From f3606d5bd8f61d31d60dda289bcee29b2e405565 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Apr 2022 11:48:32 -0400 Subject: [PATCH 06/15] Type fixes --- tractor/to_asyncio.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index ab4ff34..6ca07ca 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -100,7 +100,7 @@ class LinkedTaskChannel(trio.abc.Channel): self._to_aio.put_nowait(item) def closed(self) -> bool: - return self._from_aio._closed + return self._from_aio._closed # type: ignore # TODO: shoud we consider some kind of "decorator" system # that checks for structural-typing compatibliity and then @@ -289,8 +289,9 @@ def _run_asyncio_task( # raise aio_err from err elif task_err is None: + assert aio_err aio_err.with_traceback(aio_err.__traceback__) - msg = ''.join(traceback.format_exception(aio_err)) + msg = ''.join(traceback.format_exception(type(aio_err))) log.error( f'infected task errorred:\n{msg}' ) @@ -340,6 +341,7 @@ async def translate_aio_errors( trio.Cancelled, ): # relay cancel through to called ``asyncio`` task + assert chan._aio_task chan._aio_task.cancel( msg=f'the `trio` caller task was cancelled: {trio_task.name}' ) From 20d281f6199e342d3c257fe5b3f5457257f4cfa1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Apr 2022 12:53:12 -0400 Subject: [PATCH 07/15] Run `mypy` on 3.10 --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d954d79..1c86597 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,7 +20,7 @@ jobs: - name: Setup python uses: actions/setup-python@v2 with: - python-version: '3.9' + python-version: '3.10' - name: Install dependencies run: pip install -U . --upgrade-strategy eager -r requirements-test.txt From faf751acac5a17e5783bf2fef19882993f14daa5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 25 Feb 2022 10:59:15 -0500 Subject: [PATCH 08/15] WIP reproduce deadlock issue during error from piker --- tests/test_child_manages_service_nursery.py | 63 +++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/tests/test_child_manages_service_nursery.py b/tests/test_child_manages_service_nursery.py index 983bb42..7c74f00 100644 --- a/tests/test_child_manages_service_nursery.py +++ b/tests/test_child_manages_service_nursery.py @@ -170,3 +170,66 @@ def test_actor_managed_trio_nursery_task_error_cancels_aio( # verify boxed error err = excinfo.value assert isinstance(err.type(), NameError) + + +from trio_typing import TaskStatus + + +def test_nursery(): + + _cn = None + + async def task_name_errors_and_never_startedz( + task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + ): + await trio.sleep(0.5) + + # pet pet + doggy() + + # we never get here due to name error + task_status.started() + + async def waits_on_signal( + ev: trio.Event(), + task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, + ): + await ev.wait() + task_status.started() + + async def start_cn_tasks( + # pn: trio.Nursery, + task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, + ): + nonlocal _cn + assert _cn + + ev = trio.Event() + + _cn.start_soon(task_never_starteds) + await _cn.start(waits_on_signal, ev) + task_status.started() + + async def mk_cn( + task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + ): + nonlocal _cn + + async with trio.open_nursery() as cn: + _cn = cn + + task_status.started(cn) + await trio.sleep_forever() + + async def main(): + async with ( + trio.open_nursery() as pn, + ): + cn = await pn.start(mk_cn) + assert cn + + # start a parent level task which starts a task + pn.start_soon(start_cn_tasks) + await trio.sleep_forever() + + trio.run(main) From e45251db56114a38f7a4559c8f53d041e483024b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 25 Feb 2022 12:41:35 -0500 Subject: [PATCH 09/15] Simplify to form submitted to njs --- tests/test_child_manages_service_nursery.py | 71 ++++++++++----------- 1 file changed, 33 insertions(+), 38 deletions(-) diff --git a/tests/test_child_manages_service_nursery.py b/tests/test_child_manages_service_nursery.py index 7c74f00..4f3ed3e 100644 --- a/tests/test_child_manages_service_nursery.py +++ b/tests/test_child_manages_service_nursery.py @@ -10,6 +10,7 @@ from contextlib import asynccontextmanager as acm import pytest import trio +from trio_typing import TaskStatus import tractor from tractor import RemoteActorError from async_generator import aclosing @@ -172,64 +173,58 @@ def test_actor_managed_trio_nursery_task_error_cancels_aio( assert isinstance(err.type(), NameError) -from trio_typing import TaskStatus +def test_stashed_child_nursery(): - -def test_nursery(): - - _cn = None - - async def task_name_errors_and_never_startedz( - task_status: TaskStatus = trio.TASK_STATUS_IGNORED, - ): - await trio.sleep(0.5) - - # pet pet - doggy() - - # we never get here due to name error - task_status.started() + _child_nursery = None async def waits_on_signal( ev: trio.Event(), task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, ): + ''' + Do some stuf, then signal other tasks, then yield back to "starter". + + ''' await ev.wait() task_status.started() - async def start_cn_tasks( - # pn: trio.Nursery, - task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, - ): - nonlocal _cn - assert _cn - - ev = trio.Event() - - _cn.start_soon(task_never_starteds) - await _cn.start(waits_on_signal, ev) - task_status.started() - - async def mk_cn( + async def mk_child_nursery( task_status: TaskStatus = trio.TASK_STATUS_IGNORED, ): - nonlocal _cn + ''' + Allocate a child sub-nursery and stash it as a global. + + ''' + nonlocal _child_nursery async with trio.open_nursery() as cn: - _cn = cn - + _child_nursery = cn task_status.started(cn) + + # block until cancelled by parent. await trio.sleep_forever() + async def sleep_and_err(ev: trio.Event): + await trio.sleep(0.5) + doggy() # noqa + ev.set() + async def main(): + async with ( trio.open_nursery() as pn, ): - cn = await pn.start(mk_cn) + cn = await pn.start(mk_child_nursery) assert cn - # start a parent level task which starts a task - pn.start_soon(start_cn_tasks) - await trio.sleep_forever() + ev = trio.Event() + cn.start_soon(sleep_and_err, ev) - trio.run(main) + # this causes inf hang + await cn.start(waits_on_signal, ev) + + # this does not. + # cn.start_soon(waits_on_signal, ev) + + with pytest.raises(NameError): + trio.run(main) From 9c43bb28f1317cf74fadea730dca1b92948e9976 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 26 Feb 2022 16:47:39 -0500 Subject: [PATCH 10/15] Add a new "trioisms" test mod for tracking `trio` wishlist behaviour --- tests/test_child_manages_service_nursery.py | 57 -------------- tests/test_trioisms.py | 82 +++++++++++++++++++++ 2 files changed, 82 insertions(+), 57 deletions(-) create mode 100644 tests/test_trioisms.py diff --git a/tests/test_child_manages_service_nursery.py b/tests/test_child_manages_service_nursery.py index 4f3ed3e..806e6d7 100644 --- a/tests/test_child_manages_service_nursery.py +++ b/tests/test_child_manages_service_nursery.py @@ -171,60 +171,3 @@ def test_actor_managed_trio_nursery_task_error_cancels_aio( # verify boxed error err = excinfo.value assert isinstance(err.type(), NameError) - - -def test_stashed_child_nursery(): - - _child_nursery = None - - async def waits_on_signal( - ev: trio.Event(), - task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, - ): - ''' - Do some stuf, then signal other tasks, then yield back to "starter". - - ''' - await ev.wait() - task_status.started() - - async def mk_child_nursery( - task_status: TaskStatus = trio.TASK_STATUS_IGNORED, - ): - ''' - Allocate a child sub-nursery and stash it as a global. - - ''' - nonlocal _child_nursery - - async with trio.open_nursery() as cn: - _child_nursery = cn - task_status.started(cn) - - # block until cancelled by parent. - await trio.sleep_forever() - - async def sleep_and_err(ev: trio.Event): - await trio.sleep(0.5) - doggy() # noqa - ev.set() - - async def main(): - - async with ( - trio.open_nursery() as pn, - ): - cn = await pn.start(mk_child_nursery) - assert cn - - ev = trio.Event() - cn.start_soon(sleep_and_err, ev) - - # this causes inf hang - await cn.start(waits_on_signal, ev) - - # this does not. - # cn.start_soon(waits_on_signal, ev) - - with pytest.raises(NameError): - trio.run(main) diff --git a/tests/test_trioisms.py b/tests/test_trioisms.py new file mode 100644 index 0000000..5b19f50 --- /dev/null +++ b/tests/test_trioisms.py @@ -0,0 +1,82 @@ +''' +Reminders for oddities in `trio` that we need to stay aware of and/or +want to see changed. + +''' +import pytest +import trio +from trio_typing import TaskStatus + + +@pytest.mark.parametrize( + 'use_start_soon', [ + pytest.param( + True, + marks=pytest.mark.xfail(reason="see python-trio/trio#2258") + ), + False, + ] +) +def test_stashed_child_nursery(use_start_soon): + + _child_nursery = None + + async def waits_on_signal( + ev: trio.Event(), + task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, + ): + ''' + Do some stuf, then signal other tasks, then yield back to "starter". + + ''' + await ev.wait() + task_status.started() + + async def mk_child_nursery( + task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + ): + ''' + Allocate a child sub-nursery and stash it as a global. + + ''' + nonlocal _child_nursery + + async with trio.open_nursery() as cn: + _child_nursery = cn + task_status.started(cn) + + # block until cancelled by parent. + await trio.sleep_forever() + + async def sleep_and_err( + ev: trio.Event, + task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + ): + await trio.sleep(0.5) + doggy() # noqa + ev.set() + task_status.started() + + async def main(): + + async with ( + trio.open_nursery() as pn, + ): + cn = await pn.start(mk_child_nursery) + assert cn + + ev = trio.Event() + + if use_start_soon: + # this causes inf hang + cn.start_soon(sleep_and_err, ev) + + else: + # this does not. + await cn.start(sleep_and_err, ev) + + with trio.fail_after(1): + await cn.start(waits_on_signal, ev) + + with pytest.raises(NameError): + trio.run(main) From 90593611bbe791977f5dbb21e5e0801789f18339 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Apr 2022 14:24:30 -0400 Subject: [PATCH 11/15] Add test for `LinkedTaskChannel.subscribe()` fanout feature --- tests/test_infected_asyncio.py | 56 +++++++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index c0c33db..8fa6eb5 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -2,9 +2,10 @@ The hipster way to force SC onto the stdlib's "async": 'infection mode'. ''' -from typing import Optional, Iterable +from typing import Optional, Iterable, Union import asyncio import builtins +import itertools import importlib import pytest @@ -12,6 +13,7 @@ import trio import tractor from tractor import to_asyncio from tractor import RemoteActorError +from tractor.trionics import BroadcastReceiver async def sleep_and_err(sleep_for: float = 0.1): @@ -217,6 +219,7 @@ async def stream_from_aio( exit_early: bool = False, raise_err: bool = False, aio_raise_err: bool = False, + fan_out: bool = False, ) -> None: seq = range(100) @@ -234,15 +237,33 @@ async def stream_from_aio( assert first is True - async for value in chan: - print(f'trio received {value}') - pulled.append(value) + async def consume( + chan: Union[ + to_asyncio.LinkedTaskChannel, + BroadcastReceiver, + ], + ): + async for value in chan: + print(f'trio received {value}') + pulled.append(value) - if value == 50: - if raise_err: - raise Exception - elif exit_early: - break + if value == 50: + if raise_err: + raise Exception + elif exit_early: + break + + if fan_out: + # start second task that get's the same stream value set. + async with ( + trio.open_nursery() as n, + chan.subscribe() as br, + ): + n.start_soon(consume, br) + await consume(chan) + + else: + await consume(chan) finally: if ( @@ -250,19 +271,32 @@ async def stream_from_aio( not exit_early and not aio_raise_err ): - assert pulled == expect + if fan_out: + # we get double the pulled values in the + # ``.subscribe()`` fan out case. + doubled = list(itertools.chain(*zip(expect, expect))) + assert pulled == doubled + + else: + assert pulled == expect else: + # if fan_out: assert pulled == expect[:51] print('trio guest mode task completed!') -def test_basic_interloop_channel_stream(arb_addr): +@pytest.mark.parametrize( + 'fan_out', [False, True], + ids='fan_out_w_chan_subscribe={}'.format +) +def test_basic_interloop_channel_stream(arb_addr, fan_out): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( stream_from_aio, infect_asyncio=True, + fan_out=fan_out, ) await portal.result() From 333fad8819fd1d9d6b4e89f2c2dbd768d81495cb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Apr 2022 15:03:44 -0400 Subject: [PATCH 12/15] Facepalm: join nursery first to avoid channel-closed-too-early --- tests/test_infected_asyncio.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 8fa6eb5..cb0a400 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -256,8 +256,13 @@ async def stream_from_aio( if fan_out: # start second task that get's the same stream value set. async with ( - trio.open_nursery() as n, + + # NOTE: this has to come first to avoid + # the channel being closed before the nursery + # tasks are joined.. chan.subscribe() as br, + + trio.open_nursery() as n, ): n.start_soon(consume, br) await consume(chan) From fa354ffe2b832ed6a988a381080c027e71f143e1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Apr 2022 15:24:16 -0400 Subject: [PATCH 13/15] Handle not all values pulled case --- tests/test_infected_asyncio.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index cb0a400..88fce6d 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -280,7 +280,9 @@ async def stream_from_aio( # we get double the pulled values in the # ``.subscribe()`` fan out case. doubled = list(itertools.chain(*zip(expect, expect))) - assert pulled == doubled + if pulled != doubled[:len(pulled)]: + print(f'uhhh pulled is {pulled}') + assert pulled == doubled[:len(pulled)] else: assert pulled == expect From 597ae4b690f2b2d8fead07194f983ca40ebdef45 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Apr 2022 15:59:33 -0400 Subject: [PATCH 14/15] Add nooz file --- nooz/304.feature.rst | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 nooz/304.feature.rst diff --git a/nooz/304.feature.rst b/nooz/304.feature.rst new file mode 100644 index 0000000..820ecf7 --- /dev/null +++ b/nooz/304.feature.rst @@ -0,0 +1,12 @@ +Add a new ``to_asyncio.LinkedTaskChannel.subscribe()`` which gives +task-oriented broadcast functionality semantically equivalent to +``tractor.MsgStream.subscribe()`` this makes it possible for multiple +``trio``-side tasks to consume ``asyncio``-side task msgs in tandem. + +Further Improvements to the test suite were added in this patch set +including a new scenario test for a sub-actor managed "service nursery" +(implementing the basics of a "service manager") including use of +*infected asyncio* mode. Further we added a lower level +``test_trioisms.py`` to start to track issues we need to work around in +``trio`` itself which in this case included a bug we were trying to +solve related to https://github.com/python-trio/trio/issues/2258. From 9c27858aaf3bd3a8f2d7482e5fe5630ef81c4d81 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Apr 2022 16:13:33 -0400 Subject: [PATCH 15/15] WIP prints to debug frickin windows --- tests/test_infected_asyncio.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 88fce6d..1cf15b5 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -280,14 +280,18 @@ async def stream_from_aio( # we get double the pulled values in the # ``.subscribe()`` fan out case. doubled = list(itertools.chain(*zip(expect, expect))) - if pulled != doubled[:len(pulled)]: - print(f'uhhh pulled is {pulled}') - assert pulled == doubled[:len(pulled)] + expect = doubled[:len(pulled)] + if pulled != expect: + print( + f'uhhh pulled is {pulled}\n', + f'uhhh expect is {expect}\n', + ) + assert pulled == expect else: assert pulled == expect else: - # if fan_out: + assert not fan_out assert pulled == expect[:51] print('trio guest mode task completed!')