Compare commits
4 Commits
4c82b6e94f
...
7fac170f8d
Author | SHA1 | Date |
---|---|---|
|
7fac170f8d | |
|
44281268a8 | |
|
064c5ce034 | |
|
6637473b54 |
|
@ -32,6 +32,16 @@ from tractor.trionics import BroadcastReceiver
|
||||||
from tractor._testing import expect_ctxc
|
from tractor._testing import expect_ctxc
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(
|
||||||
|
scope='module',
|
||||||
|
)
|
||||||
|
def delay(debug_mode: bool) -> int:
|
||||||
|
if debug_mode:
|
||||||
|
return 999
|
||||||
|
else:
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
async def sleep_and_err(
|
async def sleep_and_err(
|
||||||
sleep_for: float = 0.1,
|
sleep_for: float = 0.1,
|
||||||
|
|
||||||
|
@ -59,19 +69,23 @@ async def trio_cancels_single_aio_task():
|
||||||
await tractor.to_asyncio.run_task(aio_sleep_forever)
|
await tractor.to_asyncio.run_task(aio_sleep_forever)
|
||||||
|
|
||||||
|
|
||||||
def test_trio_cancels_aio_on_actor_side(reg_addr):
|
def test_trio_cancels_aio_on_actor_side(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
delay: int,
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Spawn an infected actor that is cancelled by the ``trio`` side
|
Spawn an infected actor that is cancelled by the ``trio`` side
|
||||||
task using std cancel scope apis.
|
task using std cancel scope apis.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
# with trio.fail_after(1):
|
with trio.fail_after(1 + delay):
|
||||||
with trio.fail_after(999):
|
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
registry_addrs=[reg_addr]
|
registry_addrs=[reg_addr],
|
||||||
) as n:
|
debug_mode=debug_mode,
|
||||||
await n.run_in_actor(
|
) as an:
|
||||||
|
await an.run_in_actor(
|
||||||
trio_cancels_single_aio_task,
|
trio_cancels_single_aio_task,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
|
@ -118,7 +132,10 @@ async def asyncio_actor(
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
def test_aio_simple_error(reg_addr):
|
def test_aio_simple_error(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Verify a simple remote asyncio error propagates back through trio
|
Verify a simple remote asyncio error propagates back through trio
|
||||||
to the parent actor.
|
to the parent actor.
|
||||||
|
@ -127,9 +144,10 @@ def test_aio_simple_error(reg_addr):
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
registry_addrs=[reg_addr]
|
registry_addrs=[reg_addr],
|
||||||
) as n:
|
debug_mode=debug_mode,
|
||||||
await n.run_in_actor(
|
) as an:
|
||||||
|
await an.run_in_actor(
|
||||||
asyncio_actor,
|
asyncio_actor,
|
||||||
target='sleep_and_err',
|
target='sleep_and_err',
|
||||||
expect_err='AssertionError',
|
expect_err='AssertionError',
|
||||||
|
@ -155,14 +173,19 @@ def test_aio_simple_error(reg_addr):
|
||||||
assert err.boxed_type is AssertionError
|
assert err.boxed_type is AssertionError
|
||||||
|
|
||||||
|
|
||||||
def test_tractor_cancels_aio(reg_addr):
|
def test_tractor_cancels_aio(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Verify we can cancel a spawned asyncio task gracefully.
|
Verify we can cancel a spawned asyncio task gracefully.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery(
|
||||||
portal = await n.run_in_actor(
|
debug_mode=debug_mode,
|
||||||
|
) as an:
|
||||||
|
portal = await an.run_in_actor(
|
||||||
asyncio_actor,
|
asyncio_actor,
|
||||||
target='aio_sleep_forever',
|
target='aio_sleep_forever',
|
||||||
expect_err='trio.Cancelled',
|
expect_err='trio.Cancelled',
|
||||||
|
@ -174,7 +197,9 @@ def test_tractor_cancels_aio(reg_addr):
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
def test_trio_cancels_aio(reg_addr):
|
def test_trio_cancels_aio(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Much like the above test with ``tractor.Portal.cancel_actor()``
|
Much like the above test with ``tractor.Portal.cancel_actor()``
|
||||||
except we just use a standard ``trio`` cancellation api.
|
except we just use a standard ``trio`` cancellation api.
|
||||||
|
@ -205,7 +230,8 @@ async def trio_ctx(
|
||||||
|
|
||||||
# this will block until the ``asyncio`` task sends a "first"
|
# this will block until the ``asyncio`` task sends a "first"
|
||||||
# message.
|
# message.
|
||||||
with trio.fail_after(2):
|
delay: int = 999 if tractor.debug_mode() else 1
|
||||||
|
with trio.fail_after(1 + delay):
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery(
|
trio.open_nursery(
|
||||||
|
@ -241,8 +267,10 @@ async def trio_ctx(
|
||||||
ids='parent_actor_cancels_child={}'.format
|
ids='parent_actor_cancels_child={}'.format
|
||||||
)
|
)
|
||||||
def test_context_spawns_aio_task_that_errors(
|
def test_context_spawns_aio_task_that_errors(
|
||||||
reg_addr,
|
reg_addr: tuple[str, int],
|
||||||
|
delay: int,
|
||||||
parent_cancels: bool,
|
parent_cancels: bool,
|
||||||
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Verify that spawning a task via an intertask channel ctx mngr that
|
Verify that spawning a task via an intertask channel ctx mngr that
|
||||||
|
@ -251,14 +279,13 @@ def test_context_spawns_aio_task_that_errors(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
with trio.fail_after(2):
|
with trio.fail_after(1 + delay):
|
||||||
# with trio.fail_after(999):
|
async with tractor.open_nursery() as an:
|
||||||
async with tractor.open_nursery() as n:
|
p = await an.start_actor(
|
||||||
p = await n.start_actor(
|
|
||||||
'aio_daemon',
|
'aio_daemon',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
# debug_mode=True,
|
debug_mode=debug_mode,
|
||||||
loglevel='cancel',
|
loglevel='cancel',
|
||||||
)
|
)
|
||||||
async with (
|
async with (
|
||||||
|
@ -325,11 +352,12 @@ async def aio_cancel():
|
||||||
|
|
||||||
def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
||||||
reg_addr: tuple,
|
reg_addr: tuple,
|
||||||
|
delay: int,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
When the `asyncio.Task` cancels itself the `trio` side cshould
|
When the `asyncio.Task` cancels itself the `trio` side should
|
||||||
also cancel and teardown and relay the cancellation cross-process
|
also cancel and teardown and relay the cancellation cross-process
|
||||||
to the caller (parent).
|
to the parent caller.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
|
@ -345,8 +373,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
||||||
# NOTE: normally the `an.__aexit__()` waits on the
|
# NOTE: normally the `an.__aexit__()` waits on the
|
||||||
# portal's result but we do it explicitly here
|
# portal's result but we do it explicitly here
|
||||||
# to avoid indent levels.
|
# to avoid indent levels.
|
||||||
# with trio.fail_after(1):
|
with trio.fail_after(1 + delay):
|
||||||
with trio.fail_after(999):
|
|
||||||
await p.wait_for_result()
|
await p.wait_for_result()
|
||||||
|
|
||||||
with pytest.raises(
|
with pytest.raises(
|
||||||
|
@ -397,6 +424,7 @@ async def push_from_aio_task(
|
||||||
i == 50
|
i == 50
|
||||||
):
|
):
|
||||||
if fail_early:
|
if fail_early:
|
||||||
|
print('Raising exc from aio side!')
|
||||||
raise Exception
|
raise Exception
|
||||||
|
|
||||||
if exit_early:
|
if exit_early:
|
||||||
|
@ -425,7 +453,7 @@ async def push_from_aio_task(
|
||||||
|
|
||||||
async def stream_from_aio(
|
async def stream_from_aio(
|
||||||
trio_exit_early: bool = False,
|
trio_exit_early: bool = False,
|
||||||
raise_err: bool = False,
|
trio_raise_err: bool = False,
|
||||||
aio_raise_err: bool = False,
|
aio_raise_err: bool = False,
|
||||||
aio_exit_early: bool = False,
|
aio_exit_early: bool = False,
|
||||||
fan_out: bool = False,
|
fan_out: bool = False,
|
||||||
|
@ -440,10 +468,18 @@ async def stream_from_aio(
|
||||||
async with to_asyncio.open_channel_from(
|
async with to_asyncio.open_channel_from(
|
||||||
push_from_aio_task,
|
push_from_aio_task,
|
||||||
sequence=seq,
|
sequence=seq,
|
||||||
expect_cancel=raise_err or trio_exit_early,
|
expect_cancel=trio_raise_err or trio_exit_early,
|
||||||
fail_early=aio_raise_err,
|
fail_early=aio_raise_err,
|
||||||
exit_early=aio_exit_early,
|
exit_early=aio_exit_early,
|
||||||
|
|
||||||
|
# such that we can test exit early cases
|
||||||
|
# for each side explicitly.
|
||||||
|
suppress_graceful_exits=(not(
|
||||||
|
aio_exit_early
|
||||||
|
or
|
||||||
|
trio_exit_early
|
||||||
|
))
|
||||||
|
|
||||||
) as (first, chan):
|
) as (first, chan):
|
||||||
|
|
||||||
assert first is True
|
assert first is True
|
||||||
|
@ -459,7 +495,7 @@ async def stream_from_aio(
|
||||||
pulled.append(value)
|
pulled.append(value)
|
||||||
|
|
||||||
if value == 50:
|
if value == 50:
|
||||||
if raise_err:
|
if trio_raise_err:
|
||||||
raise Exception
|
raise Exception
|
||||||
elif trio_exit_early:
|
elif trio_exit_early:
|
||||||
print('`consume()` breaking early!\n')
|
print('`consume()` breaking early!\n')
|
||||||
|
@ -478,11 +514,11 @@ async def stream_from_aio(
|
||||||
# tasks are joined..
|
# tasks are joined..
|
||||||
chan.subscribe() as br,
|
chan.subscribe() as br,
|
||||||
|
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as tn,
|
||||||
):
|
):
|
||||||
# start 2nd task that get's broadcast the same
|
# start 2nd task that get's broadcast the same
|
||||||
# value set.
|
# value set.
|
||||||
n.start_soon(consume, br)
|
tn.start_soon(consume, br)
|
||||||
await consume(chan)
|
await consume(chan)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -495,14 +531,14 @@ async def stream_from_aio(
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
|
||||||
if (
|
if not (
|
||||||
not raise_err
|
trio_raise_err
|
||||||
and
|
or
|
||||||
not trio_exit_early
|
trio_exit_early
|
||||||
and
|
or
|
||||||
not aio_raise_err
|
aio_raise_err
|
||||||
and
|
or
|
||||||
not aio_exit_early
|
aio_exit_early
|
||||||
):
|
):
|
||||||
if fan_out:
|
if fan_out:
|
||||||
# we get double the pulled values in the
|
# we get double the pulled values in the
|
||||||
|
@ -526,10 +562,13 @@ async def stream_from_aio(
|
||||||
'fan_out', [False, True],
|
'fan_out', [False, True],
|
||||||
ids='fan_out_w_chan_subscribe={}'.format
|
ids='fan_out_w_chan_subscribe={}'.format
|
||||||
)
|
)
|
||||||
def test_basic_interloop_channel_stream(reg_addr, fan_out):
|
def test_basic_interloop_channel_stream(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
fan_out: bool,
|
||||||
|
):
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as an:
|
||||||
portal = await n.run_in_actor(
|
portal = await an.run_in_actor(
|
||||||
stream_from_aio,
|
stream_from_aio,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
fan_out=fan_out,
|
fan_out=fan_out,
|
||||||
|
@ -543,10 +582,10 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
|
||||||
# TODO: parametrize the above test and avoid the duplication here?
|
# TODO: parametrize the above test and avoid the duplication here?
|
||||||
def test_trio_error_cancels_intertask_chan(reg_addr):
|
def test_trio_error_cancels_intertask_chan(reg_addr):
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as an:
|
||||||
portal = await n.run_in_actor(
|
portal = await an.run_in_actor(
|
||||||
stream_from_aio,
|
stream_from_aio,
|
||||||
raise_err=True,
|
trio_raise_err=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
# should trigger remote actor error
|
# should trigger remote actor error
|
||||||
|
@ -561,6 +600,8 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
|
||||||
|
|
||||||
def test_trio_closes_early_causes_aio_checkpoint_raise(
|
def test_trio_closes_early_causes_aio_checkpoint_raise(
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
|
delay: int,
|
||||||
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Check that if the `trio`-task "exits early and silently" (in this
|
Check that if the `trio`-task "exits early and silently" (in this
|
||||||
|
@ -572,13 +613,12 @@ def test_trio_closes_early_causes_aio_checkpoint_raise(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
with trio.fail_after(2):
|
with trio.fail_after(1 + delay):
|
||||||
# with trio.fail_after(999):
|
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
# debug_mode=True,
|
debug_mode=debug_mode,
|
||||||
# enable_stack_on_sig=True,
|
# enable_stack_on_sig=True,
|
||||||
) as n:
|
) as an:
|
||||||
portal = await n.run_in_actor(
|
portal = await an.run_in_actor(
|
||||||
stream_from_aio,
|
stream_from_aio,
|
||||||
trio_exit_early=True,
|
trio_exit_early=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
|
@ -606,6 +646,8 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
|
||||||
# - trio cancelled AND aio exits early on its next tick
|
# - trio cancelled AND aio exits early on its next tick
|
||||||
# - trio errors AND aio exits early on its next tick
|
# - trio errors AND aio exits early on its next tick
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
|
delay: int,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Check that if the `asyncio`-task "exits early and silently" (in this
|
Check that if the `asyncio`-task "exits early and silently" (in this
|
||||||
|
@ -621,10 +663,9 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
with trio.fail_after(2):
|
with trio.fail_after(1 + delay):
|
||||||
# with trio.fail_after(999):
|
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
# debug_mode=True,
|
debug_mode=debug_mode,
|
||||||
# enable_stack_on_sig=True,
|
# enable_stack_on_sig=True,
|
||||||
) as an:
|
) as an:
|
||||||
portal = await an.run_in_actor(
|
portal = await an.run_in_actor(
|
||||||
|
@ -658,10 +699,15 @@ def test_aio_exits_early_relays_AsyncioTaskExited(
|
||||||
assert exc.boxed_type is to_asyncio.AsyncioTaskExited
|
assert exc.boxed_type is to_asyncio.AsyncioTaskExited
|
||||||
|
|
||||||
|
|
||||||
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
def test_aio_errors_and_channel_propagates_and_closes(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery(
|
||||||
portal = await n.run_in_actor(
|
debug_mode=debug_mode,
|
||||||
|
) as an:
|
||||||
|
portal = await an.run_in_actor(
|
||||||
stream_from_aio,
|
stream_from_aio,
|
||||||
aio_raise_err=True,
|
aio_raise_err=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
|
@ -736,13 +782,15 @@ async def trio_to_aio_echo_server(
|
||||||
ids='raise_error={}'.format,
|
ids='raise_error={}'.format,
|
||||||
)
|
)
|
||||||
def test_echoserver_detailed_mechanics(
|
def test_echoserver_detailed_mechanics(
|
||||||
reg_addr,
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
raise_error_mid_stream,
|
raise_error_mid_stream,
|
||||||
):
|
):
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery(
|
||||||
p = await n.start_actor(
|
debug_mode=debug_mode,
|
||||||
|
) as an:
|
||||||
|
p = await an.start_actor(
|
||||||
'aio_server',
|
'aio_server',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
|
@ -947,6 +995,8 @@ def test_sigint_closes_lifetime_stack(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
|
|
||||||
|
delay = 999 if tractor.debug_mode() else 1
|
||||||
try:
|
try:
|
||||||
an: tractor.ActorNursery
|
an: tractor.ActorNursery
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
|
@ -997,7 +1047,7 @@ def test_sigint_closes_lifetime_stack(
|
||||||
if wait_for_ctx:
|
if wait_for_ctx:
|
||||||
print('waiting for ctx outcome in parent..')
|
print('waiting for ctx outcome in parent..')
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(1):
|
with trio.fail_after(1 + delay):
|
||||||
await ctx.wait_for_result()
|
await ctx.wait_for_result()
|
||||||
except tractor.ContextCancelled as ctxc:
|
except tractor.ContextCancelled as ctxc:
|
||||||
assert ctxc.canceller == ctx.chan.uid
|
assert ctxc.canceller == ctx.chan.uid
|
||||||
|
|
|
@ -43,6 +43,7 @@ from ._state import (
|
||||||
current_actor as current_actor,
|
current_actor as current_actor,
|
||||||
is_root_process as is_root_process,
|
is_root_process as is_root_process,
|
||||||
current_ipc_ctx as current_ipc_ctx,
|
current_ipc_ctx as current_ipc_ctx,
|
||||||
|
debug_mode as debug_mode
|
||||||
)
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
ContextCancelled as ContextCancelled,
|
ContextCancelled as ContextCancelled,
|
||||||
|
@ -65,3 +66,4 @@ from ._root import (
|
||||||
from ._ipc import Channel as Channel
|
from ._ipc import Channel as Channel
|
||||||
from ._portal import Portal as Portal
|
from ._portal import Portal as Portal
|
||||||
from ._runtime import Actor as Actor
|
from ._runtime import Actor as Actor
|
||||||
|
from . import hilevel as hilevel
|
||||||
|
|
|
@ -103,7 +103,16 @@ class AsyncioTaskExited(Exception):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
class TrioTaskExited(AsyncioCancelled):
|
class TrioCancelled(Exception):
|
||||||
|
'''
|
||||||
|
Trio cancelled translation (non-base) error
|
||||||
|
for use with the `to_asyncio` module
|
||||||
|
to be raised in the `asyncio.Task` to indicate
|
||||||
|
that the `trio` side raised `Cancelled` or an error.
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
class TrioTaskExited(Exception):
|
||||||
'''
|
'''
|
||||||
The `trio`-side task exited without explicitly cancelling the
|
The `trio`-side task exited without explicitly cancelling the
|
||||||
`asyncio.Task` peer.
|
`asyncio.Task` peer.
|
||||||
|
@ -406,6 +415,9 @@ class RemoteActorError(Exception):
|
||||||
String-name of the (last hop's) boxed error type.
|
String-name of the (last hop's) boxed error type.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# TODO, maybe support also serializing the
|
||||||
|
# `ExceptionGroup.exeptions: list[BaseException]` set under
|
||||||
|
# certain conditions?
|
||||||
bt: Type[BaseException] = self.boxed_type
|
bt: Type[BaseException] = self.boxed_type
|
||||||
if bt:
|
if bt:
|
||||||
return str(bt.__name__)
|
return str(bt.__name__)
|
||||||
|
@ -821,8 +833,11 @@ class MsgTypeError(
|
||||||
'''
|
'''
|
||||||
if (
|
if (
|
||||||
(_bad_msg := self.msgdata.get('_bad_msg'))
|
(_bad_msg := self.msgdata.get('_bad_msg'))
|
||||||
and
|
and (
|
||||||
isinstance(_bad_msg, PayloadMsg)
|
isinstance(_bad_msg, PayloadMsg)
|
||||||
|
or
|
||||||
|
isinstance(_bad_msg, msgtypes.Start)
|
||||||
|
)
|
||||||
):
|
):
|
||||||
return _bad_msg
|
return _bad_msg
|
||||||
|
|
||||||
|
|
|
@ -116,6 +116,8 @@ class LinkedTaskChannel(
|
||||||
_trio_task: trio.Task
|
_trio_task: trio.Task
|
||||||
_aio_task_complete: trio.Event
|
_aio_task_complete: trio.Event
|
||||||
|
|
||||||
|
_suppress_graceful_exits: bool = True
|
||||||
|
|
||||||
_trio_err: BaseException|None = None
|
_trio_err: BaseException|None = None
|
||||||
_trio_to_raise: (
|
_trio_to_raise: (
|
||||||
AsyncioTaskExited| # aio task exits while trio ongoing
|
AsyncioTaskExited| # aio task exits while trio ongoing
|
||||||
|
@ -229,6 +231,8 @@ class LinkedTaskChannel(
|
||||||
except BaseException as err:
|
except BaseException as err:
|
||||||
async with translate_aio_errors(
|
async with translate_aio_errors(
|
||||||
chan=self,
|
chan=self,
|
||||||
|
# NOTE, determined by `open_channel_from()` input arg
|
||||||
|
suppress_graceful_exits=self._suppress_graceful_exits,
|
||||||
|
|
||||||
# XXX: obviously this will deadlock if an on-going stream is
|
# XXX: obviously this will deadlock if an on-going stream is
|
||||||
# being procesed.
|
# being procesed.
|
||||||
|
@ -300,6 +304,7 @@ def _run_asyncio_task(
|
||||||
*,
|
*,
|
||||||
qsize: int = 1,
|
qsize: int = 1,
|
||||||
provide_channels: bool = False,
|
provide_channels: bool = False,
|
||||||
|
suppress_graceful_exits: bool = True,
|
||||||
hide_tb: bool = False,
|
hide_tb: bool = False,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
|
@ -352,6 +357,7 @@ def _run_asyncio_task(
|
||||||
_trio_cs=trio_cs,
|
_trio_cs=trio_cs,
|
||||||
_trio_task=trio_task,
|
_trio_task=trio_task,
|
||||||
_aio_task_complete=aio_task_complete,
|
_aio_task_complete=aio_task_complete,
|
||||||
|
_suppress_graceful_exits=suppress_graceful_exits,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def wait_on_coro_final_result(
|
async def wait_on_coro_final_result(
|
||||||
|
@ -365,7 +371,6 @@ def _run_asyncio_task(
|
||||||
`return`-ed result back to `trio`.
|
`return`-ed result back to `trio`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
nonlocal aio_err
|
|
||||||
nonlocal chan
|
nonlocal chan
|
||||||
|
|
||||||
orig = result = id(coro)
|
orig = result = id(coro)
|
||||||
|
@ -383,7 +388,6 @@ def _run_asyncio_task(
|
||||||
'`asyncio` task errored\n'
|
'`asyncio` task errored\n'
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if (
|
if (
|
||||||
result != orig
|
result != orig
|
||||||
|
@ -419,7 +423,11 @@ def _run_asyncio_task(
|
||||||
# - raise error and aio side errors "independently"
|
# - raise error and aio side errors "independently"
|
||||||
# on next tick (SEE draft HANDLER BELOW).
|
# on next tick (SEE draft HANDLER BELOW).
|
||||||
stats: trio.MemoryChannelStatistics = to_trio.statistics()
|
stats: trio.MemoryChannelStatistics = to_trio.statistics()
|
||||||
if stats.tasks_waiting_receive:
|
if (
|
||||||
|
stats.tasks_waiting_receive
|
||||||
|
and
|
||||||
|
not chan._aio_err
|
||||||
|
):
|
||||||
chan._trio_to_raise = AsyncioTaskExited(
|
chan._trio_to_raise = AsyncioTaskExited(
|
||||||
f'Task existed with final result\n'
|
f'Task existed with final result\n'
|
||||||
f'{result!r}\n'
|
f'{result!r}\n'
|
||||||
|
@ -513,6 +521,9 @@ def _run_asyncio_task(
|
||||||
f')> {res}\n'
|
f')> {res}\n'
|
||||||
f' |_{task}\n'
|
f' |_{task}\n'
|
||||||
)
|
)
|
||||||
|
if not chan._aio_result:
|
||||||
|
chan._aio_result = res
|
||||||
|
|
||||||
# ?TODO, should we also raise `AsyncioTaskExited[res]`
|
# ?TODO, should we also raise `AsyncioTaskExited[res]`
|
||||||
# in any case where trio is NOT blocking on the
|
# in any case where trio is NOT blocking on the
|
||||||
# `._to_trio` chan?
|
# `._to_trio` chan?
|
||||||
|
@ -593,6 +604,7 @@ def _run_asyncio_task(
|
||||||
|
|
||||||
# is trio the src of the aio task's exc-as-outcome?
|
# is trio the src of the aio task's exc-as-outcome?
|
||||||
trio_err: BaseException|None = chan._trio_err
|
trio_err: BaseException|None = chan._trio_err
|
||||||
|
curr_aio_err: BaseException|None = chan._aio_err
|
||||||
if (
|
if (
|
||||||
curr_aio_err
|
curr_aio_err
|
||||||
or
|
or
|
||||||
|
@ -648,6 +660,8 @@ def _run_asyncio_task(
|
||||||
trio_to_raise: AsyncioCancelled|AsyncioTaskExited = chan._trio_to_raise
|
trio_to_raise: AsyncioCancelled|AsyncioTaskExited = chan._trio_to_raise
|
||||||
aio_to_raise: TrioTaskExited|TrioCancelled = chan._aio_to_raise
|
aio_to_raise: TrioTaskExited|TrioCancelled = chan._aio_to_raise
|
||||||
if (
|
if (
|
||||||
|
not chan._aio_result
|
||||||
|
and
|
||||||
not trio_cs.cancelled_caught
|
not trio_cs.cancelled_caught
|
||||||
and (
|
and (
|
||||||
(aio_err and type(aio_err) not in {
|
(aio_err and type(aio_err) not in {
|
||||||
|
@ -680,7 +694,17 @@ def _run_asyncio_task(
|
||||||
# match the one we just caught from the task above!
|
# match the one we just caught from the task above!
|
||||||
# (that would indicate something weird/very-wrong
|
# (that would indicate something weird/very-wrong
|
||||||
# going on?)
|
# going on?)
|
||||||
if aio_err is not trio_to_raise:
|
if (
|
||||||
|
aio_err is not trio_to_raise
|
||||||
|
and (
|
||||||
|
not suppress_graceful_exits
|
||||||
|
and (
|
||||||
|
chan._aio_result is not Unresolved
|
||||||
|
and
|
||||||
|
isinstance(trio_to_raise, AsyncioTaskExited)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
):
|
||||||
# raise aio_err from relayed_aio_err
|
# raise aio_err from relayed_aio_err
|
||||||
raise trio_to_raise from curr_aio_err
|
raise trio_to_raise from curr_aio_err
|
||||||
|
|
||||||
|
@ -715,6 +739,7 @@ async def translate_aio_errors(
|
||||||
trio_task = trio.lowlevel.current_task()
|
trio_task = trio.lowlevel.current_task()
|
||||||
aio_err: BaseException|None = chan._aio_err
|
aio_err: BaseException|None = chan._aio_err
|
||||||
aio_task: asyncio.Task = chan._aio_task
|
aio_task: asyncio.Task = chan._aio_task
|
||||||
|
aio_done_before_trio: bool = aio_task.done()
|
||||||
assert aio_task
|
assert aio_task
|
||||||
trio_err: BaseException|None = None
|
trio_err: BaseException|None = None
|
||||||
to_raise_trio: BaseException|None = None
|
to_raise_trio: BaseException|None = None
|
||||||
|
@ -1011,6 +1036,8 @@ async def translate_aio_errors(
|
||||||
# -[ ] make this a channel method, OR
|
# -[ ] make this a channel method, OR
|
||||||
# -[ ] just put back inline below?
|
# -[ ] just put back inline below?
|
||||||
#
|
#
|
||||||
|
# await tractor.pause(shield=True)
|
||||||
|
# TODO, go back to inlining this..
|
||||||
def maybe_raise_aio_side_err(
|
def maybe_raise_aio_side_err(
|
||||||
trio_err: Exception,
|
trio_err: Exception,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -1028,11 +1055,10 @@ async def translate_aio_errors(
|
||||||
None
|
None
|
||||||
) = chan._trio_to_raise
|
) = chan._trio_to_raise
|
||||||
|
|
||||||
if (
|
if not suppress_graceful_exits:
|
||||||
trio_to_raise
|
raise trio_to_raise from (aio_err or trio_err)
|
||||||
and
|
|
||||||
suppress_graceful_exits
|
if trio_to_raise:
|
||||||
):
|
|
||||||
# import pdbp; pdbp.set_trace()
|
# import pdbp; pdbp.set_trace()
|
||||||
match (
|
match (
|
||||||
trio_to_raise,
|
trio_to_raise,
|
||||||
|
@ -1040,7 +1066,7 @@ async def translate_aio_errors(
|
||||||
):
|
):
|
||||||
case (
|
case (
|
||||||
AsyncioTaskExited(),
|
AsyncioTaskExited(),
|
||||||
None|trio.Cancelled(),
|
trio.Cancelled()|None,
|
||||||
):
|
):
|
||||||
log.info(
|
log.info(
|
||||||
'Ignoring aio exit signal since trio also exited!'
|
'Ignoring aio exit signal since trio also exited!'
|
||||||
|
@ -1051,12 +1077,13 @@ async def translate_aio_errors(
|
||||||
AsyncioCancelled(),
|
AsyncioCancelled(),
|
||||||
trio.Cancelled(),
|
trio.Cancelled(),
|
||||||
):
|
):
|
||||||
log.info(
|
if not aio_done_before_trio:
|
||||||
'Ignoring aio cancelled signal since trio was also cancelled!'
|
log.info(
|
||||||
)
|
'Ignoring aio cancelled signal since trio was also cancelled!'
|
||||||
return
|
)
|
||||||
|
return
|
||||||
raise trio_to_raise from (aio_err or trio_err)
|
case _:
|
||||||
|
raise trio_to_raise from (aio_err or trio_err)
|
||||||
|
|
||||||
# Check if the asyncio-side is the cause of the trio-side
|
# Check if the asyncio-side is the cause of the trio-side
|
||||||
# error.
|
# error.
|
||||||
|
@ -1128,6 +1155,7 @@ async def run_task(
|
||||||
async with translate_aio_errors(
|
async with translate_aio_errors(
|
||||||
chan,
|
chan,
|
||||||
wait_on_aio_task=True,
|
wait_on_aio_task=True,
|
||||||
|
suppress_graceful_exits=chan._suppress_graceful_exits,
|
||||||
):
|
):
|
||||||
# return single value that is the output from the
|
# return single value that is the output from the
|
||||||
# ``asyncio`` function-as-task. Expect the mem chan api
|
# ``asyncio`` function-as-task. Expect the mem chan api
|
||||||
|
@ -1143,7 +1171,8 @@ async def run_task(
|
||||||
async def open_channel_from(
|
async def open_channel_from(
|
||||||
|
|
||||||
target: Callable[..., Any],
|
target: Callable[..., Any],
|
||||||
**kwargs,
|
suppress_graceful_exits: bool = True,
|
||||||
|
**target_kwargs,
|
||||||
|
|
||||||
) -> AsyncIterator[Any]:
|
) -> AsyncIterator[Any]:
|
||||||
'''
|
'''
|
||||||
|
@ -1155,13 +1184,15 @@ async def open_channel_from(
|
||||||
target,
|
target,
|
||||||
qsize=2**8,
|
qsize=2**8,
|
||||||
provide_channels=True,
|
provide_channels=True,
|
||||||
**kwargs,
|
suppress_graceful_exits=suppress_graceful_exits,
|
||||||
|
**target_kwargs,
|
||||||
)
|
)
|
||||||
# TODO, tuple form here?
|
# TODO, tuple form here?
|
||||||
async with chan._from_aio:
|
async with chan._from_aio:
|
||||||
async with translate_aio_errors(
|
async with translate_aio_errors(
|
||||||
chan,
|
chan,
|
||||||
wait_on_aio_task=True,
|
wait_on_aio_task=True,
|
||||||
|
suppress_graceful_exits=suppress_graceful_exits,
|
||||||
):
|
):
|
||||||
# sync to a "started()"-like first delivered value from the
|
# sync to a "started()"-like first delivered value from the
|
||||||
# ``asyncio`` task.
|
# ``asyncio`` task.
|
||||||
|
|
Loading…
Reference in New Issue