diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index d954d79..1c86597 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -20,7 +20,7 @@ jobs:
- name: Setup python
uses: actions/setup-python@v2
with:
- python-version: '3.9'
+ python-version: '3.10'
- name: Install dependencies
run: pip install -U . --upgrade-strategy eager -r requirements-test.txt
diff --git a/nooz/304.feature.rst b/nooz/304.feature.rst
new file mode 100644
index 0000000..820ecf7
--- /dev/null
+++ b/nooz/304.feature.rst
@@ -0,0 +1,12 @@
+Add a new ``to_asyncio.LinkedTaskChannel.subscribe()`` which gives
+task-oriented broadcast functionality semantically equivalent to
+``tractor.MsgStream.subscribe()`` this makes it possible for multiple
+``trio``-side tasks to consume ``asyncio``-side task msgs in tandem.
+
+Further Improvements to the test suite were added in this patch set
+including a new scenario test for a sub-actor managed "service nursery"
+(implementing the basics of a "service manager") including use of
+*infected asyncio* mode. Further we added a lower level
+``test_trioisms.py`` to start to track issues we need to work around in
+``trio`` itself which in this case included a bug we were trying to
+solve related to https://github.com/python-trio/trio/issues/2258.
diff --git a/setup.py b/setup.py
index dd00715..3aa1b96 100755
--- a/setup.py
+++ b/setup.py
@@ -1,21 +1,22 @@
#!/usr/bin/env python
#
-# tractor: a trionic actor model built on `multiprocessing` and `trio`
+# tractor: structured concurrent "actors".
#
-# Copyright (C) 2018-2020 Tyler Goodlet
+# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
+# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
from setuptools import setup
with open('docs/README.rst', encoding='utf-8') as f:
@@ -27,7 +28,7 @@ setup(
version='0.1.0a5.dev', # alpha zone
description='structured concurrrent "actors"',
long_description=readme,
- license='GPLv3',
+ license='AGPLv3',
author='Tyler Goodlet',
maintainer='Tyler Goodlet',
maintainer_email='jgbt@protonmail.com',
@@ -80,7 +81,7 @@ setup(
"Operating System :: POSIX :: Linux",
"Operating System :: Microsoft :: Windows",
"Framework :: Trio",
- "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
+ "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.8",
diff --git a/tests/test_child_manages_service_nursery.py b/tests/test_child_manages_service_nursery.py
new file mode 100644
index 0000000..806e6d7
--- /dev/null
+++ b/tests/test_child_manages_service_nursery.py
@@ -0,0 +1,173 @@
+'''
+Test a service style daemon that maintains a nursery for spawning
+"remote async tasks" including both spawning other long living
+sub-sub-actor daemons.
+
+'''
+from typing import Optional
+import asyncio
+from contextlib import asynccontextmanager as acm
+
+import pytest
+import trio
+from trio_typing import TaskStatus
+import tractor
+from tractor import RemoteActorError
+from async_generator import aclosing
+
+
+async def aio_streamer(
+ from_trio: asyncio.Queue,
+ to_trio: trio.abc.SendChannel,
+) -> trio.abc.ReceiveChannel:
+
+ # required first msg to sync caller
+ to_trio.send_nowait(None)
+
+ from itertools import cycle
+ for i in cycle(range(10)):
+ to_trio.send_nowait(i)
+ await asyncio.sleep(0.01)
+
+
+async def trio_streamer():
+ from itertools import cycle
+ for i in cycle(range(10)):
+ yield i
+ await trio.sleep(0.01)
+
+
+async def trio_sleep_and_err(delay: float = 0.5):
+ await trio.sleep(delay)
+ # name error
+ doggy() # noqa
+
+
+_cached_stream: Optional[
+ trio.abc.ReceiveChannel
+] = None
+
+
+@acm
+async def wrapper_mngr(
+):
+ from tractor.trionics import broadcast_receiver
+ global _cached_stream
+ in_aio = tractor.current_actor().is_infected_aio()
+
+ if in_aio:
+ if _cached_stream:
+
+ from_aio = _cached_stream
+
+ # if we already have a cached feed deliver a rx side clone
+ # to consumer
+ async with broadcast_receiver(from_aio, 6) as from_aio:
+ yield from_aio
+ return
+ else:
+ async with tractor.to_asyncio.open_channel_from(
+ aio_streamer,
+ ) as (first, from_aio):
+ assert not first
+
+ # cache it so next task uses broadcast receiver
+ _cached_stream = from_aio
+
+ yield from_aio
+ else:
+ async with aclosing(trio_streamer()) as stream:
+ # cache it so next task uses broadcast receiver
+ _cached_stream = stream
+ yield stream
+
+
+_nursery: trio.Nursery = None
+
+
+@tractor.context
+async def trio_main(
+ ctx: tractor.Context,
+):
+ # sync
+ await ctx.started()
+
+ # stash a "service nursery" as "actor local" (aka a Python global)
+ global _nursery
+ n = _nursery
+ assert n
+
+ async def consume_stream():
+ async with wrapper_mngr() as stream:
+ async for msg in stream:
+ print(msg)
+
+ # run 2 tasks to ensure broadcaster chan use
+ n.start_soon(consume_stream)
+ n.start_soon(consume_stream)
+
+ n.start_soon(trio_sleep_and_err)
+
+ await trio.sleep_forever()
+
+
+@tractor.context
+async def open_actor_local_nursery(
+ ctx: tractor.Context,
+):
+ global _nursery
+ async with trio.open_nursery() as n:
+ _nursery = n
+ await ctx.started()
+ await trio.sleep(10)
+ # await trio.sleep(1)
+
+ # XXX: this causes the hang since
+ # the caller does not unblock from its own
+ # ``trio.sleep_forever()``.
+
+ # TODO: we need to test a simple ctx task starting remote tasks
+ # that error and then blocking on a ``Nursery.start()`` which
+ # never yields back.. aka a scenario where the
+ # ``tractor.context`` task IS NOT in the service n's cancel
+ # scope.
+ n.cancel_scope.cancel()
+
+
+@pytest.mark.parametrize(
+ 'asyncio_mode',
+ [True, False],
+ ids='asyncio_mode={}'.format,
+)
+def test_actor_managed_trio_nursery_task_error_cancels_aio(
+ asyncio_mode: bool,
+ arb_addr
+):
+ '''
+ Verify that a ``trio`` nursery created managed in a child actor
+ correctly relays errors to the parent actor when one of its spawned
+ tasks errors even when running in infected asyncio mode and using
+ broadcast receivers for multi-task-per-actor subscription.
+
+ '''
+ async def main():
+
+ # cancel the nursery shortly after boot
+ async with tractor.open_nursery() as n:
+ p = await n.start_actor(
+ 'nursery_mngr',
+ infect_asyncio=asyncio_mode,
+ enable_modules=[__name__],
+ )
+ async with (
+ p.open_context(open_actor_local_nursery) as (ctx, first),
+ p.open_context(trio_main) as (ctx, first),
+ ):
+ await trio.sleep_forever()
+
+ with pytest.raises(RemoteActorError) as excinfo:
+ trio.run(main)
+
+ # verify boxed error
+ err = excinfo.value
+ assert isinstance(err.type(), NameError)
diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py
index 9881be9..1cf15b5 100644
--- a/tests/test_infected_asyncio.py
+++ b/tests/test_infected_asyncio.py
@@ -2,9 +2,10 @@
The hipster way to force SC onto the stdlib's "async": 'infection mode'.
'''
-from typing import Optional, Iterable
+from typing import Optional, Iterable, Union
import asyncio
import builtins
+import itertools
import importlib
import pytest
@@ -12,10 +13,11 @@ import trio
import tractor
from tractor import to_asyncio
from tractor import RemoteActorError
+from tractor.trionics import BroadcastReceiver
-async def sleep_and_err():
- await asyncio.sleep(0.1)
+async def sleep_and_err(sleep_for: float = 0.1):
+ await asyncio.sleep(sleep_for)
assert 0
@@ -217,6 +219,7 @@ async def stream_from_aio(
exit_early: bool = False,
raise_err: bool = False,
aio_raise_err: bool = False,
+ fan_out: bool = False,
) -> None:
seq = range(100)
@@ -234,15 +237,38 @@ async def stream_from_aio(
assert first is True
- async for value in chan:
- print(f'trio received {value}')
- pulled.append(value)
+ async def consume(
+ chan: Union[
+ to_asyncio.LinkedTaskChannel,
+ BroadcastReceiver,
+ ],
+ ):
+ async for value in chan:
+ print(f'trio received {value}')
+ pulled.append(value)
- if value == 50:
- if raise_err:
- raise Exception
- elif exit_early:
- break
+ if value == 50:
+ if raise_err:
+ raise Exception
+ elif exit_early:
+ break
+
+ if fan_out:
+ # start second task that get's the same stream value set.
+ async with (
+
+ # NOTE: this has to come first to avoid
+ # the channel being closed before the nursery
+ # tasks are joined..
+ chan.subscribe() as br,
+
+ trio.open_nursery() as n,
+ ):
+ n.start_soon(consume, br)
+ await consume(chan)
+
+ else:
+ await consume(chan)
finally:
if (
@@ -250,19 +276,38 @@ async def stream_from_aio(
not exit_early and
not aio_raise_err
):
- assert pulled == expect
+ if fan_out:
+ # we get double the pulled values in the
+ # ``.subscribe()`` fan out case.
+ doubled = list(itertools.chain(*zip(expect, expect)))
+ expect = doubled[:len(pulled)]
+ if pulled != expect:
+ print(
+ f'uhhh pulled is {pulled}\n',
+ f'uhhh expect is {expect}\n',
+ )
+ assert pulled == expect
+
+ else:
+ assert pulled == expect
else:
+ assert not fan_out
assert pulled == expect[:51]
print('trio guest mode task completed!')
-def test_basic_interloop_channel_stream(arb_addr):
+@pytest.mark.parametrize(
+ 'fan_out', [False, True],
+ ids='fan_out_w_chan_subscribe={}'.format
+)
+def test_basic_interloop_channel_stream(arb_addr, fan_out):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
stream_from_aio,
infect_asyncio=True,
+ fan_out=fan_out,
)
await portal.result()
diff --git a/tests/test_trioisms.py b/tests/test_trioisms.py
new file mode 100644
index 0000000..5b19f50
--- /dev/null
+++ b/tests/test_trioisms.py
@@ -0,0 +1,82 @@
+'''
+Reminders for oddities in `trio` that we need to stay aware of and/or
+want to see changed.
+
+'''
+import pytest
+import trio
+from trio_typing import TaskStatus
+
+
+@pytest.mark.parametrize(
+ 'use_start_soon', [
+ pytest.param(
+ True,
+ marks=pytest.mark.xfail(reason="see python-trio/trio#2258")
+ ),
+ False,
+ ]
+)
+def test_stashed_child_nursery(use_start_soon):
+
+ _child_nursery = None
+
+ async def waits_on_signal(
+ ev: trio.Event(),
+ task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
+ ):
+ '''
+ Do some stuf, then signal other tasks, then yield back to "starter".
+
+ '''
+ await ev.wait()
+ task_status.started()
+
+ async def mk_child_nursery(
+ task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
+ ):
+ '''
+ Allocate a child sub-nursery and stash it as a global.
+
+ '''
+ nonlocal _child_nursery
+
+ async with trio.open_nursery() as cn:
+ _child_nursery = cn
+ task_status.started(cn)
+
+ # block until cancelled by parent.
+ await trio.sleep_forever()
+
+ async def sleep_and_err(
+ ev: trio.Event,
+ task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
+ ):
+ await trio.sleep(0.5)
+ doggy() # noqa
+ ev.set()
+ task_status.started()
+
+ async def main():
+
+ async with (
+ trio.open_nursery() as pn,
+ ):
+ cn = await pn.start(mk_child_nursery)
+ assert cn
+
+ ev = trio.Event()
+
+ if use_start_soon:
+ # this causes inf hang
+ cn.start_soon(sleep_and_err, ev)
+
+ else:
+ # this does not.
+ await cn.start(sleep_and_err, ev)
+
+ with trio.fail_after(1):
+ await cn.start(waits_on_signal, ev)
+
+ with pytest.raises(NameError):
+ trio.run(main)
diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py
index 9b18a87..6ca07ca 100644
--- a/tractor/to_asyncio.py
+++ b/tractor/to_asyncio.py
@@ -23,6 +23,7 @@ from asyncio.exceptions import CancelledError
from contextlib import asynccontextmanager as acm
from dataclasses import dataclass
import inspect
+import traceback
from typing import (
Any,
Callable,
@@ -32,10 +33,15 @@ from typing import (
)
import trio
+from outcome import Error
from .log import get_logger
from ._state import current_actor
from ._exceptions import AsyncioCancelled
+from .trionics._broadcast import (
+ broadcast_receiver,
+ BroadcastReceiver,
+)
log = get_logger(__name__)
@@ -61,15 +67,24 @@ class LinkedTaskChannel(trio.abc.Channel):
# set after ``asyncio.create_task()``
_aio_task: Optional[asyncio.Task] = None
_aio_err: Optional[BaseException] = None
+ _broadcaster: Optional[BroadcastReceiver] = None
async def aclose(self) -> None:
await self._from_aio.aclose()
async def receive(self) -> Any:
async with translate_aio_errors(self):
+
+ # TODO: do we need this to guarantee asyncio code get's
+ # cancelled in the case where the trio side somehow creates
+ # a state where the asyncio cycle-task isn't getting the
+ # cancel request sent by (in theory) the last checkpoint
+ # cycle on the trio side?
+ # await trio.lowlevel.checkpoint()
+
return await self._from_aio.receive()
- async def wait_ayncio_complete(self) -> None:
+ async def wait_asyncio_complete(self) -> None:
await self._aio_task_complete.wait()
# def cancel_asyncio_task(self) -> None:
@@ -84,6 +99,43 @@ class LinkedTaskChannel(trio.abc.Channel):
'''
self._to_aio.put_nowait(item)
+ def closed(self) -> bool:
+ return self._from_aio._closed # type: ignore
+
+ # TODO: shoud we consider some kind of "decorator" system
+ # that checks for structural-typing compatibliity and then
+ # automatically adds this ctx-mngr-as-method machinery?
+ @acm
+ async def subscribe(
+ self,
+
+ ) -> AsyncIterator[BroadcastReceiver]:
+ '''
+ Allocate and return a ``BroadcastReceiver`` which delegates
+ to this inter-task channel.
+
+ This allows multiple local tasks to receive each their own copy
+ of this message stream.
+
+ See ``tractor._streaming.MsgStream.subscribe()`` for further
+ similar details.
+ '''
+ if self._broadcaster is None:
+
+ bcast = self._broadcaster = broadcast_receiver(
+ self,
+ # use memory channel size by default
+ self._from_aio._state.max_buffer_size, # type: ignore
+ receive_afunc=self.receive,
+ )
+
+ self.receive = bcast.receive # type: ignore
+
+ async with self._broadcaster.subscribe() as bstream:
+ assert bstream.key != self._broadcaster.key
+ assert bstream._recv == self._broadcaster._recv
+ yield bstream
+
def _run_asyncio_task(
@@ -99,6 +151,7 @@ def _run_asyncio_task(
or stream the result back to ``trio``.
'''
+ __tracebackhide__ = True
if not current_actor().is_infected_aio():
raise RuntimeError("`infect_asyncio` mode is not enabled!?")
@@ -157,6 +210,9 @@ def _run_asyncio_task(
orig = result = id(coro)
try:
result = await coro
+ except GeneratorExit:
+ # no need to relay error
+ raise
except BaseException as aio_err:
chan._aio_err = aio_err
raise
@@ -202,20 +258,20 @@ def _run_asyncio_task(
'''
nonlocal chan
aio_err = chan._aio_err
+ task_err: Optional[BaseException] = None
# only to avoid ``asyncio`` complaining about uncaptured
# task exceptions
try:
task.exception()
except BaseException as terr:
+ task_err = terr
+ log.exception(f'`asyncio` task: {task.get_name()} errored')
assert type(terr) is type(aio_err), 'Asyncio task error mismatch?'
if aio_err is not None:
- if type(aio_err) is CancelledError:
- log.cancel("infected task was cancelled")
- else:
- aio_err.with_traceback(aio_err.__traceback__)
- log.exception("infected task errorred:")
+ # XXX: uhh is this true?
+ # assert task_err, f'Asyncio task {task.get_name()} discrepancy!?'
# NOTE: currently mem chan closure may act as a form
# of error relay (at least in the ``asyncio.CancelledError``
@@ -224,8 +280,26 @@ def _run_asyncio_task(
# We might want to change this in the future though.
from_aio.close()
- task.add_done_callback(cancel_trio)
+ if type(aio_err) is CancelledError:
+ log.cancel("infected task was cancelled")
+ # TODO: show that the cancellation originated
+ # from the ``trio`` side? right?
+ # if cancel_scope.cancelled:
+ # raise aio_err from err
+
+ elif task_err is None:
+ assert aio_err
+ aio_err.with_traceback(aio_err.__traceback__)
+ msg = ''.join(traceback.format_exception(type(aio_err)))
+ log.error(
+ f'infected task errorred:\n{msg}'
+ )
+
+ # raise any ``asyncio`` side error.
+ raise aio_err
+
+ task.add_done_callback(cancel_trio)
return chan
@@ -240,6 +314,8 @@ async def translate_aio_errors(
appropriately translates errors and cancels into ``trio`` land.
'''
+ trio_task = trio.lowlevel.current_task()
+
aio_err: Optional[BaseException] = None
def maybe_raise_aio_err(
@@ -260,10 +336,22 @@ async def translate_aio_errors(
assert task
try:
yield
+
+ except (
+ trio.Cancelled,
+ ):
+ # relay cancel through to called ``asyncio`` task
+ assert chan._aio_task
+ chan._aio_task.cancel(
+ msg=f'the `trio` caller task was cancelled: {trio_task.name}'
+ )
+ raise
+
except (
# NOTE: see the note in the ``cancel_trio()`` asyncio task
# termination callback
trio.ClosedResourceError,
+ # trio.BrokenResourceError,
):
aio_err = chan._aio_err
if (
@@ -277,6 +365,7 @@ async def translate_aio_errors(
else:
raise
+
finally:
# always cancel the ``asyncio`` task if we've made it this far
# and it's not done.
@@ -309,7 +398,6 @@ async def run_task(
**kwargs,
)
with chan._from_aio:
- # try:
async with translate_aio_errors(chan):
# return single value that is the output from the
# ``asyncio`` function-as-task. Expect the mem chan api to
@@ -343,7 +431,7 @@ async def open_channel_from(
# ``asyncio`` task.
first = await chan.receive()
- # stream values upward
+ # deliver stream handle upward
yield first, chan
@@ -381,8 +469,20 @@ def run_as_asyncio_guest(
def trio_done_callback(main_outcome):
- print(f"trio_main finished: {main_outcome!r}")
- trio_done_fut.set_result(main_outcome)
+ if isinstance(main_outcome, Error):
+ error = main_outcome.error
+ trio_done_fut.set_exception(error)
+
+ # TODO: explicit asyncio tb?
+ # traceback.print_exception(error)
+
+ # XXX: do we need this?
+ # actor.cancel_soon()
+
+ main_outcome.unwrap()
+ else:
+ trio_done_fut.set_result(main_outcome)
+ print(f"trio_main finished: {main_outcome!r}")
# start the infection: run trio on the asyncio loop in "guest mode"
log.info(f"Infecting asyncio process with {trio_main}")
@@ -392,6 +492,7 @@ def run_as_asyncio_guest(
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
+ # ``.unwrap()`` will raise here on error
return (await trio_done_fut).unwrap()
# might as well if it's installed.