Compare commits
4 Commits
4c82b6e94f
...
7fac170f8d
Author | SHA1 | Date |
---|---|---|
|
7fac170f8d | |
|
44281268a8 | |
|
064c5ce034 | |
|
6637473b54 |
|
@ -32,6 +32,16 @@ from tractor.trionics import BroadcastReceiver
|
|||
from tractor._testing import expect_ctxc
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope='module',
|
||||
)
|
||||
def delay(debug_mode: bool) -> int:
|
||||
if debug_mode:
|
||||
return 999
|
||||
else:
|
||||
return 1
|
||||
|
||||
|
||||
async def sleep_and_err(
|
||||
sleep_for: float = 0.1,
|
||||
|
||||
|
@ -59,19 +69,23 @@ async def trio_cancels_single_aio_task():
|
|||
await tractor.to_asyncio.run_task(aio_sleep_forever)
|
||||
|
||||
|
||||
def test_trio_cancels_aio_on_actor_side(reg_addr):
|
||||
def test_trio_cancels_aio_on_actor_side(
|
||||
reg_addr: tuple[str, int],
|
||||
delay: int,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Spawn an infected actor that is cancelled by the ``trio`` side
|
||||
task using std cancel scope apis.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
# with trio.fail_after(1):
|
||||
with trio.fail_after(999):
|
||||
with trio.fail_after(1 + delay):
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr]
|
||||
) as n:
|
||||
await n.run_in_actor(
|
||||
registry_addrs=[reg_addr],
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
await an.run_in_actor(
|
||||
trio_cancels_single_aio_task,
|
||||
infect_asyncio=True,
|
||||
)
|
||||
|
@ -118,7 +132,10 @@ async def asyncio_actor(
|
|||
raise
|
||||
|
||||
|
||||
def test_aio_simple_error(reg_addr):
|
||||
def test_aio_simple_error(
|
||||
reg_addr: tuple[str, int],
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Verify a simple remote asyncio error propagates back through trio
|
||||
to the parent actor.
|
||||
|
@ -127,9 +144,10 @@ def test_aio_simple_error(reg_addr):
|
|||
'''
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr]
|
||||
) as n:
|
||||
await n.run_in_actor(
|
||||
registry_addrs=[reg_addr],
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
await an.run_in_actor(
|
||||
asyncio_actor,
|
||||
target='sleep_and_err',
|
||||
expect_err='AssertionError',
|
||||
|
@ -155,14 +173,19 @@ def test_aio_simple_error(reg_addr):
|
|||
assert err.boxed_type is AssertionError
|
||||
|
||||
|
||||
def test_tractor_cancels_aio(reg_addr):
|
||||
def test_tractor_cancels_aio(
|
||||
reg_addr: tuple[str, int],
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Verify we can cancel a spawned asyncio task gracefully.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.run_in_actor(
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.run_in_actor(
|
||||
asyncio_actor,
|
||||
target='aio_sleep_forever',
|
||||
expect_err='trio.Cancelled',
|
||||
|
@ -174,7 +197,9 @@ def test_tractor_cancels_aio(reg_addr):
|
|||
trio.run(main)
|
||||
|
||||
|
||||
def test_trio_cancels_aio(reg_addr):
|
||||
def test_trio_cancels_aio(
|
||||
reg_addr: tuple[str, int],
|
||||
):
|
||||
'''
|
||||
Much like the above test with ``tractor.Portal.cancel_actor()``
|
||||
except we just use a standard ``trio`` cancellation api.
|
||||
|
@ -205,7 +230,8 @@ async def trio_ctx(
|
|||
|
||||
# this will block until the ``asyncio`` task sends a "first"
|
||||
# message.
|
||||
with trio.fail_after(2):
|
||||
delay: int = 999 if tractor.debug_mode() else 1
|
||||
with trio.fail_after(1 + delay):
|
||||
try:
|
||||
async with (
|
||||
trio.open_nursery(
|
||||
|
@ -241,8 +267,10 @@ async def trio_ctx(
|
|||
ids='parent_actor_cancels_child={}'.format
|
||||
)
|
||||
def test_context_spawns_aio_task_that_errors(
|
||||
reg_addr,
|
||||
reg_addr: tuple[str, int],
|
||||
delay: int,
|
||||
parent_cancels: bool,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Verify that spawning a task via an intertask channel ctx mngr that
|
||||
|
@ -251,14 +279,13 @@ def test_context_spawns_aio_task_that_errors(
|
|||
|
||||
'''
|
||||
async def main():
|
||||
with trio.fail_after(2):
|
||||
# with trio.fail_after(999):
|
||||
async with tractor.open_nursery() as n:
|
||||
p = await n.start_actor(
|
||||
with trio.fail_after(1 + delay):
|
||||
async with tractor.open_nursery() as an:
|
||||
p = await an.start_actor(
|
||||
'aio_daemon',
|
||||
enable_modules=[__name__],
|
||||
infect_asyncio=True,
|
||||
# debug_mode=True,
|
||||
debug_mode=debug_mode,
|
||||
loglevel='cancel',
|
||||
)
|
||||
async with (
|
||||
|
@ -325,11 +352,12 @@ async def aio_cancel():
|
|||
|
||||
def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
||||
reg_addr: tuple,
|
||||
delay: int,
|
||||
):
|
||||
'''
|
||||
When the `asyncio.Task` cancels itself the `trio` side cshould
|
||||
When the `asyncio.Task` cancels itself the `trio` side should
|
||||
also cancel and teardown and relay the cancellation cross-process
|
||||
to the caller (parent).
|
||||
to the parent caller.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
|
@ -345,8 +373,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
|||
# NOTE: normally the `an.__aexit__()` waits on the
|
||||
# portal's result but we do it explicitly here
|
||||
# to avoid indent levels.
|
||||
# with trio.fail_after(1):
|
||||
with trio.fail_after(999):
|
||||
with trio.fail_after(1 + delay):
|
||||
await p.wait_for_result()
|
||||
|
||||
with pytest.raises(
|
||||
|
@ -397,6 +424,7 @@ async def push_from_aio_task(
|
|||
i == 50
|
||||
):
|
||||
if fail_early:
|
||||
print('Raising exc from aio side!')
|
||||
raise Exception
|
||||
|
||||
if exit_early:
|
||||
|
@ -425,7 +453,7 @@ async def push_from_aio_task(
|
|||
|
||||
async def stream_from_aio(
|
||||
trio_exit_early: bool = False,
|
||||
raise_err: bool = False,
|
||||
trio_raise_err: bool = False,
|
||||
aio_raise_err: bool = False,
|
||||
aio_exit_early: bool = False,
|
||||
fan_out: bool = False,
|
||||
|
@ -440,10 +468,18 @@ async def stream_from_aio(
|
|||
async with to_asyncio.open_channel_from(
|
||||
push_from_aio_task,
|
||||
sequence=seq,
|
||||
expect_cancel=raise_err or trio_exit_early,
|
||||
expect_cancel=trio_raise_err or trio_exit_early,
|
||||
fail_early=aio_raise_err,
|
||||
exit_early=aio_exit_early,
|
||||
|
||||
# such that we can test exit early cases
|
||||
# for each side explicitly.
|
||||
suppress_graceful_exits=(not(
|
||||
aio_exit_early
|
||||
or
|
||||
trio_exit_early
|
||||
))
|
||||
|
||||
) as (first, chan):
|
||||
|
||||
assert first is True
|
||||
|
@ -459,7 +495,7 @@ async def stream_from_aio(
|
|||
pulled.append(value)
|
||||
|
||||
if value == 50:
|
||||
if raise_err:
|
||||
if trio_raise_err:
|
||||
raise Exception
|
||||
elif trio_exit_early:
|
||||
print('`consume()` breaking early!\n')
|
||||
|
@ -478,11 +514,11 @@ async def stream_from_aio(
|
|||
# tasks are joined..
|
||||
chan.subscribe() as br,
|
||||
|
||||
trio.open_nursery() as n,
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
# start 2nd task that get's broadcast the same
|
||||
# value set.
|
||||
n.start_soon(consume, br)
|
||||
tn.start_soon(consume, br)
|
||||
await consume(chan)
|
||||
|
||||
else:
|
||||
|
@ -495,14 +531,14 @@ async def stream_from_aio(
|
|||
|
||||
finally:
|
||||
|
||||
if (
|
||||
not raise_err
|
||||
and
|
||||
not trio_exit_early
|
||||
and
|
||||
not aio_raise_err
|
||||
and
|
||||
not aio_exit_early
|
||||
if not (
|
||||
trio_raise_err
|
||||
or
|
||||
trio_exit_early
|
||||
or
|
||||
aio_raise_err
|
||||
or
|
||||
aio_exit_early
|
||||
):
|
||||
if fan_out:
|
||||
# we get double the pulled values in the
|
||||
|
@ -526,10 +562,13 @@ async def stream_from_aio(
|
|||
'fan_out', [False, True],
|
||||
ids='fan_out_w_chan_subscribe={}'.format
|
||||
)
|
||||
def test_basic_interloop_channel_stream(reg_addr, fan_out):
|
||||
def test_basic_interloop_channel_stream(
|
||||
reg_addr: tuple[str, int],
|
||||
fan_out: bool,
|
||||
):
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.run_in_actor(
|
||||
async with tractor.open_nursery() as an:
|
||||
portal = await an.run_in_actor(
|
||||
stream_from_aio,
|
||||
infect_asyncio=True,
|
||||
fan_out=fan_out,
|
||||
|
@ -543,10 +582,10 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
|
|||
# TODO: parametrize the above test and avoid the duplication here?
|
||||
def test_trio_error_cancels_intertask_chan(reg_addr):
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.run_in_actor(
|
||||
async with tractor.open_nursery() as an:
|
||||
portal = await an.run_in_actor(
|
||||
stream_from_aio,
|
||||
raise_err=True,
|
||||
trio_raise_err=True,
|
||||
infect_asyncio=True,
|
||||
)
|
||||
# should trigger remote actor error
|
||||
|
@ -561,6 +600,8 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
|
|||
|
||||
def test_trio_closes_early_causes_aio_checkpoint_raise(
|
||||
reg_addr: tuple[str, int],
|
||||
delay: int,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Check that if the `trio`-task "exits early and silently" (in this
|
||||
|
@ -572,13 +613,12 @@ def test_trio_closes_early_causes_aio_checkpoint_raise(
|
|||
|
||||
'''
|
||||
async def main():
|
||||
with trio.fail_after(2):
|
||||
# with trio.fail_after(999):
|
||||
with trio.fail_after(1 + delay):
|
||||
async with tractor.open_nursery(
|
||||
# debug_mode=True,
|
||||
debug_mode=debug_mode,
|
||||
# enable_stack_on_sig=True,
|
||||
) as n:
|
||||
portal = await n.run_in_actor(
|
||||
) as an:
|
||||
portal = await an.run_in_actor(
|
||||
stream_from_aio,
|
||||
trio_exit_early=True,
|
||||
infect_asyncio=True,
|
||||
|
@ -606,6 +646,8 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
|
|||
# - trio cancelled AND aio exits early on its next tick
|
||||
# - trio errors AND aio exits early on its next tick
|
||||
reg_addr: tuple[str, int],
|
||||
debug_mode: bool,
|
||||
delay: int,
|
||||
):
|
||||
'''
|
||||
Check that if the `asyncio`-task "exits early and silently" (in this
|
||||
|
@ -621,10 +663,9 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
|
|||
|
||||
'''
|
||||
async def main():
|
||||
with trio.fail_after(2):
|
||||
# with trio.fail_after(999):
|
||||
with trio.fail_after(1 + delay):
|
||||
async with tractor.open_nursery(
|
||||
# debug_mode=True,
|
||||
debug_mode=debug_mode,
|
||||
# enable_stack_on_sig=True,
|
||||
) as an:
|
||||
portal = await an.run_in_actor(
|
||||
|
@ -658,10 +699,15 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
|
|||
assert exc.boxed_type is to_asyncio.AsyncioTaskExited
|
||||
|
||||
|
||||
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
||||
def test_aio_errors_and_channel_propagates_and_closes(
|
||||
reg_addr: tuple[str, int],
|
||||
debug_mode: bool,
|
||||
):
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.run_in_actor(
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.run_in_actor(
|
||||
stream_from_aio,
|
||||
aio_raise_err=True,
|
||||
infect_asyncio=True,
|
||||
|
@ -736,13 +782,15 @@ async def trio_to_aio_echo_server(
|
|||
ids='raise_error={}'.format,
|
||||
)
|
||||
def test_echoserver_detailed_mechanics(
|
||||
reg_addr,
|
||||
reg_addr: tuple[str, int],
|
||||
debug_mode: bool,
|
||||
raise_error_mid_stream,
|
||||
):
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery() as n:
|
||||
p = await n.start_actor(
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
p = await an.start_actor(
|
||||
'aio_server',
|
||||
enable_modules=[__name__],
|
||||
infect_asyncio=True,
|
||||
|
@ -947,6 +995,8 @@ def test_sigint_closes_lifetime_stack(
|
|||
|
||||
'''
|
||||
async def main():
|
||||
|
||||
delay = 999 if tractor.debug_mode() else 1
|
||||
try:
|
||||
an: tractor.ActorNursery
|
||||
async with tractor.open_nursery(
|
||||
|
@ -997,7 +1047,7 @@ def test_sigint_closes_lifetime_stack(
|
|||
if wait_for_ctx:
|
||||
print('waiting for ctx outcome in parent..')
|
||||
try:
|
||||
with trio.fail_after(1):
|
||||
with trio.fail_after(1 + delay):
|
||||
await ctx.wait_for_result()
|
||||
except tractor.ContextCancelled as ctxc:
|
||||
assert ctxc.canceller == ctx.chan.uid
|
||||
|
|
|
@ -43,6 +43,7 @@ from ._state import (
|
|||
current_actor as current_actor,
|
||||
is_root_process as is_root_process,
|
||||
current_ipc_ctx as current_ipc_ctx,
|
||||
debug_mode as debug_mode
|
||||
)
|
||||
from ._exceptions import (
|
||||
ContextCancelled as ContextCancelled,
|
||||
|
@ -65,3 +66,4 @@ from ._root import (
|
|||
from ._ipc import Channel as Channel
|
||||
from ._portal import Portal as Portal
|
||||
from ._runtime import Actor as Actor
|
||||
from . import hilevel as hilevel
|
||||
|
|
|
@ -103,7 +103,16 @@ class AsyncioTaskExited(Exception):
|
|||
|
||||
'''
|
||||
|
||||
class TrioTaskExited(AsyncioCancelled):
|
||||
class TrioCancelled(Exception):
|
||||
'''
|
||||
Trio cancelled translation (non-base) error
|
||||
for use with the `to_asyncio` module
|
||||
to be raised in the `asyncio.Task` to indicate
|
||||
that the `trio` side raised `Cancelled` or an error.
|
||||
|
||||
'''
|
||||
|
||||
class TrioTaskExited(Exception):
|
||||
'''
|
||||
The `trio`-side task exited without explicitly cancelling the
|
||||
`asyncio.Task` peer.
|
||||
|
@ -406,6 +415,9 @@ class RemoteActorError(Exception):
|
|||
String-name of the (last hop's) boxed error type.
|
||||
|
||||
'''
|
||||
# TODO, maybe support also serializing the
|
||||
# `ExceptionGroup.exeptions: list[BaseException]` set under
|
||||
# certain conditions?
|
||||
bt: Type[BaseException] = self.boxed_type
|
||||
if bt:
|
||||
return str(bt.__name__)
|
||||
|
@ -821,8 +833,11 @@ class MsgTypeError(
|
|||
'''
|
||||
if (
|
||||
(_bad_msg := self.msgdata.get('_bad_msg'))
|
||||
and
|
||||
isinstance(_bad_msg, PayloadMsg)
|
||||
and (
|
||||
isinstance(_bad_msg, PayloadMsg)
|
||||
or
|
||||
isinstance(_bad_msg, msgtypes.Start)
|
||||
)
|
||||
):
|
||||
return _bad_msg
|
||||
|
||||
|
|
|
@ -116,6 +116,8 @@ class LinkedTaskChannel(
|
|||
_trio_task: trio.Task
|
||||
_aio_task_complete: trio.Event
|
||||
|
||||
_suppress_graceful_exits: bool = True
|
||||
|
||||
_trio_err: BaseException|None = None
|
||||
_trio_to_raise: (
|
||||
AsyncioTaskExited| # aio task exits while trio ongoing
|
||||
|
@ -229,6 +231,8 @@ class LinkedTaskChannel(
|
|||
except BaseException as err:
|
||||
async with translate_aio_errors(
|
||||
chan=self,
|
||||
# NOTE, determined by `open_channel_from()` input arg
|
||||
suppress_graceful_exits=self._suppress_graceful_exits,
|
||||
|
||||
# XXX: obviously this will deadlock if an on-going stream is
|
||||
# being procesed.
|
||||
|
@ -300,6 +304,7 @@ def _run_asyncio_task(
|
|||
*,
|
||||
qsize: int = 1,
|
||||
provide_channels: bool = False,
|
||||
suppress_graceful_exits: bool = True,
|
||||
hide_tb: bool = False,
|
||||
**kwargs,
|
||||
|
||||
|
@ -352,6 +357,7 @@ def _run_asyncio_task(
|
|||
_trio_cs=trio_cs,
|
||||
_trio_task=trio_task,
|
||||
_aio_task_complete=aio_task_complete,
|
||||
_suppress_graceful_exits=suppress_graceful_exits,
|
||||
)
|
||||
|
||||
async def wait_on_coro_final_result(
|
||||
|
@ -365,7 +371,6 @@ def _run_asyncio_task(
|
|||
`return`-ed result back to `trio`.
|
||||
|
||||
'''
|
||||
nonlocal aio_err
|
||||
nonlocal chan
|
||||
|
||||
orig = result = id(coro)
|
||||
|
@ -383,7 +388,6 @@ def _run_asyncio_task(
|
|||
'`asyncio` task errored\n'
|
||||
)
|
||||
raise
|
||||
|
||||
else:
|
||||
if (
|
||||
result != orig
|
||||
|
@ -419,7 +423,11 @@ def _run_asyncio_task(
|
|||
# - raise error and aio side errors "independently"
|
||||
# on next tick (SEE draft HANDLER BELOW).
|
||||
stats: trio.MemoryChannelStatistics = to_trio.statistics()
|
||||
if stats.tasks_waiting_receive:
|
||||
if (
|
||||
stats.tasks_waiting_receive
|
||||
and
|
||||
not chan._aio_err
|
||||
):
|
||||
chan._trio_to_raise = AsyncioTaskExited(
|
||||
f'Task existed with final result\n'
|
||||
f'{result!r}\n'
|
||||
|
@ -513,6 +521,9 @@ def _run_asyncio_task(
|
|||
f')> {res}\n'
|
||||
f' |_{task}\n'
|
||||
)
|
||||
if not chan._aio_result:
|
||||
chan._aio_result = res
|
||||
|
||||
# ?TODO, should we also raise `AsyncioTaskExited[res]`
|
||||
# in any case where trio is NOT blocking on the
|
||||
# `._to_trio` chan?
|
||||
|
@ -593,6 +604,7 @@ def _run_asyncio_task(
|
|||
|
||||
# is trio the src of the aio task's exc-as-outcome?
|
||||
trio_err: BaseException|None = chan._trio_err
|
||||
curr_aio_err: BaseException|None = chan._aio_err
|
||||
if (
|
||||
curr_aio_err
|
||||
or
|
||||
|
@ -648,6 +660,8 @@ def _run_asyncio_task(
|
|||
trio_to_raise: AsyncioCancelled|AsyncioTaskExited = chan._trio_to_raise
|
||||
aio_to_raise: TrioTaskExited|TrioCancelled = chan._aio_to_raise
|
||||
if (
|
||||
not chan._aio_result
|
||||
and
|
||||
not trio_cs.cancelled_caught
|
||||
and (
|
||||
(aio_err and type(aio_err) not in {
|
||||
|
@ -680,7 +694,17 @@ def _run_asyncio_task(
|
|||
# match the one we just caught from the task above!
|
||||
# (that would indicate something weird/very-wrong
|
||||
# going on?)
|
||||
if aio_err is not trio_to_raise:
|
||||
if (
|
||||
aio_err is not trio_to_raise
|
||||
and (
|
||||
not suppress_graceful_exits
|
||||
and (
|
||||
chan._aio_result is not Unresolved
|
||||
and
|
||||
isinstance(trio_to_raise, AsyncioTaskExited)
|
||||
)
|
||||
)
|
||||
):
|
||||
# raise aio_err from relayed_aio_err
|
||||
raise trio_to_raise from curr_aio_err
|
||||
|
||||
|
@ -715,6 +739,7 @@ async def translate_aio_errors(
|
|||
trio_task = trio.lowlevel.current_task()
|
||||
aio_err: BaseException|None = chan._aio_err
|
||||
aio_task: asyncio.Task = chan._aio_task
|
||||
aio_done_before_trio: bool = aio_task.done()
|
||||
assert aio_task
|
||||
trio_err: BaseException|None = None
|
||||
to_raise_trio: BaseException|None = None
|
||||
|
@ -1011,6 +1036,8 @@ async def translate_aio_errors(
|
|||
# -[ ] make this a channel method, OR
|
||||
# -[ ] just put back inline below?
|
||||
#
|
||||
# await tractor.pause(shield=True)
|
||||
# TODO, go back to inlining this..
|
||||
def maybe_raise_aio_side_err(
|
||||
trio_err: Exception,
|
||||
) -> None:
|
||||
|
@ -1028,11 +1055,10 @@ async def translate_aio_errors(
|
|||
None
|
||||
) = chan._trio_to_raise
|
||||
|
||||
if (
|
||||
trio_to_raise
|
||||
and
|
||||
suppress_graceful_exits
|
||||
):
|
||||
if not suppress_graceful_exits:
|
||||
raise trio_to_raise from (aio_err or trio_err)
|
||||
|
||||
if trio_to_raise:
|
||||
# import pdbp; pdbp.set_trace()
|
||||
match (
|
||||
trio_to_raise,
|
||||
|
@ -1040,7 +1066,7 @@ async def translate_aio_errors(
|
|||
):
|
||||
case (
|
||||
AsyncioTaskExited(),
|
||||
None|trio.Cancelled(),
|
||||
trio.Cancelled()|None,
|
||||
):
|
||||
log.info(
|
||||
'Ignoring aio exit signal since trio also exited!'
|
||||
|
@ -1051,12 +1077,13 @@ async def translate_aio_errors(
|
|||
AsyncioCancelled(),
|
||||
trio.Cancelled(),
|
||||
):
|
||||
log.info(
|
||||
'Ignoring aio cancelled signal since trio was also cancelled!'
|
||||
)
|
||||
return
|
||||
|
||||
raise trio_to_raise from (aio_err or trio_err)
|
||||
if not aio_done_before_trio:
|
||||
log.info(
|
||||
'Ignoring aio cancelled signal since trio was also cancelled!'
|
||||
)
|
||||
return
|
||||
case _:
|
||||
raise trio_to_raise from (aio_err or trio_err)
|
||||
|
||||
# Check if the asyncio-side is the cause of the trio-side
|
||||
# error.
|
||||
|
@ -1128,6 +1155,7 @@ async def run_task(
|
|||
async with translate_aio_errors(
|
||||
chan,
|
||||
wait_on_aio_task=True,
|
||||
suppress_graceful_exits=chan._suppress_graceful_exits,
|
||||
):
|
||||
# return single value that is the output from the
|
||||
# ``asyncio`` function-as-task. Expect the mem chan api
|
||||
|
@ -1143,7 +1171,8 @@ async def run_task(
|
|||
async def open_channel_from(
|
||||
|
||||
target: Callable[..., Any],
|
||||
**kwargs,
|
||||
suppress_graceful_exits: bool = True,
|
||||
**target_kwargs,
|
||||
|
||||
) -> AsyncIterator[Any]:
|
||||
'''
|
||||
|
@ -1155,13 +1184,15 @@ async def open_channel_from(
|
|||
target,
|
||||
qsize=2**8,
|
||||
provide_channels=True,
|
||||
**kwargs,
|
||||
suppress_graceful_exits=suppress_graceful_exits,
|
||||
**target_kwargs,
|
||||
)
|
||||
# TODO, tuple form here?
|
||||
async with chan._from_aio:
|
||||
async with translate_aio_errors(
|
||||
chan,
|
||||
wait_on_aio_task=True,
|
||||
suppress_graceful_exits=suppress_graceful_exits,
|
||||
):
|
||||
# sync to a "started()"-like first delivered value from the
|
||||
# ``asyncio`` task.
|
||||
|
|
Loading…
Reference in New Issue