Compare commits
1 Commits
7fac170f8d
...
4c82b6e94f
Author | SHA1 | Date |
---|---|---|
|
4c82b6e94f |
|
@ -66,6 +66,8 @@ def test_trio_cancels_aio_on_actor_side(reg_addr):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
|
# with trio.fail_after(1):
|
||||||
|
with trio.fail_after(999):
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
registry_addrs=[reg_addr]
|
registry_addrs=[reg_addr]
|
||||||
) as n:
|
) as n:
|
||||||
|
@ -250,6 +252,7 @@ def test_context_spawns_aio_task_that_errors(
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
with trio.fail_after(2):
|
with trio.fail_after(2):
|
||||||
|
# with trio.fail_after(999):
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as n:
|
||||||
p = await n.start_actor(
|
p = await n.start_actor(
|
||||||
'aio_daemon',
|
'aio_daemon',
|
||||||
|
@ -342,7 +345,8 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
||||||
# NOTE: normally the `an.__aexit__()` waits on the
|
# NOTE: normally the `an.__aexit__()` waits on the
|
||||||
# portal's result but we do it explicitly here
|
# portal's result but we do it explicitly here
|
||||||
# to avoid indent levels.
|
# to avoid indent levels.
|
||||||
with trio.fail_after(1):
|
# with trio.fail_after(1):
|
||||||
|
with trio.fail_after(999):
|
||||||
await p.wait_for_result()
|
await p.wait_for_result()
|
||||||
|
|
||||||
with pytest.raises(
|
with pytest.raises(
|
||||||
|
@ -353,11 +357,10 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
|
||||||
# might get multiple `trio.Cancelled`s as well inside an inception
|
# might get multiple `trio.Cancelled`s as well inside an inception
|
||||||
err: RemoteActorError|ExceptionGroup = excinfo.value
|
err: RemoteActorError|ExceptionGroup = excinfo.value
|
||||||
if isinstance(err, ExceptionGroup):
|
if isinstance(err, ExceptionGroup):
|
||||||
err = next(itertools.dropwhile(
|
excs = err.exceptions
|
||||||
lambda exc: not isinstance(exc, tractor.RemoteActorError),
|
assert len(excs) == 1
|
||||||
err.exceptions
|
final_exc = excs[0]
|
||||||
))
|
assert isinstance(final_exc, tractor.RemoteActorError)
|
||||||
assert err
|
|
||||||
|
|
||||||
# relayed boxed error should be our `trio`-task's
|
# relayed boxed error should be our `trio`-task's
|
||||||
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
|
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
|
||||||
|
@ -370,15 +373,18 @@ async def no_to_trio_in_args():
|
||||||
|
|
||||||
|
|
||||||
async def push_from_aio_task(
|
async def push_from_aio_task(
|
||||||
|
|
||||||
sequence: Iterable,
|
sequence: Iterable,
|
||||||
to_trio: trio.abc.SendChannel,
|
to_trio: trio.abc.SendChannel,
|
||||||
expect_cancel: False,
|
expect_cancel: False,
|
||||||
fail_early: bool,
|
fail_early: bool,
|
||||||
|
exit_early: bool,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# print('trying breakpoint')
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
# sync caller ctx manager
|
# sync caller ctx manager
|
||||||
to_trio.send_nowait(True)
|
to_trio.send_nowait(True)
|
||||||
|
|
||||||
|
@ -387,10 +393,26 @@ async def push_from_aio_task(
|
||||||
to_trio.send_nowait(i)
|
to_trio.send_nowait(i)
|
||||||
await asyncio.sleep(0.001)
|
await asyncio.sleep(0.001)
|
||||||
|
|
||||||
if i == 50 and fail_early:
|
if (
|
||||||
|
i == 50
|
||||||
|
):
|
||||||
|
if fail_early:
|
||||||
raise Exception
|
raise Exception
|
||||||
|
|
||||||
print('asyncio streamer complete!')
|
if exit_early:
|
||||||
|
# TODO? really you could enforce the same
|
||||||
|
# SC-proto we use for actors here with asyncio
|
||||||
|
# such that a Return[None] msg would be
|
||||||
|
# implicitly delivered to the trio side?
|
||||||
|
#
|
||||||
|
# XXX => this might be the end-all soln for
|
||||||
|
# converting any-inter-task system (regardless
|
||||||
|
# of maybe-remote runtime or language) to be
|
||||||
|
# SC-compat no?
|
||||||
|
print(f'asyncio breaking early @ {i!r}')
|
||||||
|
break
|
||||||
|
|
||||||
|
print('asyncio streaming complete!')
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
if not expect_cancel:
|
if not expect_cancel:
|
||||||
|
@ -402,9 +424,10 @@ async def push_from_aio_task(
|
||||||
|
|
||||||
|
|
||||||
async def stream_from_aio(
|
async def stream_from_aio(
|
||||||
exit_early: bool = False,
|
trio_exit_early: bool = False,
|
||||||
raise_err: bool = False,
|
raise_err: bool = False,
|
||||||
aio_raise_err: bool = False,
|
aio_raise_err: bool = False,
|
||||||
|
aio_exit_early: bool = False,
|
||||||
fan_out: bool = False,
|
fan_out: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -417,8 +440,9 @@ async def stream_from_aio(
|
||||||
async with to_asyncio.open_channel_from(
|
async with to_asyncio.open_channel_from(
|
||||||
push_from_aio_task,
|
push_from_aio_task,
|
||||||
sequence=seq,
|
sequence=seq,
|
||||||
expect_cancel=raise_err or exit_early,
|
expect_cancel=raise_err or trio_exit_early,
|
||||||
fail_early=aio_raise_err,
|
fail_early=aio_raise_err,
|
||||||
|
exit_early=aio_exit_early,
|
||||||
|
|
||||||
) as (first, chan):
|
) as (first, chan):
|
||||||
|
|
||||||
|
@ -437,7 +461,7 @@ async def stream_from_aio(
|
||||||
if value == 50:
|
if value == 50:
|
||||||
if raise_err:
|
if raise_err:
|
||||||
raise Exception
|
raise Exception
|
||||||
elif exit_early:
|
elif trio_exit_early:
|
||||||
print('`consume()` breaking early!\n')
|
print('`consume()` breaking early!\n')
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -472,9 +496,13 @@ async def stream_from_aio(
|
||||||
finally:
|
finally:
|
||||||
|
|
||||||
if (
|
if (
|
||||||
not raise_err and
|
not raise_err
|
||||||
not exit_early and
|
and
|
||||||
|
not trio_exit_early
|
||||||
|
and
|
||||||
not aio_raise_err
|
not aio_raise_err
|
||||||
|
and
|
||||||
|
not aio_exit_early
|
||||||
):
|
):
|
||||||
if fan_out:
|
if fan_out:
|
||||||
# we get double the pulled values in the
|
# we get double the pulled values in the
|
||||||
|
@ -484,6 +512,7 @@ async def stream_from_aio(
|
||||||
assert list(sorted(pulled)) == expect
|
assert list(sorted(pulled)) == expect
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
# await tractor.pause()
|
||||||
assert pulled == expect
|
assert pulled == expect
|
||||||
else:
|
else:
|
||||||
assert not fan_out
|
assert not fan_out
|
||||||
|
@ -530,37 +559,103 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
|
||||||
excinfo.value.boxed_type is Exception
|
excinfo.value.boxed_type is Exception
|
||||||
|
|
||||||
|
|
||||||
def test_trio_closes_early_and_channel_exits(
|
def test_trio_closes_early_causes_aio_checkpoint_raise(
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Check that if the `trio`-task "exits early" on `async for`ing the
|
Check that if the `trio`-task "exits early and silently" (in this
|
||||||
inter-task-channel (via a `break`) we exit silently from the
|
case during `async for`-ing the inter-task-channel via
|
||||||
`open_channel_from()` block and get a final `Return[None]` msg.
|
a `break`-from-loop), we raise `TrioTaskExited` on the
|
||||||
|
`asyncio`-side which also then bubbles up through the
|
||||||
|
`open_channel_from()` block indicating that the `asyncio.Task`
|
||||||
|
hit a ran another checkpoint despite the `trio.Task` exit.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
with trio.fail_after(2):
|
with trio.fail_after(2):
|
||||||
|
# with trio.fail_after(999):
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
# debug_mode=True,
|
# debug_mode=True,
|
||||||
# enable_stack_on_sig=True,
|
# enable_stack_on_sig=True,
|
||||||
) as n:
|
) as n:
|
||||||
portal = await n.run_in_actor(
|
portal = await n.run_in_actor(
|
||||||
stream_from_aio,
|
stream_from_aio,
|
||||||
exit_early=True,
|
trio_exit_early=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
# should raise RAE diectly
|
# should raise RAE diectly
|
||||||
print('waiting on final infected subactor result..')
|
print('waiting on final infected subactor result..')
|
||||||
res: None = await portal.wait_for_result()
|
res: None = await portal.wait_for_result()
|
||||||
assert res is None
|
assert res is None
|
||||||
print('infected subactor returned result: {res!r}\n')
|
print(f'infected subactor returned result: {res!r}\n')
|
||||||
|
|
||||||
# should be a quiet exit on a simple channel exit
|
# should be a quiet exit on a simple channel exit
|
||||||
trio.run(
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
main,
|
trio.run(main)
|
||||||
# strict_exception_groups=False,
|
|
||||||
|
# ensure remote error is an explicit `AsyncioCancelled` sub-type
|
||||||
|
# which indicates to the aio task that the trio side exited
|
||||||
|
# silently WITHOUT raising a `trio.Cancelled` (which would
|
||||||
|
# normally be raised instead as a `AsyncioCancelled`).
|
||||||
|
excinfo.value.boxed_type is to_asyncio.TrioTaskExited
|
||||||
|
|
||||||
|
|
||||||
|
def test_aio_exits_early_relays_AsyncioTaskExited(
|
||||||
|
# TODO, parametrize the 3 possible trio side conditions:
|
||||||
|
# - trio blocking on receive, aio exits early
|
||||||
|
# - trio cancelled AND aio exits early on its next tick
|
||||||
|
# - trio errors AND aio exits early on its next tick
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Check that if the `asyncio`-task "exits early and silently" (in this
|
||||||
|
case during `push_from_aio_task()` pushing to the `InterLoopTaskChannel`
|
||||||
|
it `break`s from the loop), we raise `AsyncioTaskExited` on the
|
||||||
|
`trio`-side which then DOES NOT BUBBLE up through the
|
||||||
|
`open_channel_from()` block UNLESS,
|
||||||
|
|
||||||
|
- the trio.Task also errored/cancelled, in which case we wrap
|
||||||
|
both errors in an eg
|
||||||
|
- the trio.Task was blocking on rxing a value from the
|
||||||
|
`InterLoopTaskChannel`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async def main():
|
||||||
|
with trio.fail_after(2):
|
||||||
|
# with trio.fail_after(999):
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
# debug_mode=True,
|
||||||
|
# enable_stack_on_sig=True,
|
||||||
|
) as an:
|
||||||
|
portal = await an.run_in_actor(
|
||||||
|
stream_from_aio,
|
||||||
|
infect_asyncio=True,
|
||||||
|
trio_exit_early=False,
|
||||||
|
aio_exit_early=True,
|
||||||
)
|
)
|
||||||
|
# should raise RAE diectly
|
||||||
|
print('waiting on final infected subactor result..')
|
||||||
|
res: None = await portal.wait_for_result()
|
||||||
|
assert res is None
|
||||||
|
print(f'infected subactor returned result: {res!r}\n')
|
||||||
|
|
||||||
|
# should be a quiet exit on a simple channel exit
|
||||||
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
exc = excinfo.value
|
||||||
|
|
||||||
|
# TODO, wow bug!
|
||||||
|
# -[ ] bp handler not replaced!?!?
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
|
# import pdbp; pdbp.set_trace()
|
||||||
|
|
||||||
|
# ensure remote error is an explicit `AsyncioCancelled` sub-type
|
||||||
|
# which indicates to the aio task that the trio side exited
|
||||||
|
# silently WITHOUT raising a `trio.Cancelled` (which would
|
||||||
|
# normally be raised instead as a `AsyncioCancelled`).
|
||||||
|
assert exc.boxed_type is to_asyncio.AsyncioTaskExited
|
||||||
|
|
||||||
|
|
||||||
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
||||||
|
|
|
@ -82,6 +82,39 @@ class InternalError(RuntimeError):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
class AsyncioCancelled(Exception):
|
||||||
|
'''
|
||||||
|
Asyncio cancelled translation (non-base) error
|
||||||
|
for use with the ``to_asyncio`` module
|
||||||
|
to be raised in the ``trio`` side task
|
||||||
|
|
||||||
|
NOTE: this should NOT inherit from `asyncio.CancelledError` or
|
||||||
|
tests should break!
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncioTaskExited(Exception):
|
||||||
|
'''
|
||||||
|
asyncio.Task "exited" translation error for use with the
|
||||||
|
`to_asyncio` APIs to be raised in the `trio` side task indicating
|
||||||
|
on `.run_task()`/`.open_channel_from()` exit that the aio side
|
||||||
|
exited early/silently.
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
class TrioTaskExited(AsyncioCancelled):
|
||||||
|
'''
|
||||||
|
The `trio`-side task exited without explicitly cancelling the
|
||||||
|
`asyncio.Task` peer.
|
||||||
|
|
||||||
|
This is very similar to how `trio.ClosedResource` acts as
|
||||||
|
a "clean shutdown" signal to the consumer side of a mem-chan,
|
||||||
|
|
||||||
|
https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
# NOTE: more or less should be close to these:
|
# NOTE: more or less should be close to these:
|
||||||
# 'boxed_type',
|
# 'boxed_type',
|
||||||
|
@ -127,8 +160,8 @@ _body_fields: list[str] = list(
|
||||||
|
|
||||||
def get_err_type(type_name: str) -> BaseException|None:
|
def get_err_type(type_name: str) -> BaseException|None:
|
||||||
'''
|
'''
|
||||||
Look up an exception type by name from the set of locally
|
Look up an exception type by name from the set of locally known
|
||||||
known namespaces:
|
namespaces:
|
||||||
|
|
||||||
- `builtins`
|
- `builtins`
|
||||||
- `tractor._exceptions`
|
- `tractor._exceptions`
|
||||||
|
@ -358,6 +391,13 @@ class RemoteActorError(Exception):
|
||||||
self._ipc_msg.src_type_str
|
self._ipc_msg.src_type_str
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if not self._src_type:
|
||||||
|
raise TypeError(
|
||||||
|
f'Failed to lookup src error type with '
|
||||||
|
f'`tractor._exceptions.get_err_type()` :\n'
|
||||||
|
f'{self.src_type_str}'
|
||||||
|
)
|
||||||
|
|
||||||
return self._src_type
|
return self._src_type
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -652,16 +692,10 @@ class RemoteActorError(Exception):
|
||||||
failing actor's remote env.
|
failing actor's remote env.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
src_type_ref: Type[BaseException] = self.src_type
|
|
||||||
if not src_type_ref:
|
|
||||||
raise TypeError(
|
|
||||||
'Failed to lookup src error type:\n'
|
|
||||||
f'{self.src_type_str}'
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: better tb insertion and all the fancier dunder
|
# TODO: better tb insertion and all the fancier dunder
|
||||||
# metadata stuff as per `.__context__` etc. and friends:
|
# metadata stuff as per `.__context__` etc. and friends:
|
||||||
# https://github.com/python-trio/trio/issues/611
|
# https://github.com/python-trio/trio/issues/611
|
||||||
|
src_type_ref: Type[BaseException] = self.src_type
|
||||||
return src_type_ref(self.tb_str)
|
return src_type_ref(self.tb_str)
|
||||||
|
|
||||||
# TODO: local recontruction of nested inception for a given
|
# TODO: local recontruction of nested inception for a given
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue