Compare commits
4 Commits
4c82b6e94f
...
7fac170f8d
Author | SHA1 | Date |
---|---|---|
|
7fac170f8d | |
|
44281268a8 | |
|
064c5ce034 | |
|
6637473b54 |
|
@ -32,6 +32,16 @@ from tractor.trionics import BroadcastReceiver
|
||||||
from tractor._testing import expect_ctxc
|
from tractor._testing import expect_ctxc
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(
|
||||||
|
scope='module',
|
||||||
|
)
|
||||||
|
def delay(debug_mode: bool) -> int:
|
||||||
|
if debug_mode:
|
||||||
|
return 999
|
||||||
|
else:
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
async def sleep_and_err(
|
async def sleep_and_err(
|
||||||
sleep_for: float = 0.1,
|
sleep_for: float = 0.1,
|
||||||
|
|
||||||
|
@ -59,20 +69,26 @@ async def trio_cancels_single_aio_task():
|
||||||
await tractor.to_asyncio.run_task(aio_sleep_forever)
|
await tractor.to_asyncio.run_task(aio_sleep_forever)
|
||||||
|
|
||||||
|
|
||||||
def test_trio_cancels_aio_on_actor_side(reg_addr):
|
def test_trio_cancels_aio_on_actor_side(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
delay: int,
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Spawn an infected actor that is cancelled by the ``trio`` side
|
Spawn an infected actor that is cancelled by the ``trio`` side
|
||||||
task using std cancel scope apis.
|
task using std cancel scope apis.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery(
|
with trio.fail_after(1 + delay):
|
||||||
registry_addrs=[reg_addr]
|
async with tractor.open_nursery(
|
||||||
) as n:
|
registry_addrs=[reg_addr],
|
||||||
await n.run_in_actor(
|
debug_mode=debug_mode,
|
||||||
trio_cancels_single_aio_task,
|
) as an:
|
||||||
infect_asyncio=True,
|
await an.run_in_actor(
|
||||||
)
|
trio_cancels_single_aio_task,
|
||||||
|
infect_asyncio=True,
|
||||||
|
)
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
@ -116,7 +132,10 @@ async def asyncio_actor(
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
def test_aio_simple_error(reg_addr):
|
def test_aio_simple_error(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Verify a simple remote asyncio error propagates back through trio
|
Verify a simple remote asyncio error propagates back through trio
|
||||||
to the parent actor.
|
to the parent actor.
|
||||||
|
@ -125,9 +144,10 @@ def test_aio_simple_error(reg_addr):
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
registry_addrs=[reg_addr]
|
registry_addrs=[reg_addr],
|
||||||
) as n:
|
debug_mode=debug_mode,
|
||||||
await n.run_in_actor(
|
) as an:
|
||||||
|
await an.run_in_actor(
|
||||||
asyncio_actor,
|
asyncio_actor,
|
||||||
target='sleep_and_err',
|
target='sleep_and_err',
|
||||||
expect_err='AssertionError',
|
expect_err='AssertionError',
|
||||||
|
@ -153,14 +173,19 @@ def test_aio_simple_error(reg_addr):
|
||||||
assert err.boxed_type is AssertionError
|
assert err.boxed_type is AssertionError
|
||||||
|
|
||||||
|
|
||||||
def test_tractor_cancels_aio(reg_addr):
|
def test_tractor_cancels_aio(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Verify we can cancel a spawned asyncio task gracefully.
|
Verify we can cancel a spawned asyncio task gracefully.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery(
|
||||||
portal = await n.run_in_actor(
|
debug_mode=debug_mode,
|
||||||
|
) as an:
|
||||||
|
portal = await an.run_in_actor(
|
||||||
asyncio_actor,
|
asyncio_actor,
|
||||||
target='aio_sleep_forever',
|
target='aio_sleep_forever',
|
||||||
expect_err='trio.Cancelled',
|
expect_err='trio.Cancelled',
|
||||||
|
@ -172,7 +197,9 @@ def test_tractor_cancels_aio(reg_addr):
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
def test_trio_cancels_aio(reg_addr):
|
def test_trio_cancels_aio(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Much like the above test with ``tractor.Portal.cancel_actor()``
|
Much like the above test with ``tractor.Portal.cancel_actor()``
|
||||||
except we just use a standard ``trio`` cancellation api.
|
except we just use a standard ``trio`` cancellation api.
|
||||||
|
@ -203,7 +230,8 @@ async def trio_ctx(
|
||||||
|
|
||||||
# this will block until the ``asyncio`` task sends a "first"
|
# this will block until the ``asyncio`` task sends a "first"
|
||||||
# message.
|
# message.
|
||||||
with trio.fail_after(2):
|
delay: int = 999 if tractor.debug_mode() else 1
|
||||||
|
with trio.fail_after(1 + delay):
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery(
|
trio.open_nursery(
|
||||||
|
@ -239,8 +267,10 @@ async def trio_ctx(
|
||||||
ids='parent_actor_cancels_child={}'.format
|
ids='parent_actor_cancels_child={}'.format
|
||||||
)
|
)
|
||||||
def test_context_spawns_aio_task_that_errors(
|
def test_context_spawns_aio_task_that_errors(
|
||||||
reg_addr,
|
reg_addr: tuple[str, int],
|
||||||
|
delay: int,
|
||||||
parent_cancels: bool,
|
parent_cancels: bool,
|
||||||
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Verify that spawning a task via an intertask channel ctx mngr that
|
Verify that spawning a task via an intertask channel ctx mngr that
|
||||||
|
@ -249,13 +279,13 @@ def test_context_spawns_aio_task_that_errors(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
with trio.fail_after(2):
|
with trio.fail_after(1 + delay):
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as an:
|
||||||
p = await n.start_actor(
|
p = await an.start_actor(
|
||||||
'aio_daemon',
|
'aio_daemon',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
# debug_mode=True,
|
debug_mode=debug_mode,
|
||||||
loglevel='cancel',
|
loglevel='cancel',
|
||||||
)
|
)
|
||||||
async with (
|
async with (
|
||||||
|
@ -322,11 +352,12 @@ async def aio_cancel():
|
||||||
|
|
||||||
def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
||||||
reg_addr: tuple,
|
reg_addr: tuple,
|
||||||
|
delay: int,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
When the `asyncio.Task` cancels itself the `trio` side cshould
|
When the `asyncio.Task` cancels itself the `trio` side should
|
||||||
also cancel and teardown and relay the cancellation cross-process
|
also cancel and teardown and relay the cancellation cross-process
|
||||||
to the caller (parent).
|
to the parent caller.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
|
@ -342,7 +373,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
||||||
# NOTE: normally the `an.__aexit__()` waits on the
|
# NOTE: normally the `an.__aexit__()` waits on the
|
||||||
# portal's result but we do it explicitly here
|
# portal's result but we do it explicitly here
|
||||||
# to avoid indent levels.
|
# to avoid indent levels.
|
||||||
with trio.fail_after(1):
|
with trio.fail_after(1 + delay):
|
||||||
await p.wait_for_result()
|
await p.wait_for_result()
|
||||||
|
|
||||||
with pytest.raises(
|
with pytest.raises(
|
||||||
|
@ -353,11 +384,10 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
||||||
# might get multiple `trio.Cancelled`s as well inside an inception
|
# might get multiple `trio.Cancelled`s as well inside an inception
|
||||||
err: RemoteActorError|ExceptionGroup = excinfo.value
|
err: RemoteActorError|ExceptionGroup = excinfo.value
|
||||||
if isinstance(err, ExceptionGroup):
|
if isinstance(err, ExceptionGroup):
|
||||||
err = next(itertools.dropwhile(
|
excs = err.exceptions
|
||||||
lambda exc: not isinstance(exc, tractor.RemoteActorError),
|
assert len(excs) == 1
|
||||||
err.exceptions
|
final_exc = excs[0]
|
||||||
))
|
assert isinstance(final_exc, tractor.RemoteActorError)
|
||||||
assert err
|
|
||||||
|
|
||||||
# relayed boxed error should be our `trio`-task's
|
# relayed boxed error should be our `trio`-task's
|
||||||
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
|
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
|
||||||
|
@ -370,15 +400,18 @@ async def no_to_trio_in_args():
|
||||||
|
|
||||||
|
|
||||||
async def push_from_aio_task(
|
async def push_from_aio_task(
|
||||||
|
|
||||||
sequence: Iterable,
|
sequence: Iterable,
|
||||||
to_trio: trio.abc.SendChannel,
|
to_trio: trio.abc.SendChannel,
|
||||||
expect_cancel: False,
|
expect_cancel: False,
|
||||||
fail_early: bool,
|
fail_early: bool,
|
||||||
|
exit_early: bool,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# print('trying breakpoint')
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
# sync caller ctx manager
|
# sync caller ctx manager
|
||||||
to_trio.send_nowait(True)
|
to_trio.send_nowait(True)
|
||||||
|
|
||||||
|
@ -387,10 +420,27 @@ async def push_from_aio_task(
|
||||||
to_trio.send_nowait(i)
|
to_trio.send_nowait(i)
|
||||||
await asyncio.sleep(0.001)
|
await asyncio.sleep(0.001)
|
||||||
|
|
||||||
if i == 50 and fail_early:
|
if (
|
||||||
raise Exception
|
i == 50
|
||||||
|
):
|
||||||
|
if fail_early:
|
||||||
|
print('Raising exc from aio side!')
|
||||||
|
raise Exception
|
||||||
|
|
||||||
print('asyncio streamer complete!')
|
if exit_early:
|
||||||
|
# TODO? really you could enforce the same
|
||||||
|
# SC-proto we use for actors here with asyncio
|
||||||
|
# such that a Return[None] msg would be
|
||||||
|
# implicitly delivered to the trio side?
|
||||||
|
#
|
||||||
|
# XXX => this might be the end-all soln for
|
||||||
|
# converting any-inter-task system (regardless
|
||||||
|
# of maybe-remote runtime or language) to be
|
||||||
|
# SC-compat no?
|
||||||
|
print(f'asyncio breaking early @ {i!r}')
|
||||||
|
break
|
||||||
|
|
||||||
|
print('asyncio streaming complete!')
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
if not expect_cancel:
|
if not expect_cancel:
|
||||||
|
@ -402,9 +452,10 @@ async def push_from_aio_task(
|
||||||
|
|
||||||
|
|
||||||
async def stream_from_aio(
|
async def stream_from_aio(
|
||||||
exit_early: bool = False,
|
trio_exit_early: bool = False,
|
||||||
raise_err: bool = False,
|
trio_raise_err: bool = False,
|
||||||
aio_raise_err: bool = False,
|
aio_raise_err: bool = False,
|
||||||
|
aio_exit_early: bool = False,
|
||||||
fan_out: bool = False,
|
fan_out: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -417,8 +468,17 @@ async def stream_from_aio(
|
||||||
async with to_asyncio.open_channel_from(
|
async with to_asyncio.open_channel_from(
|
||||||
push_from_aio_task,
|
push_from_aio_task,
|
||||||
sequence=seq,
|
sequence=seq,
|
||||||
expect_cancel=raise_err or exit_early,
|
expect_cancel=trio_raise_err or trio_exit_early,
|
||||||
fail_early=aio_raise_err,
|
fail_early=aio_raise_err,
|
||||||
|
exit_early=aio_exit_early,
|
||||||
|
|
||||||
|
# such that we can test exit early cases
|
||||||
|
# for each side explicitly.
|
||||||
|
suppress_graceful_exits=(not(
|
||||||
|
aio_exit_early
|
||||||
|
or
|
||||||
|
trio_exit_early
|
||||||
|
))
|
||||||
|
|
||||||
) as (first, chan):
|
) as (first, chan):
|
||||||
|
|
||||||
|
@ -435,9 +495,9 @@ async def stream_from_aio(
|
||||||
pulled.append(value)
|
pulled.append(value)
|
||||||
|
|
||||||
if value == 50:
|
if value == 50:
|
||||||
if raise_err:
|
if trio_raise_err:
|
||||||
raise Exception
|
raise Exception
|
||||||
elif exit_early:
|
elif trio_exit_early:
|
||||||
print('`consume()` breaking early!\n')
|
print('`consume()` breaking early!\n')
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -454,11 +514,11 @@ async def stream_from_aio(
|
||||||
# tasks are joined..
|
# tasks are joined..
|
||||||
chan.subscribe() as br,
|
chan.subscribe() as br,
|
||||||
|
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as tn,
|
||||||
):
|
):
|
||||||
# start 2nd task that get's broadcast the same
|
# start 2nd task that get's broadcast the same
|
||||||
# value set.
|
# value set.
|
||||||
n.start_soon(consume, br)
|
tn.start_soon(consume, br)
|
||||||
await consume(chan)
|
await consume(chan)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -471,10 +531,14 @@ async def stream_from_aio(
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
|
||||||
if (
|
if not (
|
||||||
not raise_err and
|
trio_raise_err
|
||||||
not exit_early and
|
or
|
||||||
not aio_raise_err
|
trio_exit_early
|
||||||
|
or
|
||||||
|
aio_raise_err
|
||||||
|
or
|
||||||
|
aio_exit_early
|
||||||
):
|
):
|
||||||
if fan_out:
|
if fan_out:
|
||||||
# we get double the pulled values in the
|
# we get double the pulled values in the
|
||||||
|
@ -484,6 +548,7 @@ async def stream_from_aio(
|
||||||
assert list(sorted(pulled)) == expect
|
assert list(sorted(pulled)) == expect
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
# await tractor.pause()
|
||||||
assert pulled == expect
|
assert pulled == expect
|
||||||
else:
|
else:
|
||||||
assert not fan_out
|
assert not fan_out
|
||||||
|
@ -497,10 +562,13 @@ async def stream_from_aio(
|
||||||
'fan_out', [False, True],
|
'fan_out', [False, True],
|
||||||
ids='fan_out_w_chan_subscribe={}'.format
|
ids='fan_out_w_chan_subscribe={}'.format
|
||||||
)
|
)
|
||||||
def test_basic_interloop_channel_stream(reg_addr, fan_out):
|
def test_basic_interloop_channel_stream(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
fan_out: bool,
|
||||||
|
):
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as an:
|
||||||
portal = await n.run_in_actor(
|
portal = await an.run_in_actor(
|
||||||
stream_from_aio,
|
stream_from_aio,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
fan_out=fan_out,
|
fan_out=fan_out,
|
||||||
|
@ -514,10 +582,10 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
|
||||||
# TODO: parametrize the above test and avoid the duplication here?
|
# TODO: parametrize the above test and avoid the duplication here?
|
||||||
def test_trio_error_cancels_intertask_chan(reg_addr):
|
def test_trio_error_cancels_intertask_chan(reg_addr):
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as an:
|
||||||
portal = await n.run_in_actor(
|
portal = await an.run_in_actor(
|
||||||
stream_from_aio,
|
stream_from_aio,
|
||||||
raise_err=True,
|
trio_raise_err=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
# should trigger remote actor error
|
# should trigger remote actor error
|
||||||
|
@ -530,43 +598,116 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
|
||||||
excinfo.value.boxed_type is Exception
|
excinfo.value.boxed_type is Exception
|
||||||
|
|
||||||
|
|
||||||
def test_trio_closes_early_and_channel_exits(
|
def test_trio_closes_early_causes_aio_checkpoint_raise(
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
|
delay: int,
|
||||||
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Check that if the `trio`-task "exits early" on `async for`ing the
|
Check that if the `trio`-task "exits early and silently" (in this
|
||||||
inter-task-channel (via a `break`) we exit silently from the
|
case during `async for`-ing the inter-task-channel via
|
||||||
`open_channel_from()` block and get a final `Return[None]` msg.
|
a `break`-from-loop), we raise `TrioTaskExited` on the
|
||||||
|
`asyncio`-side which also then bubbles up through the
|
||||||
|
`open_channel_from()` block indicating that the `asyncio.Task`
|
||||||
|
hit a ran another checkpoint despite the `trio.Task` exit.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
with trio.fail_after(2):
|
with trio.fail_after(1 + delay):
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
# debug_mode=True,
|
debug_mode=debug_mode,
|
||||||
# enable_stack_on_sig=True,
|
# enable_stack_on_sig=True,
|
||||||
) as n:
|
) as an:
|
||||||
portal = await n.run_in_actor(
|
portal = await an.run_in_actor(
|
||||||
stream_from_aio,
|
stream_from_aio,
|
||||||
exit_early=True,
|
trio_exit_early=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
# should raise RAE diectly
|
# should raise RAE diectly
|
||||||
print('waiting on final infected subactor result..')
|
print('waiting on final infected subactor result..')
|
||||||
res: None = await portal.wait_for_result()
|
res: None = await portal.wait_for_result()
|
||||||
assert res is None
|
assert res is None
|
||||||
print('infected subactor returned result: {res!r}\n')
|
print(f'infected subactor returned result: {res!r}\n')
|
||||||
|
|
||||||
# should be a quiet exit on a simple channel exit
|
# should be a quiet exit on a simple channel exit
|
||||||
trio.run(
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
main,
|
trio.run(main)
|
||||||
# strict_exception_groups=False,
|
|
||||||
)
|
# ensure remote error is an explicit `AsyncioCancelled` sub-type
|
||||||
|
# which indicates to the aio task that the trio side exited
|
||||||
|
# silently WITHOUT raising a `trio.Cancelled` (which would
|
||||||
|
# normally be raised instead as a `AsyncioCancelled`).
|
||||||
|
excinfo.value.boxed_type is to_asyncio.TrioTaskExited
|
||||||
|
|
||||||
|
|
||||||
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
def test_aio_exits_early_relays_AsyncioTaskExited(
|
||||||
|
# TODO, parametrize the 3 possible trio side conditions:
|
||||||
|
# - trio blocking on receive, aio exits early
|
||||||
|
# - trio cancelled AND aio exits early on its next tick
|
||||||
|
# - trio errors AND aio exits early on its next tick
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
|
delay: int,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Check that if the `asyncio`-task "exits early and silently" (in this
|
||||||
|
case during `push_from_aio_task()` pushing to the `InterLoopTaskChannel`
|
||||||
|
it `break`s from the loop), we raise `AsyncioTaskExited` on the
|
||||||
|
`trio`-side which then DOES NOT BUBBLE up through the
|
||||||
|
`open_channel_from()` block UNLESS,
|
||||||
|
|
||||||
|
- the trio.Task also errored/cancelled, in which case we wrap
|
||||||
|
both errors in an eg
|
||||||
|
- the trio.Task was blocking on rxing a value from the
|
||||||
|
`InterLoopTaskChannel`.
|
||||||
|
|
||||||
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
with trio.fail_after(1 + delay):
|
||||||
portal = await n.run_in_actor(
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
# enable_stack_on_sig=True,
|
||||||
|
) as an:
|
||||||
|
portal = await an.run_in_actor(
|
||||||
|
stream_from_aio,
|
||||||
|
infect_asyncio=True,
|
||||||
|
trio_exit_early=False,
|
||||||
|
aio_exit_early=True,
|
||||||
|
)
|
||||||
|
# should raise RAE diectly
|
||||||
|
print('waiting on final infected subactor result..')
|
||||||
|
res: None = await portal.wait_for_result()
|
||||||
|
assert res is None
|
||||||
|
print(f'infected subactor returned result: {res!r}\n')
|
||||||
|
|
||||||
|
# should be a quiet exit on a simple channel exit
|
||||||
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
exc = excinfo.value
|
||||||
|
|
||||||
|
# TODO, wow bug!
|
||||||
|
# -[ ] bp handler not replaced!?!?
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
|
# import pdbp; pdbp.set_trace()
|
||||||
|
|
||||||
|
# ensure remote error is an explicit `AsyncioCancelled` sub-type
|
||||||
|
# which indicates to the aio task that the trio side exited
|
||||||
|
# silently WITHOUT raising a `trio.Cancelled` (which would
|
||||||
|
# normally be raised instead as a `AsyncioCancelled`).
|
||||||
|
assert exc.boxed_type is to_asyncio.AsyncioTaskExited
|
||||||
|
|
||||||
|
|
||||||
|
def test_aio_errors_and_channel_propagates_and_closes(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
|
async def main():
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
) as an:
|
||||||
|
portal = await an.run_in_actor(
|
||||||
stream_from_aio,
|
stream_from_aio,
|
||||||
aio_raise_err=True,
|
aio_raise_err=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
|
@ -641,13 +782,15 @@ async def trio_to_aio_echo_server(
|
||||||
ids='raise_error={}'.format,
|
ids='raise_error={}'.format,
|
||||||
)
|
)
|
||||||
def test_echoserver_detailed_mechanics(
|
def test_echoserver_detailed_mechanics(
|
||||||
reg_addr,
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
raise_error_mid_stream,
|
raise_error_mid_stream,
|
||||||
):
|
):
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery(
|
||||||
p = await n.start_actor(
|
debug_mode=debug_mode,
|
||||||
|
) as an:
|
||||||
|
p = await an.start_actor(
|
||||||
'aio_server',
|
'aio_server',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
|
@ -852,6 +995,8 @@ def test_sigint_closes_lifetime_stack(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
|
|
||||||
|
delay = 999 if tractor.debug_mode() else 1
|
||||||
try:
|
try:
|
||||||
an: tractor.ActorNursery
|
an: tractor.ActorNursery
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
|
@ -902,7 +1047,7 @@ def test_sigint_closes_lifetime_stack(
|
||||||
if wait_for_ctx:
|
if wait_for_ctx:
|
||||||
print('waiting for ctx outcome in parent..')
|
print('waiting for ctx outcome in parent..')
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(1):
|
with trio.fail_after(1 + delay):
|
||||||
await ctx.wait_for_result()
|
await ctx.wait_for_result()
|
||||||
except tractor.ContextCancelled as ctxc:
|
except tractor.ContextCancelled as ctxc:
|
||||||
assert ctxc.canceller == ctx.chan.uid
|
assert ctxc.canceller == ctx.chan.uid
|
||||||
|
|
|
@ -43,6 +43,7 @@ from ._state import (
|
||||||
current_actor as current_actor,
|
current_actor as current_actor,
|
||||||
is_root_process as is_root_process,
|
is_root_process as is_root_process,
|
||||||
current_ipc_ctx as current_ipc_ctx,
|
current_ipc_ctx as current_ipc_ctx,
|
||||||
|
debug_mode as debug_mode
|
||||||
)
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
ContextCancelled as ContextCancelled,
|
ContextCancelled as ContextCancelled,
|
||||||
|
@ -65,3 +66,4 @@ from ._root import (
|
||||||
from ._ipc import Channel as Channel
|
from ._ipc import Channel as Channel
|
||||||
from ._portal import Portal as Portal
|
from ._portal import Portal as Portal
|
||||||
from ._runtime import Actor as Actor
|
from ._runtime import Actor as Actor
|
||||||
|
from . import hilevel as hilevel
|
||||||
|
|
|
@ -82,6 +82,48 @@ class InternalError(RuntimeError):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
class AsyncioCancelled(Exception):
|
||||||
|
'''
|
||||||
|
Asyncio cancelled translation (non-base) error
|
||||||
|
for use with the ``to_asyncio`` module
|
||||||
|
to be raised in the ``trio`` side task
|
||||||
|
|
||||||
|
NOTE: this should NOT inherit from `asyncio.CancelledError` or
|
||||||
|
tests should break!
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncioTaskExited(Exception):
|
||||||
|
'''
|
||||||
|
asyncio.Task "exited" translation error for use with the
|
||||||
|
`to_asyncio` APIs to be raised in the `trio` side task indicating
|
||||||
|
on `.run_task()`/`.open_channel_from()` exit that the aio side
|
||||||
|
exited early/silently.
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
class TrioCancelled(Exception):
|
||||||
|
'''
|
||||||
|
Trio cancelled translation (non-base) error
|
||||||
|
for use with the `to_asyncio` module
|
||||||
|
to be raised in the `asyncio.Task` to indicate
|
||||||
|
that the `trio` side raised `Cancelled` or an error.
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
class TrioTaskExited(Exception):
|
||||||
|
'''
|
||||||
|
The `trio`-side task exited without explicitly cancelling the
|
||||||
|
`asyncio.Task` peer.
|
||||||
|
|
||||||
|
This is very similar to how `trio.ClosedResource` acts as
|
||||||
|
a "clean shutdown" signal to the consumer side of a mem-chan,
|
||||||
|
|
||||||
|
https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
# NOTE: more or less should be close to these:
|
# NOTE: more or less should be close to these:
|
||||||
# 'boxed_type',
|
# 'boxed_type',
|
||||||
|
@ -127,8 +169,8 @@ _body_fields: list[str] = list(
|
||||||
|
|
||||||
def get_err_type(type_name: str) -> BaseException|None:
|
def get_err_type(type_name: str) -> BaseException|None:
|
||||||
'''
|
'''
|
||||||
Look up an exception type by name from the set of locally
|
Look up an exception type by name from the set of locally known
|
||||||
known namespaces:
|
namespaces:
|
||||||
|
|
||||||
- `builtins`
|
- `builtins`
|
||||||
- `tractor._exceptions`
|
- `tractor._exceptions`
|
||||||
|
@ -358,6 +400,13 @@ class RemoteActorError(Exception):
|
||||||
self._ipc_msg.src_type_str
|
self._ipc_msg.src_type_str
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if not self._src_type:
|
||||||
|
raise TypeError(
|
||||||
|
f'Failed to lookup src error type with '
|
||||||
|
f'`tractor._exceptions.get_err_type()` :\n'
|
||||||
|
f'{self.src_type_str}'
|
||||||
|
)
|
||||||
|
|
||||||
return self._src_type
|
return self._src_type
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -366,6 +415,9 @@ class RemoteActorError(Exception):
|
||||||
String-name of the (last hop's) boxed error type.
|
String-name of the (last hop's) boxed error type.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# TODO, maybe support also serializing the
|
||||||
|
# `ExceptionGroup.exeptions: list[BaseException]` set under
|
||||||
|
# certain conditions?
|
||||||
bt: Type[BaseException] = self.boxed_type
|
bt: Type[BaseException] = self.boxed_type
|
||||||
if bt:
|
if bt:
|
||||||
return str(bt.__name__)
|
return str(bt.__name__)
|
||||||
|
@ -652,16 +704,10 @@ class RemoteActorError(Exception):
|
||||||
failing actor's remote env.
|
failing actor's remote env.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
src_type_ref: Type[BaseException] = self.src_type
|
|
||||||
if not src_type_ref:
|
|
||||||
raise TypeError(
|
|
||||||
'Failed to lookup src error type:\n'
|
|
||||||
f'{self.src_type_str}'
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: better tb insertion and all the fancier dunder
|
# TODO: better tb insertion and all the fancier dunder
|
||||||
# metadata stuff as per `.__context__` etc. and friends:
|
# metadata stuff as per `.__context__` etc. and friends:
|
||||||
# https://github.com/python-trio/trio/issues/611
|
# https://github.com/python-trio/trio/issues/611
|
||||||
|
src_type_ref: Type[BaseException] = self.src_type
|
||||||
return src_type_ref(self.tb_str)
|
return src_type_ref(self.tb_str)
|
||||||
|
|
||||||
# TODO: local recontruction of nested inception for a given
|
# TODO: local recontruction of nested inception for a given
|
||||||
|
@ -787,8 +833,11 @@ class MsgTypeError(
|
||||||
'''
|
'''
|
||||||
if (
|
if (
|
||||||
(_bad_msg := self.msgdata.get('_bad_msg'))
|
(_bad_msg := self.msgdata.get('_bad_msg'))
|
||||||
and
|
and (
|
||||||
isinstance(_bad_msg, PayloadMsg)
|
isinstance(_bad_msg, PayloadMsg)
|
||||||
|
or
|
||||||
|
isinstance(_bad_msg, msgtypes.Start)
|
||||||
|
)
|
||||||
):
|
):
|
||||||
return _bad_msg
|
return _bad_msg
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue