Compare commits

..

4 Commits

Author SHA1 Message Date
Tyler Goodlet 7fac170f8d Add equiv of `AsyncioCancelled` for aio side
Such that a `TrioCancelled` is raised in the aio task via
`.set_exception()` to explicitly indicate and allow that task to handle
a taskc request from the parent `trio.Task`.
2025-03-03 12:13:25 -05:00
Tyler Goodlet 44281268a8 More `debug_mode` test support, better nursery var names 2025-03-03 12:11:50 -05:00
Tyler Goodlet 064c5ce034 Add per-side graceful-exit/cancel excs-as-signals
Such that any combination of task terminations/exits can be explicitly
handled and "dual side independent" crash cases re-raised in egs.

The main error-or-exit impl changes include,

- use of new per-side "signaling exceptions":
  - TrioTaskExited|TrioCancelled for signalling aio.
  - AsyncioTaskExited|AsyncioCancelled for signalling trio.

- NOT overloading the `LinkedTaskChannel._trio/aio_err` fields for
  err-as-signal relay and instead add a new pair of
  `._trio/aio_to_raise` maybe-exc-attrs which allow each side's
  task to specify what it would want the other side to raise to signal
  its/a termination outcome:
  - `._trio_to_raise: AsyncioTaskExited|AsyncioCancelled` to signal,
    |_ the aio task having returned while the trio side was still reading
       from the `asyncio.Queue` or is just not `.done()`.
    |_ the aio task being self or trio-request cancelled where
       a `asyncio.CancelledError` is raised and caught but NOT relayed
       as is back to trio; instead signal a "more explicit" exc type.
  - `._aio_to_raise: TrioTaskExited|TrioCancelled` to signal,
    |_ the trio task having returned while the aio side was still reading
       from the mem chan and indicating that the trio side might not
       care any more about future streamed values (like the
       `Stop/EndOfChannel` equivs for ipc `Context`s).
    |_ when the trio task canceld we do
        a `asyncio.Future.set_exception(TrioTaskExited())` to indicate
        to the aio side verbosely that it should cancel due to the trio
        parent.
  - `_aio/trio_err` are now left to only capturing the **actual**
    per-side task excs for introspection / other side's handling logic.

- supporting "graceful exits" depending on API in use from
  `translate_aio_errors()` such that if either side exits but the other
  side isn't expect to consume the final `return`ed value, we just exit
  silently, which required:
  - adding a `suppress_graceful_exits: bool` flag.
  - adjusting the `maybe_raise_aio_side_err()` logic to use that flag
    and suppress only on certain combos of `._trio_to_raise/._trio_err`.
  - prefer to raise `._trio_to_raise` when the aio-side is the src and
    vice versa.

- filling out pedantic logging for cancellation cases indicating which
  side is the cause.

- add a `LinkedTaskChannel._aio_result` modelled after our
  `Context._result` a a similar `.wait_for_result()` interface which
  allows maybe accessing the aio task's final return value if desired
  when using the `open_channel_from()` API.

- rename `cancel_trio()` done handler -> `signal_trio_when_done()`

Also some fairly major test suite updates,
- add a `delay: int` producing fixture which delivers a much larger
  timeout whenever `debug_mode` is set so that the REPL can be used
  without a surrounding cancel firing.
- add a new `test_aio_exits_early_relays_AsyncioTaskExited` including
  a paired `exit_early: bool` flag to `push_from_aio_task()`.
- adjust `test_trio_closes_early_causes_aio_checkpoint_raise` to expect
  a `to_asyncio.TrioTaskExited`.
2025-03-03 12:00:14 -05:00
Tyler Goodlet 6637473b54 Expose `._state.debug_mode()` predicate at top level 2025-03-03 11:16:47 -05:00
4 changed files with 179 additions and 81 deletions

View File

@ -32,6 +32,16 @@ from tractor.trionics import BroadcastReceiver
from tractor._testing import expect_ctxc from tractor._testing import expect_ctxc
@pytest.fixture(
scope='module',
)
def delay(debug_mode: bool) -> int:
if debug_mode:
return 999
else:
return 1
async def sleep_and_err( async def sleep_and_err(
sleep_for: float = 0.1, sleep_for: float = 0.1,
@ -59,19 +69,23 @@ async def trio_cancels_single_aio_task():
await tractor.to_asyncio.run_task(aio_sleep_forever) await tractor.to_asyncio.run_task(aio_sleep_forever)
def test_trio_cancels_aio_on_actor_side(reg_addr): def test_trio_cancels_aio_on_actor_side(
reg_addr: tuple[str, int],
delay: int,
debug_mode: bool,
):
''' '''
Spawn an infected actor that is cancelled by the ``trio`` side Spawn an infected actor that is cancelled by the ``trio`` side
task using std cancel scope apis. task using std cancel scope apis.
''' '''
async def main(): async def main():
# with trio.fail_after(1): with trio.fail_after(1 + delay):
with trio.fail_after(999):
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr] registry_addrs=[reg_addr],
) as n: debug_mode=debug_mode,
await n.run_in_actor( ) as an:
await an.run_in_actor(
trio_cancels_single_aio_task, trio_cancels_single_aio_task,
infect_asyncio=True, infect_asyncio=True,
) )
@ -118,7 +132,10 @@ async def asyncio_actor(
raise raise
def test_aio_simple_error(reg_addr): def test_aio_simple_error(
reg_addr: tuple[str, int],
debug_mode: bool,
):
''' '''
Verify a simple remote asyncio error propagates back through trio Verify a simple remote asyncio error propagates back through trio
to the parent actor. to the parent actor.
@ -127,9 +144,10 @@ def test_aio_simple_error(reg_addr):
''' '''
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr] registry_addrs=[reg_addr],
) as n: debug_mode=debug_mode,
await n.run_in_actor( ) as an:
await an.run_in_actor(
asyncio_actor, asyncio_actor,
target='sleep_and_err', target='sleep_and_err',
expect_err='AssertionError', expect_err='AssertionError',
@ -155,14 +173,19 @@ def test_aio_simple_error(reg_addr):
assert err.boxed_type is AssertionError assert err.boxed_type is AssertionError
def test_tractor_cancels_aio(reg_addr): def test_tractor_cancels_aio(
reg_addr: tuple[str, int],
debug_mode: bool,
):
''' '''
Verify we can cancel a spawned asyncio task gracefully. Verify we can cancel a spawned asyncio task gracefully.
''' '''
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
portal = await n.run_in_actor( debug_mode=debug_mode,
) as an:
portal = await an.run_in_actor(
asyncio_actor, asyncio_actor,
target='aio_sleep_forever', target='aio_sleep_forever',
expect_err='trio.Cancelled', expect_err='trio.Cancelled',
@ -174,7 +197,9 @@ def test_tractor_cancels_aio(reg_addr):
trio.run(main) trio.run(main)
def test_trio_cancels_aio(reg_addr): def test_trio_cancels_aio(
reg_addr: tuple[str, int],
):
''' '''
Much like the above test with ``tractor.Portal.cancel_actor()`` Much like the above test with ``tractor.Portal.cancel_actor()``
except we just use a standard ``trio`` cancellation api. except we just use a standard ``trio`` cancellation api.
@ -205,7 +230,8 @@ async def trio_ctx(
# this will block until the ``asyncio`` task sends a "first" # this will block until the ``asyncio`` task sends a "first"
# message. # message.
with trio.fail_after(2): delay: int = 999 if tractor.debug_mode() else 1
with trio.fail_after(1 + delay):
try: try:
async with ( async with (
trio.open_nursery( trio.open_nursery(
@ -241,8 +267,10 @@ async def trio_ctx(
ids='parent_actor_cancels_child={}'.format ids='parent_actor_cancels_child={}'.format
) )
def test_context_spawns_aio_task_that_errors( def test_context_spawns_aio_task_that_errors(
reg_addr, reg_addr: tuple[str, int],
delay: int,
parent_cancels: bool, parent_cancels: bool,
debug_mode: bool,
): ):
''' '''
Verify that spawning a task via an intertask channel ctx mngr that Verify that spawning a task via an intertask channel ctx mngr that
@ -251,14 +279,13 @@ def test_context_spawns_aio_task_that_errors(
''' '''
async def main(): async def main():
with trio.fail_after(2): with trio.fail_after(1 + delay):
# with trio.fail_after(999): async with tractor.open_nursery() as an:
async with tractor.open_nursery() as n: p = await an.start_actor(
p = await n.start_actor(
'aio_daemon', 'aio_daemon',
enable_modules=[__name__], enable_modules=[__name__],
infect_asyncio=True, infect_asyncio=True,
# debug_mode=True, debug_mode=debug_mode,
loglevel='cancel', loglevel='cancel',
) )
async with ( async with (
@ -325,11 +352,12 @@ async def aio_cancel():
def test_aio_cancelled_from_aio_causes_trio_cancelled( def test_aio_cancelled_from_aio_causes_trio_cancelled(
reg_addr: tuple, reg_addr: tuple,
delay: int,
): ):
''' '''
When the `asyncio.Task` cancels itself the `trio` side cshould When the `asyncio.Task` cancels itself the `trio` side should
also cancel and teardown and relay the cancellation cross-process also cancel and teardown and relay the cancellation cross-process
to the caller (parent). to the parent caller.
''' '''
async def main(): async def main():
@ -345,8 +373,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
# NOTE: normally the `an.__aexit__()` waits on the # NOTE: normally the `an.__aexit__()` waits on the
# portal's result but we do it explicitly here # portal's result but we do it explicitly here
# to avoid indent levels. # to avoid indent levels.
# with trio.fail_after(1): with trio.fail_after(1 + delay):
with trio.fail_after(999):
await p.wait_for_result() await p.wait_for_result()
with pytest.raises( with pytest.raises(
@ -397,6 +424,7 @@ async def push_from_aio_task(
i == 50 i == 50
): ):
if fail_early: if fail_early:
print('Raising exc from aio side!')
raise Exception raise Exception
if exit_early: if exit_early:
@ -425,7 +453,7 @@ async def push_from_aio_task(
async def stream_from_aio( async def stream_from_aio(
trio_exit_early: bool = False, trio_exit_early: bool = False,
raise_err: bool = False, trio_raise_err: bool = False,
aio_raise_err: bool = False, aio_raise_err: bool = False,
aio_exit_early: bool = False, aio_exit_early: bool = False,
fan_out: bool = False, fan_out: bool = False,
@ -440,10 +468,18 @@ async def stream_from_aio(
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
push_from_aio_task, push_from_aio_task,
sequence=seq, sequence=seq,
expect_cancel=raise_err or trio_exit_early, expect_cancel=trio_raise_err or trio_exit_early,
fail_early=aio_raise_err, fail_early=aio_raise_err,
exit_early=aio_exit_early, exit_early=aio_exit_early,
# such that we can test exit early cases
# for each side explicitly.
suppress_graceful_exits=(not(
aio_exit_early
or
trio_exit_early
))
) as (first, chan): ) as (first, chan):
assert first is True assert first is True
@ -459,7 +495,7 @@ async def stream_from_aio(
pulled.append(value) pulled.append(value)
if value == 50: if value == 50:
if raise_err: if trio_raise_err:
raise Exception raise Exception
elif trio_exit_early: elif trio_exit_early:
print('`consume()` breaking early!\n') print('`consume()` breaking early!\n')
@ -478,11 +514,11 @@ async def stream_from_aio(
# tasks are joined.. # tasks are joined..
chan.subscribe() as br, chan.subscribe() as br,
trio.open_nursery() as n, trio.open_nursery() as tn,
): ):
# start 2nd task that get's broadcast the same # start 2nd task that get's broadcast the same
# value set. # value set.
n.start_soon(consume, br) tn.start_soon(consume, br)
await consume(chan) await consume(chan)
else: else:
@ -495,14 +531,14 @@ async def stream_from_aio(
finally: finally:
if ( if not (
not raise_err trio_raise_err
and or
not trio_exit_early trio_exit_early
and or
not aio_raise_err aio_raise_err
and or
not aio_exit_early aio_exit_early
): ):
if fan_out: if fan_out:
# we get double the pulled values in the # we get double the pulled values in the
@ -526,10 +562,13 @@ async def stream_from_aio(
'fan_out', [False, True], 'fan_out', [False, True],
ids='fan_out_w_chan_subscribe={}'.format ids='fan_out_w_chan_subscribe={}'.format
) )
def test_basic_interloop_channel_stream(reg_addr, fan_out): def test_basic_interloop_channel_stream(
reg_addr: tuple[str, int],
fan_out: bool,
):
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as an:
portal = await n.run_in_actor( portal = await an.run_in_actor(
stream_from_aio, stream_from_aio,
infect_asyncio=True, infect_asyncio=True,
fan_out=fan_out, fan_out=fan_out,
@ -543,10 +582,10 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
# TODO: parametrize the above test and avoid the duplication here? # TODO: parametrize the above test and avoid the duplication here?
def test_trio_error_cancels_intertask_chan(reg_addr): def test_trio_error_cancels_intertask_chan(reg_addr):
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as an:
portal = await n.run_in_actor( portal = await an.run_in_actor(
stream_from_aio, stream_from_aio,
raise_err=True, trio_raise_err=True,
infect_asyncio=True, infect_asyncio=True,
) )
# should trigger remote actor error # should trigger remote actor error
@ -561,6 +600,8 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
def test_trio_closes_early_causes_aio_checkpoint_raise( def test_trio_closes_early_causes_aio_checkpoint_raise(
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
delay: int,
debug_mode: bool,
): ):
''' '''
Check that if the `trio`-task "exits early and silently" (in this Check that if the `trio`-task "exits early and silently" (in this
@ -572,13 +613,12 @@ def test_trio_closes_early_causes_aio_checkpoint_raise(
''' '''
async def main(): async def main():
with trio.fail_after(2): with trio.fail_after(1 + delay):
# with trio.fail_after(999):
async with tractor.open_nursery( async with tractor.open_nursery(
# debug_mode=True, debug_mode=debug_mode,
# enable_stack_on_sig=True, # enable_stack_on_sig=True,
) as n: ) as an:
portal = await n.run_in_actor( portal = await an.run_in_actor(
stream_from_aio, stream_from_aio,
trio_exit_early=True, trio_exit_early=True,
infect_asyncio=True, infect_asyncio=True,
@ -606,6 +646,8 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
# - trio cancelled AND aio exits early on its next tick # - trio cancelled AND aio exits early on its next tick
# - trio errors AND aio exits early on its next tick # - trio errors AND aio exits early on its next tick
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
debug_mode: bool,
delay: int,
): ):
''' '''
Check that if the `asyncio`-task "exits early and silently" (in this Check that if the `asyncio`-task "exits early and silently" (in this
@ -621,10 +663,9 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
''' '''
async def main(): async def main():
with trio.fail_after(2): with trio.fail_after(1 + delay):
# with trio.fail_after(999):
async with tractor.open_nursery( async with tractor.open_nursery(
# debug_mode=True, debug_mode=debug_mode,
# enable_stack_on_sig=True, # enable_stack_on_sig=True,
) as an: ) as an:
portal = await an.run_in_actor( portal = await an.run_in_actor(
@ -658,10 +699,15 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
assert exc.boxed_type is to_asyncio.AsyncioTaskExited assert exc.boxed_type is to_asyncio.AsyncioTaskExited
def test_aio_errors_and_channel_propagates_and_closes(reg_addr): def test_aio_errors_and_channel_propagates_and_closes(
reg_addr: tuple[str, int],
debug_mode: bool,
):
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
portal = await n.run_in_actor( debug_mode=debug_mode,
) as an:
portal = await an.run_in_actor(
stream_from_aio, stream_from_aio,
aio_raise_err=True, aio_raise_err=True,
infect_asyncio=True, infect_asyncio=True,
@ -736,13 +782,15 @@ async def trio_to_aio_echo_server(
ids='raise_error={}'.format, ids='raise_error={}'.format,
) )
def test_echoserver_detailed_mechanics( def test_echoserver_detailed_mechanics(
reg_addr, reg_addr: tuple[str, int],
debug_mode: bool,
raise_error_mid_stream, raise_error_mid_stream,
): ):
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
p = await n.start_actor( debug_mode=debug_mode,
) as an:
p = await an.start_actor(
'aio_server', 'aio_server',
enable_modules=[__name__], enable_modules=[__name__],
infect_asyncio=True, infect_asyncio=True,
@ -947,6 +995,8 @@ def test_sigint_closes_lifetime_stack(
''' '''
async def main(): async def main():
delay = 999 if tractor.debug_mode() else 1
try: try:
an: tractor.ActorNursery an: tractor.ActorNursery
async with tractor.open_nursery( async with tractor.open_nursery(
@ -997,7 +1047,7 @@ def test_sigint_closes_lifetime_stack(
if wait_for_ctx: if wait_for_ctx:
print('waiting for ctx outcome in parent..') print('waiting for ctx outcome in parent..')
try: try:
with trio.fail_after(1): with trio.fail_after(1 + delay):
await ctx.wait_for_result() await ctx.wait_for_result()
except tractor.ContextCancelled as ctxc: except tractor.ContextCancelled as ctxc:
assert ctxc.canceller == ctx.chan.uid assert ctxc.canceller == ctx.chan.uid

View File

@ -43,6 +43,7 @@ from ._state import (
current_actor as current_actor, current_actor as current_actor,
is_root_process as is_root_process, is_root_process as is_root_process,
current_ipc_ctx as current_ipc_ctx, current_ipc_ctx as current_ipc_ctx,
debug_mode as debug_mode
) )
from ._exceptions import ( from ._exceptions import (
ContextCancelled as ContextCancelled, ContextCancelled as ContextCancelled,
@ -65,3 +66,4 @@ from ._root import (
from ._ipc import Channel as Channel from ._ipc import Channel as Channel
from ._portal import Portal as Portal from ._portal import Portal as Portal
from ._runtime import Actor as Actor from ._runtime import Actor as Actor
from . import hilevel as hilevel

View File

@ -103,7 +103,16 @@ class AsyncioTaskExited(Exception):
''' '''
class TrioTaskExited(AsyncioCancelled): class TrioCancelled(Exception):
'''
Trio cancelled translation (non-base) error
for use with the `to_asyncio` module
to be raised in the `asyncio.Task` to indicate
that the `trio` side raised `Cancelled` or an error.
'''
class TrioTaskExited(Exception):
''' '''
The `trio`-side task exited without explicitly cancelling the The `trio`-side task exited without explicitly cancelling the
`asyncio.Task` peer. `asyncio.Task` peer.
@ -406,6 +415,9 @@ class RemoteActorError(Exception):
String-name of the (last hop's) boxed error type. String-name of the (last hop's) boxed error type.
''' '''
# TODO, maybe support also serializing the
# `ExceptionGroup.exeptions: list[BaseException]` set under
# certain conditions?
bt: Type[BaseException] = self.boxed_type bt: Type[BaseException] = self.boxed_type
if bt: if bt:
return str(bt.__name__) return str(bt.__name__)
@ -821,8 +833,11 @@ class MsgTypeError(
''' '''
if ( if (
(_bad_msg := self.msgdata.get('_bad_msg')) (_bad_msg := self.msgdata.get('_bad_msg'))
and and (
isinstance(_bad_msg, PayloadMsg) isinstance(_bad_msg, PayloadMsg)
or
isinstance(_bad_msg, msgtypes.Start)
)
): ):
return _bad_msg return _bad_msg

View File

@ -116,6 +116,8 @@ class LinkedTaskChannel(
_trio_task: trio.Task _trio_task: trio.Task
_aio_task_complete: trio.Event _aio_task_complete: trio.Event
_suppress_graceful_exits: bool = True
_trio_err: BaseException|None = None _trio_err: BaseException|None = None
_trio_to_raise: ( _trio_to_raise: (
AsyncioTaskExited| # aio task exits while trio ongoing AsyncioTaskExited| # aio task exits while trio ongoing
@ -229,6 +231,8 @@ class LinkedTaskChannel(
except BaseException as err: except BaseException as err:
async with translate_aio_errors( async with translate_aio_errors(
chan=self, chan=self,
# NOTE, determined by `open_channel_from()` input arg
suppress_graceful_exits=self._suppress_graceful_exits,
# XXX: obviously this will deadlock if an on-going stream is # XXX: obviously this will deadlock if an on-going stream is
# being procesed. # being procesed.
@ -300,6 +304,7 @@ def _run_asyncio_task(
*, *,
qsize: int = 1, qsize: int = 1,
provide_channels: bool = False, provide_channels: bool = False,
suppress_graceful_exits: bool = True,
hide_tb: bool = False, hide_tb: bool = False,
**kwargs, **kwargs,
@ -352,6 +357,7 @@ def _run_asyncio_task(
_trio_cs=trio_cs, _trio_cs=trio_cs,
_trio_task=trio_task, _trio_task=trio_task,
_aio_task_complete=aio_task_complete, _aio_task_complete=aio_task_complete,
_suppress_graceful_exits=suppress_graceful_exits,
) )
async def wait_on_coro_final_result( async def wait_on_coro_final_result(
@ -365,7 +371,6 @@ def _run_asyncio_task(
`return`-ed result back to `trio`. `return`-ed result back to `trio`.
''' '''
nonlocal aio_err
nonlocal chan nonlocal chan
orig = result = id(coro) orig = result = id(coro)
@ -383,7 +388,6 @@ def _run_asyncio_task(
'`asyncio` task errored\n' '`asyncio` task errored\n'
) )
raise raise
else: else:
if ( if (
result != orig result != orig
@ -419,7 +423,11 @@ def _run_asyncio_task(
# - raise error and aio side errors "independently" # - raise error and aio side errors "independently"
# on next tick (SEE draft HANDLER BELOW). # on next tick (SEE draft HANDLER BELOW).
stats: trio.MemoryChannelStatistics = to_trio.statistics() stats: trio.MemoryChannelStatistics = to_trio.statistics()
if stats.tasks_waiting_receive: if (
stats.tasks_waiting_receive
and
not chan._aio_err
):
chan._trio_to_raise = AsyncioTaskExited( chan._trio_to_raise = AsyncioTaskExited(
f'Task existed with final result\n' f'Task existed with final result\n'
f'{result!r}\n' f'{result!r}\n'
@ -513,6 +521,9 @@ def _run_asyncio_task(
f')> {res}\n' f')> {res}\n'
f' |_{task}\n' f' |_{task}\n'
) )
if not chan._aio_result:
chan._aio_result = res
# ?TODO, should we also raise `AsyncioTaskExited[res]` # ?TODO, should we also raise `AsyncioTaskExited[res]`
# in any case where trio is NOT blocking on the # in any case where trio is NOT blocking on the
# `._to_trio` chan? # `._to_trio` chan?
@ -593,6 +604,7 @@ def _run_asyncio_task(
# is trio the src of the aio task's exc-as-outcome? # is trio the src of the aio task's exc-as-outcome?
trio_err: BaseException|None = chan._trio_err trio_err: BaseException|None = chan._trio_err
curr_aio_err: BaseException|None = chan._aio_err
if ( if (
curr_aio_err curr_aio_err
or or
@ -648,6 +660,8 @@ def _run_asyncio_task(
trio_to_raise: AsyncioCancelled|AsyncioTaskExited = chan._trio_to_raise trio_to_raise: AsyncioCancelled|AsyncioTaskExited = chan._trio_to_raise
aio_to_raise: TrioTaskExited|TrioCancelled = chan._aio_to_raise aio_to_raise: TrioTaskExited|TrioCancelled = chan._aio_to_raise
if ( if (
not chan._aio_result
and
not trio_cs.cancelled_caught not trio_cs.cancelled_caught
and ( and (
(aio_err and type(aio_err) not in { (aio_err and type(aio_err) not in {
@ -680,7 +694,17 @@ def _run_asyncio_task(
# match the one we just caught from the task above! # match the one we just caught from the task above!
# (that would indicate something weird/very-wrong # (that would indicate something weird/very-wrong
# going on?) # going on?)
if aio_err is not trio_to_raise: if (
aio_err is not trio_to_raise
and (
not suppress_graceful_exits
and (
chan._aio_result is not Unresolved
and
isinstance(trio_to_raise, AsyncioTaskExited)
)
)
):
# raise aio_err from relayed_aio_err # raise aio_err from relayed_aio_err
raise trio_to_raise from curr_aio_err raise trio_to_raise from curr_aio_err
@ -715,6 +739,7 @@ async def translate_aio_errors(
trio_task = trio.lowlevel.current_task() trio_task = trio.lowlevel.current_task()
aio_err: BaseException|None = chan._aio_err aio_err: BaseException|None = chan._aio_err
aio_task: asyncio.Task = chan._aio_task aio_task: asyncio.Task = chan._aio_task
aio_done_before_trio: bool = aio_task.done()
assert aio_task assert aio_task
trio_err: BaseException|None = None trio_err: BaseException|None = None
to_raise_trio: BaseException|None = None to_raise_trio: BaseException|None = None
@ -1011,6 +1036,8 @@ async def translate_aio_errors(
# -[ ] make this a channel method, OR # -[ ] make this a channel method, OR
# -[ ] just put back inline below? # -[ ] just put back inline below?
# #
# await tractor.pause(shield=True)
# TODO, go back to inlining this..
def maybe_raise_aio_side_err( def maybe_raise_aio_side_err(
trio_err: Exception, trio_err: Exception,
) -> None: ) -> None:
@ -1028,11 +1055,10 @@ async def translate_aio_errors(
None None
) = chan._trio_to_raise ) = chan._trio_to_raise
if ( if not suppress_graceful_exits:
trio_to_raise raise trio_to_raise from (aio_err or trio_err)
and
suppress_graceful_exits if trio_to_raise:
):
# import pdbp; pdbp.set_trace() # import pdbp; pdbp.set_trace()
match ( match (
trio_to_raise, trio_to_raise,
@ -1040,7 +1066,7 @@ async def translate_aio_errors(
): ):
case ( case (
AsyncioTaskExited(), AsyncioTaskExited(),
None|trio.Cancelled(), trio.Cancelled()|None,
): ):
log.info( log.info(
'Ignoring aio exit signal since trio also exited!' 'Ignoring aio exit signal since trio also exited!'
@ -1051,12 +1077,13 @@ async def translate_aio_errors(
AsyncioCancelled(), AsyncioCancelled(),
trio.Cancelled(), trio.Cancelled(),
): ):
log.info( if not aio_done_before_trio:
'Ignoring aio cancelled signal since trio was also cancelled!' log.info(
) 'Ignoring aio cancelled signal since trio was also cancelled!'
return )
return
raise trio_to_raise from (aio_err or trio_err) case _:
raise trio_to_raise from (aio_err or trio_err)
# Check if the asyncio-side is the cause of the trio-side # Check if the asyncio-side is the cause of the trio-side
# error. # error.
@ -1128,6 +1155,7 @@ async def run_task(
async with translate_aio_errors( async with translate_aio_errors(
chan, chan,
wait_on_aio_task=True, wait_on_aio_task=True,
suppress_graceful_exits=chan._suppress_graceful_exits,
): ):
# return single value that is the output from the # return single value that is the output from the
# ``asyncio`` function-as-task. Expect the mem chan api # ``asyncio`` function-as-task. Expect the mem chan api
@ -1143,7 +1171,8 @@ async def run_task(
async def open_channel_from( async def open_channel_from(
target: Callable[..., Any], target: Callable[..., Any],
**kwargs, suppress_graceful_exits: bool = True,
**target_kwargs,
) -> AsyncIterator[Any]: ) -> AsyncIterator[Any]:
''' '''
@ -1155,13 +1184,15 @@ async def open_channel_from(
target, target,
qsize=2**8, qsize=2**8,
provide_channels=True, provide_channels=True,
**kwargs, suppress_graceful_exits=suppress_graceful_exits,
**target_kwargs,
) )
# TODO, tuple form here? # TODO, tuple form here?
async with chan._from_aio: async with chan._from_aio:
async with translate_aio_errors( async with translate_aio_errors(
chan, chan,
wait_on_aio_task=True, wait_on_aio_task=True,
suppress_graceful_exits=suppress_graceful_exits,
): ):
# sync to a "started()"-like first delivered value from the # sync to a "started()"-like first delivered value from the
# ``asyncio`` task. # ``asyncio`` task.