Merge pull request #304 from goodboy/aio_explicit_task_cancels

`LinkedTaskChannel.subscribe()`, explicit `asyncio` task cancel logging, `test_trioisms.py`
sort_subs_results_infected_aio
goodboy 2022-04-12 17:27:29 -04:00 committed by GitHub
commit bfe99f29b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 447 additions and 33 deletions

View File

@ -20,7 +20,7 @@ jobs:
- name: Setup python
uses: actions/setup-python@v2
with:
python-version: '3.9'
python-version: '3.10'
- name: Install dependencies
run: pip install -U . --upgrade-strategy eager -r requirements-test.txt

View File

@ -0,0 +1,12 @@
Add a new ``to_asyncio.LinkedTaskChannel.subscribe()`` which gives
task-oriented broadcast functionality semantically equivalent to
``tractor.MsgStream.subscribe()`` this makes it possible for multiple
``trio``-side tasks to consume ``asyncio``-side task msgs in tandem.
Further Improvements to the test suite were added in this patch set
including a new scenario test for a sub-actor managed "service nursery"
(implementing the basics of a "service manager") including use of
*infected asyncio* mode. Further we added a lower level
``test_trioisms.py`` to start to track issues we need to work around in
``trio`` itself which in this case included a bug we were trying to
solve related to https://github.com/python-trio/trio/issues/2258.

View File

@ -1,21 +1,22 @@
#!/usr/bin/env python
#
# tractor: a trionic actor model built on `multiprocessing` and `trio`
# tractor: structured concurrent "actors".
#
# Copyright (C) 2018-2020 Tyler Goodlet
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from setuptools import setup
with open('docs/README.rst', encoding='utf-8') as f:
@ -27,7 +28,7 @@ setup(
version='0.1.0a5.dev', # alpha zone
description='structured concurrrent "actors"',
long_description=readme,
license='GPLv3',
license='AGPLv3',
author='Tyler Goodlet',
maintainer='Tyler Goodlet',
maintainer_email='jgbt@protonmail.com',
@ -80,7 +81,7 @@ setup(
"Operating System :: POSIX :: Linux",
"Operating System :: Microsoft :: Windows",
"Framework :: Trio",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.8",

View File

@ -0,0 +1,173 @@
'''
Test a service style daemon that maintains a nursery for spawning
"remote async tasks" including both spawning other long living
sub-sub-actor daemons.
'''
from typing import Optional
import asyncio
from contextlib import asynccontextmanager as acm
import pytest
import trio
from trio_typing import TaskStatus
import tractor
from tractor import RemoteActorError
from async_generator import aclosing
async def aio_streamer(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> trio.abc.ReceiveChannel:
# required first msg to sync caller
to_trio.send_nowait(None)
from itertools import cycle
for i in cycle(range(10)):
to_trio.send_nowait(i)
await asyncio.sleep(0.01)
async def trio_streamer():
from itertools import cycle
for i in cycle(range(10)):
yield i
await trio.sleep(0.01)
async def trio_sleep_and_err(delay: float = 0.5):
await trio.sleep(delay)
# name error
doggy() # noqa
_cached_stream: Optional[
trio.abc.ReceiveChannel
] = None
@acm
async def wrapper_mngr(
):
from tractor.trionics import broadcast_receiver
global _cached_stream
in_aio = tractor.current_actor().is_infected_aio()
if in_aio:
if _cached_stream:
from_aio = _cached_stream
# if we already have a cached feed deliver a rx side clone
# to consumer
async with broadcast_receiver(from_aio, 6) as from_aio:
yield from_aio
return
else:
async with tractor.to_asyncio.open_channel_from(
aio_streamer,
) as (first, from_aio):
assert not first
# cache it so next task uses broadcast receiver
_cached_stream = from_aio
yield from_aio
else:
async with aclosing(trio_streamer()) as stream:
# cache it so next task uses broadcast receiver
_cached_stream = stream
yield stream
_nursery: trio.Nursery = None
@tractor.context
async def trio_main(
ctx: tractor.Context,
):
# sync
await ctx.started()
# stash a "service nursery" as "actor local" (aka a Python global)
global _nursery
n = _nursery
assert n
async def consume_stream():
async with wrapper_mngr() as stream:
async for msg in stream:
print(msg)
# run 2 tasks to ensure broadcaster chan use
n.start_soon(consume_stream)
n.start_soon(consume_stream)
n.start_soon(trio_sleep_and_err)
await trio.sleep_forever()
@tractor.context
async def open_actor_local_nursery(
ctx: tractor.Context,
):
global _nursery
async with trio.open_nursery() as n:
_nursery = n
await ctx.started()
await trio.sleep(10)
# await trio.sleep(1)
# XXX: this causes the hang since
# the caller does not unblock from its own
# ``trio.sleep_forever()``.
# TODO: we need to test a simple ctx task starting remote tasks
# that error and then blocking on a ``Nursery.start()`` which
# never yields back.. aka a scenario where the
# ``tractor.context`` task IS NOT in the service n's cancel
# scope.
n.cancel_scope.cancel()
@pytest.mark.parametrize(
'asyncio_mode',
[True, False],
ids='asyncio_mode={}'.format,
)
def test_actor_managed_trio_nursery_task_error_cancels_aio(
asyncio_mode: bool,
arb_addr
):
'''
Verify that a ``trio`` nursery created managed in a child actor
correctly relays errors to the parent actor when one of its spawned
tasks errors even when running in infected asyncio mode and using
broadcast receivers for multi-task-per-actor subscription.
'''
async def main():
# cancel the nursery shortly after boot
async with tractor.open_nursery() as n:
p = await n.start_actor(
'nursery_mngr',
infect_asyncio=asyncio_mode,
enable_modules=[__name__],
)
async with (
p.open_context(open_actor_local_nursery) as (ctx, first),
p.open_context(trio_main) as (ctx, first),
):
await trio.sleep_forever()
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
# verify boxed error
err = excinfo.value
assert isinstance(err.type(), NameError)

View File

@ -2,9 +2,10 @@
The hipster way to force SC onto the stdlib's "async": 'infection mode'.
'''
from typing import Optional, Iterable
from typing import Optional, Iterable, Union
import asyncio
import builtins
import itertools
import importlib
import pytest
@ -12,10 +13,11 @@ import trio
import tractor
from tractor import to_asyncio
from tractor import RemoteActorError
from tractor.trionics import BroadcastReceiver
async def sleep_and_err():
await asyncio.sleep(0.1)
async def sleep_and_err(sleep_for: float = 0.1):
await asyncio.sleep(sleep_for)
assert 0
@ -217,6 +219,7 @@ async def stream_from_aio(
exit_early: bool = False,
raise_err: bool = False,
aio_raise_err: bool = False,
fan_out: bool = False,
) -> None:
seq = range(100)
@ -234,6 +237,12 @@ async def stream_from_aio(
assert first is True
async def consume(
chan: Union[
to_asyncio.LinkedTaskChannel,
BroadcastReceiver,
],
):
async for value in chan:
print(f'trio received {value}')
pulled.append(value)
@ -243,6 +252,23 @@ async def stream_from_aio(
raise Exception
elif exit_early:
break
if fan_out:
# start second task that get's the same stream value set.
async with (
# NOTE: this has to come first to avoid
# the channel being closed before the nursery
# tasks are joined..
chan.subscribe() as br,
trio.open_nursery() as n,
):
n.start_soon(consume, br)
await consume(chan)
else:
await consume(chan)
finally:
if (
@ -250,19 +276,38 @@ async def stream_from_aio(
not exit_early and
not aio_raise_err
):
if fan_out:
# we get double the pulled values in the
# ``.subscribe()`` fan out case.
doubled = list(itertools.chain(*zip(expect, expect)))
expect = doubled[:len(pulled)]
if pulled != expect:
print(
f'uhhh pulled is {pulled}\n',
f'uhhh expect is {expect}\n',
)
assert pulled == expect
else:
assert pulled == expect
else:
assert not fan_out
assert pulled == expect[:51]
print('trio guest mode task completed!')
def test_basic_interloop_channel_stream(arb_addr):
@pytest.mark.parametrize(
'fan_out', [False, True],
ids='fan_out_w_chan_subscribe={}'.format
)
def test_basic_interloop_channel_stream(arb_addr, fan_out):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
stream_from_aio,
infect_asyncio=True,
fan_out=fan_out,
)
await portal.result()

View File

@ -0,0 +1,82 @@
'''
Reminders for oddities in `trio` that we need to stay aware of and/or
want to see changed.
'''
import pytest
import trio
from trio_typing import TaskStatus
@pytest.mark.parametrize(
'use_start_soon', [
pytest.param(
True,
marks=pytest.mark.xfail(reason="see python-trio/trio#2258")
),
False,
]
)
def test_stashed_child_nursery(use_start_soon):
_child_nursery = None
async def waits_on_signal(
ev: trio.Event(),
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
):
'''
Do some stuf, then signal other tasks, then yield back to "starter".
'''
await ev.wait()
task_status.started()
async def mk_child_nursery(
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
):
'''
Allocate a child sub-nursery and stash it as a global.
'''
nonlocal _child_nursery
async with trio.open_nursery() as cn:
_child_nursery = cn
task_status.started(cn)
# block until cancelled by parent.
await trio.sleep_forever()
async def sleep_and_err(
ev: trio.Event,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
):
await trio.sleep(0.5)
doggy() # noqa
ev.set()
task_status.started()
async def main():
async with (
trio.open_nursery() as pn,
):
cn = await pn.start(mk_child_nursery)
assert cn
ev = trio.Event()
if use_start_soon:
# this causes inf hang
cn.start_soon(sleep_and_err, ev)
else:
# this does not.
await cn.start(sleep_and_err, ev)
with trio.fail_after(1):
await cn.start(waits_on_signal, ev)
with pytest.raises(NameError):
trio.run(main)

View File

@ -23,6 +23,7 @@ from asyncio.exceptions import CancelledError
from contextlib import asynccontextmanager as acm
from dataclasses import dataclass
import inspect
import traceback
from typing import (
Any,
Callable,
@ -32,10 +33,15 @@ from typing import (
)
import trio
from outcome import Error
from .log import get_logger
from ._state import current_actor
from ._exceptions import AsyncioCancelled
from .trionics._broadcast import (
broadcast_receiver,
BroadcastReceiver,
)
log = get_logger(__name__)
@ -61,15 +67,24 @@ class LinkedTaskChannel(trio.abc.Channel):
# set after ``asyncio.create_task()``
_aio_task: Optional[asyncio.Task] = None
_aio_err: Optional[BaseException] = None
_broadcaster: Optional[BroadcastReceiver] = None
async def aclose(self) -> None:
await self._from_aio.aclose()
async def receive(self) -> Any:
async with translate_aio_errors(self):
# TODO: do we need this to guarantee asyncio code get's
# cancelled in the case where the trio side somehow creates
# a state where the asyncio cycle-task isn't getting the
# cancel request sent by (in theory) the last checkpoint
# cycle on the trio side?
# await trio.lowlevel.checkpoint()
return await self._from_aio.receive()
async def wait_ayncio_complete(self) -> None:
async def wait_asyncio_complete(self) -> None:
await self._aio_task_complete.wait()
# def cancel_asyncio_task(self) -> None:
@ -84,6 +99,43 @@ class LinkedTaskChannel(trio.abc.Channel):
'''
self._to_aio.put_nowait(item)
def closed(self) -> bool:
return self._from_aio._closed # type: ignore
# TODO: shoud we consider some kind of "decorator" system
# that checks for structural-typing compatibliity and then
# automatically adds this ctx-mngr-as-method machinery?
@acm
async def subscribe(
self,
) -> AsyncIterator[BroadcastReceiver]:
'''
Allocate and return a ``BroadcastReceiver`` which delegates
to this inter-task channel.
This allows multiple local tasks to receive each their own copy
of this message stream.
See ``tractor._streaming.MsgStream.subscribe()`` for further
similar details.
'''
if self._broadcaster is None:
bcast = self._broadcaster = broadcast_receiver(
self,
# use memory channel size by default
self._from_aio._state.max_buffer_size, # type: ignore
receive_afunc=self.receive,
)
self.receive = bcast.receive # type: ignore
async with self._broadcaster.subscribe() as bstream:
assert bstream.key != self._broadcaster.key
assert bstream._recv == self._broadcaster._recv
yield bstream
def _run_asyncio_task(
@ -99,6 +151,7 @@ def _run_asyncio_task(
or stream the result back to ``trio``.
'''
__tracebackhide__ = True
if not current_actor().is_infected_aio():
raise RuntimeError("`infect_asyncio` mode is not enabled!?")
@ -157,6 +210,9 @@ def _run_asyncio_task(
orig = result = id(coro)
try:
result = await coro
except GeneratorExit:
# no need to relay error
raise
except BaseException as aio_err:
chan._aio_err = aio_err
raise
@ -202,20 +258,20 @@ def _run_asyncio_task(
'''
nonlocal chan
aio_err = chan._aio_err
task_err: Optional[BaseException] = None
# only to avoid ``asyncio`` complaining about uncaptured
# task exceptions
try:
task.exception()
except BaseException as terr:
task_err = terr
log.exception(f'`asyncio` task: {task.get_name()} errored')
assert type(terr) is type(aio_err), 'Asyncio task error mismatch?'
if aio_err is not None:
if type(aio_err) is CancelledError:
log.cancel("infected task was cancelled")
else:
aio_err.with_traceback(aio_err.__traceback__)
log.exception("infected task errorred:")
# XXX: uhh is this true?
# assert task_err, f'Asyncio task {task.get_name()} discrepancy!?'
# NOTE: currently mem chan closure may act as a form
# of error relay (at least in the ``asyncio.CancelledError``
@ -224,8 +280,26 @@ def _run_asyncio_task(
# We might want to change this in the future though.
from_aio.close()
task.add_done_callback(cancel_trio)
if type(aio_err) is CancelledError:
log.cancel("infected task was cancelled")
# TODO: show that the cancellation originated
# from the ``trio`` side? right?
# if cancel_scope.cancelled:
# raise aio_err from err
elif task_err is None:
assert aio_err
aio_err.with_traceback(aio_err.__traceback__)
msg = ''.join(traceback.format_exception(type(aio_err)))
log.error(
f'infected task errorred:\n{msg}'
)
# raise any ``asyncio`` side error.
raise aio_err
task.add_done_callback(cancel_trio)
return chan
@ -240,6 +314,8 @@ async def translate_aio_errors(
appropriately translates errors and cancels into ``trio`` land.
'''
trio_task = trio.lowlevel.current_task()
aio_err: Optional[BaseException] = None
def maybe_raise_aio_err(
@ -260,10 +336,22 @@ async def translate_aio_errors(
assert task
try:
yield
except (
trio.Cancelled,
):
# relay cancel through to called ``asyncio`` task
assert chan._aio_task
chan._aio_task.cancel(
msg=f'the `trio` caller task was cancelled: {trio_task.name}'
)
raise
except (
# NOTE: see the note in the ``cancel_trio()`` asyncio task
# termination callback
trio.ClosedResourceError,
# trio.BrokenResourceError,
):
aio_err = chan._aio_err
if (
@ -277,6 +365,7 @@ async def translate_aio_errors(
else:
raise
finally:
# always cancel the ``asyncio`` task if we've made it this far
# and it's not done.
@ -309,7 +398,6 @@ async def run_task(
**kwargs,
)
with chan._from_aio:
# try:
async with translate_aio_errors(chan):
# return single value that is the output from the
# ``asyncio`` function-as-task. Expect the mem chan api to
@ -343,7 +431,7 @@ async def open_channel_from(
# ``asyncio`` task.
first = await chan.receive()
# stream values upward
# deliver stream handle upward
yield first, chan
@ -381,8 +469,20 @@ def run_as_asyncio_guest(
def trio_done_callback(main_outcome):
print(f"trio_main finished: {main_outcome!r}")
if isinstance(main_outcome, Error):
error = main_outcome.error
trio_done_fut.set_exception(error)
# TODO: explicit asyncio tb?
# traceback.print_exception(error)
# XXX: do we need this?
# actor.cancel_soon()
main_outcome.unwrap()
else:
trio_done_fut.set_result(main_outcome)
print(f"trio_main finished: {main_outcome!r}")
# start the infection: run trio on the asyncio loop in "guest mode"
log.info(f"Infecting asyncio process with {trio_main}")
@ -392,6 +492,7 @@ def run_as_asyncio_guest(
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
# ``.unwrap()`` will raise here on error
return (await trio_done_fut).unwrap()
# might as well if it's installed.