Update tests for `PldRx` and `Context` changes
Mostly adjustments for the new pld-receiver semantics/shim-layer which results more often in the direct delivery of `RemoteActorError`s from IPC API primitives (like `Portal.result()`) instead of being embedded in an `ExceptionGroup` bundled from an embedded nursery. Tossed usage of the `debug_mode: bool` fixture to a couple problematic tests while i was working on them. Also includes detailed assertion updates to the inter-peer cancellation suite in terms of, - `Context.canceller` state correctly matching the true src actor when expecting a ctxc. - any rxed `ContextCancelled` should instance match the `Context._local/remote_error` as should the `.msgdata` and `._ipc_msg`.runtime_to_msgspec
parent
fc075e96c6
commit
5cb0cc0f0b
|
@ -97,6 +97,7 @@ def test_ipc_channel_break_during_stream(
|
||||||
examples_dir() / 'advanced_faults'
|
examples_dir() / 'advanced_faults'
|
||||||
/ 'ipc_failure_during_stream.py',
|
/ 'ipc_failure_during_stream.py',
|
||||||
root=examples_dir(),
|
root=examples_dir(),
|
||||||
|
consider_namespace_packages=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
# by def we expect KBI from user after a simulated "hang
|
# by def we expect KBI from user after a simulated "hang
|
||||||
|
|
|
@ -89,17 +89,30 @@ def test_remote_error(reg_addr, args_err):
|
||||||
assert excinfo.value.boxed_type == errtype
|
assert excinfo.value.boxed_type == errtype
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# the root task will also error on the `.result()` call
|
# the root task will also error on the `Portal.result()`
|
||||||
# so we expect an error from there AND the child.
|
# call so we expect an error from there AND the child.
|
||||||
with pytest.raises(BaseExceptionGroup) as excinfo:
|
# |_ tho seems like on new `trio` this doesn't always
|
||||||
|
# happen?
|
||||||
|
with pytest.raises((
|
||||||
|
BaseExceptionGroup,
|
||||||
|
tractor.RemoteActorError,
|
||||||
|
)) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# ensure boxed errors
|
# ensure boxed errors are `errtype`
|
||||||
for exc in excinfo.value.exceptions:
|
err: BaseException = excinfo.value
|
||||||
|
if isinstance(err, BaseExceptionGroup):
|
||||||
|
suberrs: list[BaseException] = err.exceptions
|
||||||
|
else:
|
||||||
|
suberrs: list[BaseException] = [err]
|
||||||
|
|
||||||
|
for exc in suberrs:
|
||||||
assert exc.boxed_type == errtype
|
assert exc.boxed_type == errtype
|
||||||
|
|
||||||
|
|
||||||
def test_multierror(reg_addr):
|
def test_multierror(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
|
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
|
||||||
more then one actor errors.
|
more then one actor errors.
|
||||||
|
|
|
@ -444,6 +444,7 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
fan_out=fan_out,
|
fan_out=fan_out,
|
||||||
)
|
)
|
||||||
|
# should raise RAE diectly
|
||||||
await portal.result()
|
await portal.result()
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
@ -461,12 +462,11 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
|
||||||
# should trigger remote actor error
|
# should trigger remote actor error
|
||||||
await portal.result()
|
await portal.result()
|
||||||
|
|
||||||
with pytest.raises(BaseExceptionGroup) as excinfo:
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# ensure boxed errors
|
# ensure boxed error type
|
||||||
for exc in excinfo.value.exceptions:
|
excinfo.value.boxed_type == Exception
|
||||||
assert exc.boxed_type == Exception
|
|
||||||
|
|
||||||
|
|
||||||
def test_trio_closes_early_and_channel_exits(reg_addr):
|
def test_trio_closes_early_and_channel_exits(reg_addr):
|
||||||
|
@ -477,7 +477,7 @@ def test_trio_closes_early_and_channel_exits(reg_addr):
|
||||||
exit_early=True,
|
exit_early=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
# should trigger remote actor error
|
# should raise RAE diectly
|
||||||
await portal.result()
|
await portal.result()
|
||||||
|
|
||||||
# should be a quiet exit on a simple channel exit
|
# should be a quiet exit on a simple channel exit
|
||||||
|
@ -492,15 +492,17 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
||||||
aio_raise_err=True,
|
aio_raise_err=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
# should trigger remote actor error
|
# should trigger RAE directly, not an eg.
|
||||||
await portal.result()
|
await portal.result()
|
||||||
|
|
||||||
with pytest.raises(BaseExceptionGroup) as excinfo:
|
with pytest.raises(
|
||||||
|
# NOTE: bc we directly wait on `Portal.result()` instead
|
||||||
|
# of capturing it inside the `ActorNursery` machinery.
|
||||||
|
expected_exception=RemoteActorError,
|
||||||
|
) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# ensure boxed errors
|
excinfo.value.boxed_type == Exception
|
||||||
for exc in excinfo.value.exceptions:
|
|
||||||
assert exc.boxed_type == Exception
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
|
|
@ -55,9 +55,10 @@ from tractor._testing import (
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def sleep_forever(
|
async def open_stream_then_sleep_forever(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
expect_ctxc: bool = False,
|
expect_ctxc: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Sync the context, open a stream then just sleep.
|
Sync the context, open a stream then just sleep.
|
||||||
|
@ -67,6 +68,10 @@ async def sleep_forever(
|
||||||
'''
|
'''
|
||||||
try:
|
try:
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
|
|
||||||
|
# NOTE: the below means this child will send a `Stop`
|
||||||
|
# to it's parent-side task despite that side never
|
||||||
|
# opening a stream itself.
|
||||||
async with ctx.open_stream():
|
async with ctx.open_stream():
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
@ -100,7 +105,7 @@ async def error_before_started(
|
||||||
'''
|
'''
|
||||||
async with tractor.wait_for_actor('sleeper') as p2:
|
async with tractor.wait_for_actor('sleeper') as p2:
|
||||||
async with (
|
async with (
|
||||||
p2.open_context(sleep_forever) as (peer_ctx, first),
|
p2.open_context(open_stream_then_sleep_forever) as (peer_ctx, first),
|
||||||
peer_ctx.open_stream(),
|
peer_ctx.open_stream(),
|
||||||
):
|
):
|
||||||
# NOTE: this WAS inside an @acm body but i factored it
|
# NOTE: this WAS inside an @acm body but i factored it
|
||||||
|
@ -204,9 +209,13 @@ async def stream_ints(
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def stream_from_peer(
|
async def stream_from_peer(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
|
debug_mode: bool,
|
||||||
peer_name: str = 'sleeper',
|
peer_name: str = 'sleeper',
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
|
# sanity
|
||||||
|
assert tractor._state.debug_mode() == debug_mode
|
||||||
|
|
||||||
peer: Portal
|
peer: Portal
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
|
@ -240,26 +249,54 @@ async def stream_from_peer(
|
||||||
assert msg is not None
|
assert msg is not None
|
||||||
print(msg)
|
print(msg)
|
||||||
|
|
||||||
# NOTE: cancellation of the (sleeper) peer should always
|
# NOTE: cancellation of the (sleeper) peer should always cause
|
||||||
# cause a `ContextCancelled` raise in this streaming
|
# a `ContextCancelled` raise in this streaming actor.
|
||||||
# actor.
|
except ContextCancelled as _ctxc:
|
||||||
except ContextCancelled as ctxc:
|
ctxc = _ctxc
|
||||||
ctxerr = ctxc
|
|
||||||
|
|
||||||
assert peer_ctx._remote_error is ctxerr
|
# print("TRYING TO ENTER PAUSSE!!!")
|
||||||
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
|
# await tractor.pause(shield=True)
|
||||||
|
re: ContextCancelled = peer_ctx._remote_error
|
||||||
|
|
||||||
# XXX YES, bc exact same msg instances
|
# XXX YES XXX, remote error should be unpacked only once!
|
||||||
assert peer_ctx._remote_error._ipc_msg is ctxerr._ipc_msg
|
assert (
|
||||||
|
re
|
||||||
|
is
|
||||||
|
peer_ctx.maybe_error
|
||||||
|
is
|
||||||
|
ctxc
|
||||||
|
is
|
||||||
|
peer_ctx._local_error
|
||||||
|
)
|
||||||
|
# NOTE: these errors should all match!
|
||||||
|
# ------ - ------
|
||||||
|
# XXX [2024-05-03] XXX
|
||||||
|
# ------ - ------
|
||||||
|
# broke this due to a re-raise inside `.msg._ops.drain_to_final_msg()`
|
||||||
|
# where the `Error()` msg was directly raising the ctxc
|
||||||
|
# instead of just returning up to the caller inside
|
||||||
|
# `Context.return()` which would results in a diff instance of
|
||||||
|
# the same remote error bubbling out above vs what was
|
||||||
|
# already unpacked and set inside `Context.
|
||||||
|
assert (
|
||||||
|
peer_ctx._remote_error.msgdata
|
||||||
|
==
|
||||||
|
ctxc.msgdata
|
||||||
|
)
|
||||||
|
# ^-XXX-^ notice the data is of course the exact same.. so
|
||||||
|
# the above larger assert makes sense to also always be true!
|
||||||
|
|
||||||
# XXX NO, bc new one always created for property accesss
|
# XXX YES XXX, bc should be exact same msg instances
|
||||||
assert peer_ctx._remote_error.ipc_msg != ctxerr.ipc_msg
|
assert peer_ctx._remote_error._ipc_msg is ctxc._ipc_msg
|
||||||
|
|
||||||
|
# XXX NO XXX, bc new one always created for property accesss
|
||||||
|
assert peer_ctx._remote_error.ipc_msg != ctxc.ipc_msg
|
||||||
|
|
||||||
# the peer ctx is the canceller even though it's canceller
|
# the peer ctx is the canceller even though it's canceller
|
||||||
# is the "canceller" XD
|
# is the "canceller" XD
|
||||||
assert peer_name in peer_ctx.canceller
|
assert peer_name in peer_ctx.canceller
|
||||||
|
|
||||||
assert "canceller" in ctxerr.canceller
|
assert "canceller" in ctxc.canceller
|
||||||
|
|
||||||
# caller peer should not be the cancel requester
|
# caller peer should not be the cancel requester
|
||||||
assert not ctx.cancel_called
|
assert not ctx.cancel_called
|
||||||
|
@ -283,12 +320,13 @@ async def stream_from_peer(
|
||||||
|
|
||||||
# TODO / NOTE `.canceller` won't have been set yet
|
# TODO / NOTE `.canceller` won't have been set yet
|
||||||
# here because that machinery is inside
|
# here because that machinery is inside
|
||||||
# `.open_context().__aexit__()` BUT, if we had
|
# `Portal.open_context().__aexit__()` BUT, if we had
|
||||||
# a way to know immediately (from the last
|
# a way to know immediately (from the last
|
||||||
# checkpoint) that cancellation was due to
|
# checkpoint) that cancellation was due to
|
||||||
# a remote, we COULD assert this here..see,
|
# a remote, we COULD assert this here..see,
|
||||||
# https://github.com/goodboy/tractor/issues/368
|
# https://github.com/goodboy/tractor/issues/368
|
||||||
#
|
#
|
||||||
|
# await tractor.pause()
|
||||||
# assert 'canceller' in ctx.canceller
|
# assert 'canceller' in ctx.canceller
|
||||||
|
|
||||||
# root/parent actor task should NEVER HAVE cancelled us!
|
# root/parent actor task should NEVER HAVE cancelled us!
|
||||||
|
@ -392,12 +430,13 @@ def test_peer_canceller(
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
sleeper.open_context(
|
sleeper.open_context(
|
||||||
sleep_forever,
|
open_stream_then_sleep_forever,
|
||||||
expect_ctxc=True,
|
expect_ctxc=True,
|
||||||
) as (sleeper_ctx, sent),
|
) as (sleeper_ctx, sent),
|
||||||
|
|
||||||
just_caller.open_context(
|
just_caller.open_context(
|
||||||
stream_from_peer,
|
stream_from_peer,
|
||||||
|
debug_mode=debug_mode,
|
||||||
) as (caller_ctx, sent),
|
) as (caller_ctx, sent),
|
||||||
|
|
||||||
canceller.open_context(
|
canceller.open_context(
|
||||||
|
@ -423,10 +462,11 @@ def test_peer_canceller(
|
||||||
|
|
||||||
# should always raise since this root task does
|
# should always raise since this root task does
|
||||||
# not request the sleeper cancellation ;)
|
# not request the sleeper cancellation ;)
|
||||||
except ContextCancelled as ctxerr:
|
except ContextCancelled as _ctxc:
|
||||||
|
ctxc = _ctxc
|
||||||
print(
|
print(
|
||||||
'CAUGHT REMOTE CONTEXT CANCEL\n\n'
|
'CAUGHT REMOTE CONTEXT CANCEL\n\n'
|
||||||
f'{ctxerr}\n'
|
f'{ctxc}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# canceller and caller peers should not
|
# canceller and caller peers should not
|
||||||
|
@ -437,7 +477,7 @@ def test_peer_canceller(
|
||||||
# we were not the actor, our peer was
|
# we were not the actor, our peer was
|
||||||
assert not sleeper_ctx.cancel_acked
|
assert not sleeper_ctx.cancel_acked
|
||||||
|
|
||||||
assert ctxerr.canceller[0] == 'canceller'
|
assert ctxc.canceller[0] == 'canceller'
|
||||||
|
|
||||||
# XXX NOTE XXX: since THIS `ContextCancelled`
|
# XXX NOTE XXX: since THIS `ContextCancelled`
|
||||||
# HAS NOT YET bubbled up to the
|
# HAS NOT YET bubbled up to the
|
||||||
|
@ -448,7 +488,7 @@ def test_peer_canceller(
|
||||||
|
|
||||||
# CASE_1: error-during-ctxc-handling,
|
# CASE_1: error-during-ctxc-handling,
|
||||||
if error_during_ctxerr_handling:
|
if error_during_ctxerr_handling:
|
||||||
raise RuntimeError('Simulated error during teardown')
|
raise RuntimeError('Simulated RTE re-raise during ctxc handling')
|
||||||
|
|
||||||
# CASE_2: standard teardown inside in `.open_context()` block
|
# CASE_2: standard teardown inside in `.open_context()` block
|
||||||
raise
|
raise
|
||||||
|
@ -513,6 +553,9 @@ def test_peer_canceller(
|
||||||
# should be cancelled by US.
|
# should be cancelled by US.
|
||||||
#
|
#
|
||||||
if error_during_ctxerr_handling:
|
if error_during_ctxerr_handling:
|
||||||
|
print(f'loc_err: {_loc_err}\n')
|
||||||
|
assert isinstance(loc_err, RuntimeError)
|
||||||
|
|
||||||
# since we do a rte reraise above, the
|
# since we do a rte reraise above, the
|
||||||
# `.open_context()` error handling should have
|
# `.open_context()` error handling should have
|
||||||
# raised a local rte, thus the internal
|
# raised a local rte, thus the internal
|
||||||
|
@ -521,9 +564,6 @@ def test_peer_canceller(
|
||||||
# a `trio.Cancelled` due to a local
|
# a `trio.Cancelled` due to a local
|
||||||
# `._scope.cancel()` call.
|
# `._scope.cancel()` call.
|
||||||
assert not sleeper_ctx._scope.cancelled_caught
|
assert not sleeper_ctx._scope.cancelled_caught
|
||||||
|
|
||||||
assert isinstance(loc_err, RuntimeError)
|
|
||||||
print(f'_loc_err: {_loc_err}\n')
|
|
||||||
# assert sleeper_ctx._local_error is _loc_err
|
# assert sleeper_ctx._local_error is _loc_err
|
||||||
# assert sleeper_ctx._local_error is _loc_err
|
# assert sleeper_ctx._local_error is _loc_err
|
||||||
assert not (
|
assert not (
|
||||||
|
@ -560,10 +600,13 @@ def test_peer_canceller(
|
||||||
|
|
||||||
else: # the other 2 ctxs
|
else: # the other 2 ctxs
|
||||||
assert (
|
assert (
|
||||||
|
isinstance(re, ContextCancelled)
|
||||||
|
and (
|
||||||
re.canceller
|
re.canceller
|
||||||
==
|
==
|
||||||
canceller.channel.uid
|
canceller.channel.uid
|
||||||
)
|
)
|
||||||
|
)
|
||||||
|
|
||||||
# since the sleeper errors while handling a
|
# since the sleeper errors while handling a
|
||||||
# peer-cancelled (by ctxc) scenario, we expect
|
# peer-cancelled (by ctxc) scenario, we expect
|
||||||
|
@ -811,8 +854,7 @@ async def serve_subactors(
|
||||||
async with open_nursery() as an:
|
async with open_nursery() as an:
|
||||||
|
|
||||||
# sanity
|
# sanity
|
||||||
if debug_mode:
|
assert tractor._state.debug_mode() == debug_mode
|
||||||
assert tractor._state.debug_mode()
|
|
||||||
|
|
||||||
await ctx.started(peer_name)
|
await ctx.started(peer_name)
|
||||||
async with ctx.open_stream() as ipc:
|
async with ctx.open_stream() as ipc:
|
||||||
|
@ -1091,7 +1133,6 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
'-> root checking `client_ctx.result()`,\n'
|
'-> root checking `client_ctx.result()`,\n'
|
||||||
f'-> checking that sub-spawn {peer_name} is down\n'
|
f'-> checking that sub-spawn {peer_name} is down\n'
|
||||||
)
|
)
|
||||||
# else:
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
res = await client_ctx.result(hide_tb=False)
|
res = await client_ctx.result(hide_tb=False)
|
||||||
|
|
|
@ -2,7 +2,9 @@
|
||||||
Spawning basics
|
Spawning basics
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from typing import Optional
|
from typing import (
|
||||||
|
Any,
|
||||||
|
)
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
|
@ -25,13 +27,11 @@ async def spawn(
|
||||||
async with tractor.open_root_actor(
|
async with tractor.open_root_actor(
|
||||||
arbiter_addr=reg_addr,
|
arbiter_addr=reg_addr,
|
||||||
):
|
):
|
||||||
|
|
||||||
actor = tractor.current_actor()
|
actor = tractor.current_actor()
|
||||||
assert actor.is_arbiter == is_arbiter
|
assert actor.is_arbiter == is_arbiter
|
||||||
data = data_to_pass_down
|
data = data_to_pass_down
|
||||||
|
|
||||||
if actor.is_arbiter:
|
if actor.is_arbiter:
|
||||||
|
|
||||||
async with tractor.open_nursery() as nursery:
|
async with tractor.open_nursery() as nursery:
|
||||||
|
|
||||||
# forks here
|
# forks here
|
||||||
|
@ -95,7 +95,9 @@ async def test_movie_theatre_convo(start_method):
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
|
||||||
async def cellar_door(return_value: Optional[str]):
|
async def cellar_door(
|
||||||
|
return_value: str|None,
|
||||||
|
):
|
||||||
return return_value
|
return return_value
|
||||||
|
|
||||||
|
|
||||||
|
@ -105,16 +107,18 @@ async def cellar_door(return_value: Optional[str]):
|
||||||
)
|
)
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_most_beautiful_word(
|
async def test_most_beautiful_word(
|
||||||
start_method,
|
start_method: str,
|
||||||
return_value
|
return_value: Any,
|
||||||
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
The main ``tractor`` routine.
|
The main ``tractor`` routine.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
with trio.fail_after(1):
|
with trio.fail_after(1):
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
) as n:
|
||||||
portal = await n.run_in_actor(
|
portal = await n.run_in_actor(
|
||||||
cellar_door,
|
cellar_door,
|
||||||
return_value=return_value,
|
return_value=return_value,
|
||||||
|
|
Loading…
Reference in New Issue