Compare commits


4 Commits

Author SHA1 Message Date
Tyler Goodlet 8e66f45e23 Lul, don't overwrite 'tb_str' with src actor's..
This is what was breaking the nested debugger test (where it was failing
on the traceback content matching) and it makes sense.. XD
=> We always want to use the locally boxed `RemoteActorError`'s
traceback content NOT overwrite it with that from the src actor..

Also gets rid of setting the `'relay_uid'` since it's pulled from the
final element in the `'relay_path'` anyway.
2024-03-20 11:36:39 -04:00
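A minimal sketch of the relay rule this commit describes (the helper name is hypothetical, this is not the actual `tractor.pack_error()` code): each relaying hop appends its own uid to the boxed error's 'relay_path' and always (re)sets 'tb_str' from its local traceback, so no separate 'relay_uid' field is needed.

import traceback

def repack_for_relay(
    boxed_msgdata: dict,       # msgdata boxed by the src/previous actor
    our_uid: tuple[str, str],  # this (relaying) actor's uid
) -> dict:
    # assumed to run inside an `except` block so `format_exc()` has an
    # active traceback to render.
    error_msg: dict = dict(boxed_msgdata)

    # grow the relay path with this hop; the "relay uid" is then just
    # `error_msg['relay_path'][-1]`, no separate 'relay_uid' field needed.
    error_msg.setdefault('relay_path', []).append(our_uid)

    # XXX always use the locally rendered traceback, do NOT keep the
    # one boxed by the src actor.
    error_msg['tb_str'] = traceback.format_exc()
    return error_msg
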
Tyler Goodlet 290b0a86b1 Another cancel-req-invalid log msg fmt tweak 2024-03-20 10:42:17 -04:00
Tyler Goodlet d5e5174d97 Extend inter-peer cancel tests for "inceptions"
Use new `RemoteActorError` fields in various assertions, particularly
ensuring that an RTE relayed through the spawner from the little_bro
shows up at the client with the right number of entries in the
`.relay_path` and that the error is raised in the client as desired in
the original use case from `modden`'s remote spawn request API
(which was kinda the whole original motivation to finally get all this
multi-actor error relay stuff working).

Case extensions:
- RTE relayed from little_bro through spawner to client when
  `raise_sub_spawn_error_after` is set; in this case the test should raise
  the relayed and RAE-boxed RTE right up to the `trio.run()`.
  -> ensure the `rae.src_uid`, `.relay_uid` are set correctly.
  -> ensure ctx cancels are not acked.
- use `expect_ctxc()` around root's `tell_little_bro()` usage.
- do `debug_mode` assertions when enabled by test harness in each actor
  layer.
- obvi use new `.src_type`/`.boxed_type` for final error propagation
  assertions.
2024-03-20 10:29:40 -04:00
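A rough sketch of the kind of assertions the extended test makes on a relayed "inception" error (the helper name is hypothetical; the `RemoteActorError` fields are the ones referenced above):

from tractor import RemoteActorError

def check_relayed_rte(rae: RemoteActorError) -> None:
    # the outer box is itself a `RemoteActorError` relayed by the
    # spawner while the original (source) error is a `RuntimeError`.
    assert rae.boxed_type is RemoteActorError
    assert rae.src_type is RuntimeError

    # per the first commit above, the relay uid is just the final
    # entry in the relay path.
    assert tuple(rae.relay_uid) == tuple(rae.relay_path[-1])
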
Tyler Goodlet 8ab5e08830 Adjust advanced faults test(s) for absorbed EoCs
More or less just simplifies things to not expect the stream closure
errors and instead expect KBIs from the simulated user who 'ctl-cs
after hang'.

Toss a little `stuff_hangin_ctlc()` into the script to wrap all that
and always check stream closure before sending the final KBI.
2024-03-19 19:33:06 -04:00
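A condensed version of that helper plus a usage sketch (the wrapper coroutine is hypothetical): any hang inside the block surfaces as a `KeyboardInterrupt` which the test then expects via `pytest.raises(KeyboardInterrupt)`.

from contextlib import asynccontextmanager as acm

import trio

@acm
async def stuff_hangin_ctlc(timeout: float = 1):
    # give the wrapped block `timeout` seconds to make progress..
    with trio.move_on_after(timeout) as cs:
        yield timeout

    if cs.cancelled_caught:
        # ..then pretend to be a user seeing no streaming action,
        # thinking it's a hang, and mashing ctl-c.
        print(f'hangin after timeout={timeout}, simulating ctl-c..')
        raise KeyboardInterrupt

async def wait_on_stream(stream) -> None:
    # `stream` is a `tractor.MsgStream` opened via `ctx.open_stream()`
    async with stuff_hangin_ctlc(timeout=2):
        await stream.receive()
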
5 changed files with 319 additions and 138 deletions

View File

@ -6,6 +6,7 @@ been an outage) and we want to ensure that despite being in debug mode
actor tree will eventually be cancelled without leaving any zombies.
'''
from contextlib import asynccontextmanager as acm
from functools import partial
from tractor import (
@ -17,6 +18,7 @@ from tractor import (
_testing,
)
import trio
import pytest
async def break_ipc(
@ -41,6 +43,13 @@ async def break_ipc(
await stream.aclose()
method: str = method or def_method
print(
'#################################\n'
'Simulating CHILD-side IPC BREAK!\n'
f'method: {method}\n'
f'pre `.aclose()`: {pre_close}\n'
'#################################\n'
)
match method:
case 'trans_aclose':
@ -80,17 +89,17 @@ async def break_ipc_then_error(
break_ipc_with: str|None = None,
pre_close: bool = False,
):
async for msg in stream:
await stream.send(msg)
await break_ipc(
stream=stream,
method=break_ipc_with,
pre_close=pre_close,
)
async for msg in stream:
await stream.send(msg)
assert 0
# async def close_stream_and_error(
async def iter_ipc_stream(
stream: MsgStream,
break_ipc_with: str|None = None,
@ -99,20 +108,6 @@ async def iter_ipc_stream(
async for msg in stream:
await stream.send(msg)
# wipe out channel right before raising
# await break_ipc(
# stream=stream,
# method=break_ipc_with,
# pre_close=pre_close,
# )
# send channel close msg at SC-prot level
#
# TODO: what should get raised here if anything?
# await stream.aclose()
# assert 0
@context
async def recv_and_spawn_net_killers(
@ -134,14 +129,16 @@ async def recv_and_spawn_net_killers(
async for i in stream:
print(f'child echoing {i}')
await stream.send(i)
if (
break_ipc_after
and
i > break_ipc_after
i >= break_ipc_after
):
'#################################\n'
'Simulating CHILD-side IPC BREAK!\n'
'#################################\n'
n.start_soon(
iter_ipc_stream,
stream,
)
n.start_soon(
partial(
break_ipc_then_error,
@ -149,10 +146,23 @@ async def recv_and_spawn_net_killers(
pre_close=pre_close,
)
)
n.start_soon(
iter_ipc_stream,
stream,
@acm
async def stuff_hangin_ctlc(timeout: float = 1) -> None:
with trio.move_on_after(timeout) as cs:
yield timeout
if cs.cancelled_caught:
# pretend to be a user seeing no streaming action
# thinking it's a hang, and then hitting ctl-c..
print(
f"i'm a user on the PARENT side and thingz hangin "
f'after timeout={timeout} ???\n\n'
'MASHING CTlR-C..!?\n'
)
raise KeyboardInterrupt
async def main(
@ -169,9 +179,6 @@ async def main(
) -> None:
# from tractor._state import _runtime_vars as rtv
# rtv['_debug_mode'] = debug_mode
async with (
open_nursery(
start_method=start_method,
@ -190,10 +197,11 @@ async def main(
)
async with (
stuff_hangin_ctlc(timeout=2) as timeout,
_testing.expect_ctxc(
yay=(
break_parent_ipc_after
or break_child_ipc_after,
or break_child_ipc_after
),
# TODO: we CAN'T remove this right?
# since we need the ctxc to bubble up from either
@ -205,12 +213,14 @@ async def main(
# and KBI in an eg?
reraise=True,
),
portal.open_context(
recv_and_spawn_net_killers,
break_ipc_after=break_child_ipc_after,
pre_close=pre_close,
) as (ctx, sent),
):
rx_eoc: bool = False
ipc_break_sent: bool = False
async with ctx.open_stream() as stream:
for i in range(1000):
@ -228,6 +238,7 @@ async def main(
'#################################\n'
)
# TODO: other methods? see break func above.
# await stream._ctx.chan.send(None)
# await stream._ctx.chan.transport.stream.send_eof()
await stream._ctx.chan.transport.stream.aclose()
@ -251,10 +262,12 @@ async def main(
# TODO: is this needed or no?
raise
timeout: int = 1
print(f'Entering `stream.receive()` with timeout={timeout}\n')
with trio.move_on_after(timeout) as cs:
# timeout: int = 1
# with trio.move_on_after(timeout) as cs:
async with stuff_hangin_ctlc() as timeout:
print(
f'PARENT `stream.receive()` with timeout={timeout}\n'
)
# NOTE: in the parent side IPC failure case this
# will raise an ``EndOfChannel`` after the child
# is killed and sends a stop msg back to it's
@ -266,22 +279,29 @@ async def main(
f'{rx}\n'
)
except trio.EndOfChannel:
rx_eoc: bool = True
print('MsgStream got EoC for PARENT')
raise
if cs.cancelled_caught:
# pretend to be a user seeing no streaming action
# thinking it's a hang, and then hitting ctl-c..
print(
f"YOO i'm a PARENT user anddd thingz hangin..\n"
f'after timeout={timeout}\n'
'Streaming finished and we got Eoc.\n'
'Canceling `.open_context()` in root with\n'
'CTlR-C..'
)
if rx_eoc:
assert stream.closed
try:
await stream.send(i)
pytest.fail('stream not closed?')
except (
trio.ClosedResourceError,
trio.EndOfChannel,
) as send_err:
if rx_eoc:
assert send_err is stream._eoc
else:
assert send_err is stream._closed
print(
"YOO i'm mad!\n"
'The send side is dun but thingz hangin..\n'
'MASHING CTlR-C Ctl-c..'
)
raise KeyboardInterrupt

View File

@ -85,8 +85,8 @@ def test_ipc_channel_break_during_stream(
'''
if spawn_backend != 'trio':
# if debug_mode:
# pytest.skip('`debug_mode` only supported on `trio` spawner')
if debug_mode:
pytest.skip('`debug_mode` only supported on `trio` spawner')
# non-`trio` spawners should never hit the hang condition that
# requires the user to do ctl-c to cancel the actor tree.
@ -107,7 +107,10 @@ def test_ipc_channel_break_during_stream(
# AND we tell the child to call `MsgStream.aclose()`.
and pre_aclose_msgstream
):
expect_final_exc = trio.EndOfChannel
# expect_final_exc = trio.EndOfChannel
# ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
# gracefully!
expect_final_exc = KeyboardInterrupt
# NOTE when ONLY the child breaks or it breaks BEFORE the
# parent we expect the parent to get a closed resource error
@ -120,11 +123,25 @@ def test_ipc_channel_break_during_stream(
and
ipc_break['break_parent_ipc_after'] is False
):
expect_final_exc = trio.ClosedResourceError
# NOTE: we DO NOT expect this any more since
# the child side's channel will be broken silently
# and nothing on the parent side will indicate this!
# expect_final_exc = trio.ClosedResourceError
# if child calls `MsgStream.aclose()` then expect EoC.
# NOTE: child will send a 'stop' msg before it breaks
# the transport channel BUT, that will be absorbed by the
# `ctx.open_stream()` block and thus the `.open_context()`
# should hang, after which the test script simulates
# a user sending ctl-c by raising a KBI.
if pre_aclose_msgstream:
expect_final_exc = trio.EndOfChannel
expect_final_exc = KeyboardInterrupt
# XXX OLD XXX
# if child calls `MsgStream.aclose()` then expect EoC.
# ^ XXX not any more ^ since eoc is always absorbed
# gracefully and NOT bubbled to the `.open_context()`
# block!
# expect_final_exc = trio.EndOfChannel
# BOTH but, CHILD breaks FIRST
elif (
@ -134,12 +151,8 @@ def test_ipc_channel_break_during_stream(
> ipc_break['break_child_ipc_after']
)
):
expect_final_exc = trio.ClosedResourceError
# child will send a 'stop' msg before it breaks
# the transport channel.
if pre_aclose_msgstream:
expect_final_exc = trio.EndOfChannel
expect_final_exc = KeyboardInterrupt
# NOTE when the parent IPC side dies (even if the child's does as well
# but the child fails BEFORE the parent) we always expect the
@ -160,7 +173,8 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_parent_ipc_after'] is not False
and (
ipc_break['break_child_ipc_after']
> ipc_break['break_parent_ipc_after']
>
ipc_break['break_parent_ipc_after']
)
):
expect_final_exc = trio.ClosedResourceError
@ -224,6 +238,7 @@ def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages():
'''
async def main():
with trio.fail_after(3):
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'ipc_breaker',
@ -241,7 +256,10 @@ def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages():
print('parent waiting on context')
print('parent exited context')
print(
'parent exited context\n'
'parent raising KBI..\n'
)
raise KeyboardInterrupt
with pytest.raises(KeyboardInterrupt):

View File

@ -16,6 +16,11 @@ from tractor import ( # typing
Portal,
Context,
ContextCancelled,
RemoteActorError,
)
from tractor._testing import (
# tractor_test,
expect_ctxc,
)
# XXX TODO cases:
@ -156,10 +161,11 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
):
await trio.sleep_forever()
with pytest.raises(tractor.RemoteActorError) as excinfo:
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
assert excinfo.value.type == TypeError
rae = excinfo.value
assert rae.boxed_type == TypeError
@tractor.context
@ -739,14 +745,16 @@ def test_peer_canceller(
with pytest.raises(ContextCancelled) as excinfo:
trio.run(main)
assert excinfo.value.type == ContextCancelled
assert excinfo.value.boxed_type == ContextCancelled
assert excinfo.value.canceller[0] == 'canceller'
@tractor.context
async def basic_echo_server(
ctx: Context,
peer_name: str = 'stepbro',
peer_name: str = 'wittle_bruv',
err_after: int|None = None,
) -> None:
'''
@ -774,17 +782,31 @@ async def basic_echo_server(
# assert 0
await ipc.send(resp)
if (
err_after
and i > err_after
):
raise RuntimeError(
f'Simulated error in `{peer_name}`'
)
@tractor.context
async def serve_subactors(
ctx: Context,
peer_name: str,
debug_mode: bool,
) -> None:
async with open_nursery() as an:
# sanity
if debug_mode:
assert tractor._state.debug_mode()
await ctx.started(peer_name)
async with ctx.open_stream() as reqs:
async for msg in reqs:
async with ctx.open_stream() as ipc:
async for msg in ipc:
peer_name: str = msg
peer: Portal = await an.start_actor(
name=peer_name,
@ -795,7 +817,7 @@ async def serve_subactors(
f'{peer_name}\n'
f'|_{peer}\n'
)
await reqs.send((
await ipc.send((
peer.chan.uid,
peer.chan.raddr,
))
@ -807,14 +829,20 @@ async def serve_subactors(
async def client_req_subactor(
ctx: Context,
peer_name: str,
debug_mode: bool,
# used to simulate a user causing an error to be raised
# directly in thread (like a KBI) to better replicate the
# case where a `modden` CLI client would hang afer requesting
# a `Context.cancel()` to `bigd`'s wks spawner.
reraise_on_cancel: str|None = None,
sub_err_after: int|None = None,
) -> None:
# sanity
if debug_mode:
assert tractor._state.debug_mode()
# TODO: other cases to do with sub lifetimes:
# -[ ] test that we can have the server spawn a sub
# that lives longer then ctx with this client.
@ -836,6 +864,7 @@ async def client_req_subactor(
spawner.open_context(
serve_subactors,
peer_name=peer_name,
debug_mode=debug_mode,
) as (spawner_ctx, first),
):
assert first == peer_name
@ -857,6 +886,7 @@ async def client_req_subactor(
await tell_little_bro(
actor_name=sub_uid[0],
caller='client',
err_after=sub_err_after,
)
# TODO: test different scope-layers of
@ -868,9 +898,7 @@ async def client_req_subactor(
# TODO: would be super nice to have a special injected
# cancel type here (maybe just our ctxc) but using
# some native mechanism in `trio` :p
except (
trio.Cancelled
) as err:
except trio.Cancelled as err:
_err = err
if reraise_on_cancel:
errtype = globals()['__builtins__'][reraise_on_cancel]
@ -897,7 +925,9 @@ async def client_req_subactor(
async def tell_little_bro(
actor_name: str,
caller: str = ''
caller: str = '',
err_after: int|None = None,
):
# contact target actor, do a stream dialog.
async with (
@ -906,10 +936,12 @@ async def tell_little_bro(
) as lb,
lb.open_context(
basic_echo_server,
# XXX proxy any delayed err condition
err_after=err_after,
) as (sub_ctx, first),
sub_ctx.open_stream(
basic_echo_server,
) as echo_ipc,
sub_ctx.open_stream() as echo_ipc,
):
actor: Actor = current_actor()
uid: tuple = actor.uid
@ -936,10 +968,15 @@ async def tell_little_bro(
'raise_client_error',
[None, 'KeyboardInterrupt'],
)
@pytest.mark.parametrize(
'raise_sub_spawn_error_after',
[None, 50],
)
def test_peer_spawns_and_cancels_service_subactor(
debug_mode: bool,
raise_client_error: str,
reg_addr: tuple[str, int],
raise_sub_spawn_error_after: int|None,
):
# NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx
@ -953,6 +990,16 @@ def test_peer_spawns_and_cancels_service_subactor(
# and the server's spawned child should cancel and terminate!
peer_name: str = 'little_bro'
def check_inner_rte(rae: RemoteActorError):
'''
Validate the little_bro's relayed inception!
'''
assert rae.boxed_type is RemoteActorError
assert rae.src_type is RuntimeError
assert 'client' in rae.relay_uid
assert peer_name in rae.src_uid
async def main():
async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this.
@ -976,14 +1023,24 @@ def test_peer_spawns_and_cancels_service_subactor(
server.open_context(
serve_subactors,
peer_name=peer_name,
debug_mode=debug_mode,
) as (spawn_ctx, first),
client.open_context(
client_req_subactor,
peer_name=peer_name,
debug_mode=debug_mode,
reraise_on_cancel=raise_client_error,
# trigger for error condition in sub
# during streaming.
sub_err_after=raise_sub_spawn_error_after,
) as (client_ctx, client_says),
):
root: Actor = current_actor()
spawner_uid: tuple = spawn_ctx.chan.uid
print(
f'Server says: {first}\n'
f'Client says: {client_says}\n'
@ -993,6 +1050,7 @@ def test_peer_spawns_and_cancels_service_subactor(
# (grandchild of this root actor) "little_bro"
# and ensure we can also use it as an echo
# server.
sub: Portal
async with tractor.wait_for_actor(
name=peer_name,
) as sub:
@ -1004,41 +1062,116 @@ def test_peer_spawns_and_cancels_service_subactor(
f'.uid: {sub.actor.uid}\n'
f'chan.raddr: {sub.chan.raddr}\n'
)
async with expect_ctxc(
yay=raise_sub_spawn_error_after,
reraise=False,
):
await tell_little_bro(
actor_name=peer_name,
caller='root',
)
# signal client to raise a KBI
await client_ctx.cancel()
print('root cancelled client, checking that sub-spawn is down')
if not raise_sub_spawn_error_after:
# signal client to cancel and maybe raise a KBI
await client_ctx.cancel()
print(
'-> root cancelling client,\n'
'-> root checking `client_ctx.result()`,\n'
f'-> checking that sub-spawn {peer_name} is down\n'
)
# else:
try:
res = await client_ctx.result(hide_tb=False)
# in remote (relayed inception) error
# case, we should error on the line above!
if raise_sub_spawn_error_after:
pytest.fail(
'Never rxed proxied `RemoteActorError[RuntimeError]` !?'
)
assert isinstance(res, ContextCancelled)
assert client_ctx.cancel_acked
assert res.canceller == root.uid
except RemoteActorError as rae:
_err = rae
assert raise_sub_spawn_error_after
# since this is a "relayed error" via the client
# sub-actor, it is expected to be
# a `RemoteActorError` boxing another
# `RemoteActorError` otherwise known as
# an "inception" (from `trio`'s parlance)
# ((or maybe a "Matryoshka" and/or "matron"
# in our own working parlance)) which
# contains the source error from the
# little_bro: a `RuntimeError`.
#
check_inner_rte(rae)
assert rae.relay_uid == client.chan.uid
assert rae.src_uid == sub.chan.uid
assert not client_ctx.cancel_acked
assert (
client_ctx.maybe_error
is client_ctx.outcome
is rae
)
raise
# await tractor.pause()
else:
assert not raise_sub_spawn_error_after
# cancelling the spawner sub should
# transitively cancel it's sub, the little
# bruv.
print('root cancelling server/client sub-actors')
await spawn_ctx.cancel()
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
print('root cancelling server/client sub-actors')
# await tractor.pause()
res = await client_ctx.result(hide_tb=False)
assert isinstance(res, ContextCancelled)
assert client_ctx.cancel_acked
assert res.canceller == current_actor().uid
await spawn_ctx.cancel()
# await server.cancel_actor()
except RemoteActorError as rae:
# XXX more-or-less same as above handler
# this is just making sure the error bubbles out
# of the
_err = rae
assert raise_sub_spawn_error_after
raise
# since we called `.cancel_actor()`, `.cancel_ack`
# will not be set on the ctx bc `ctx.cancel()` was not
# called directly fot this confext.
except ContextCancelled as ctxc:
print('caught ctxc from contexts!')
assert ctxc.canceller == current_actor().uid
_ctxc = ctxc
print(
f'{root.uid} caught ctxc from ctx with {client_ctx.chan.uid}\n'
f'{repr(ctxc)}\n'
)
if not raise_sub_spawn_error_after:
assert ctxc.canceller == root.uid
else:
assert ctxc.canceller == spawner_uid
assert ctxc is spawn_ctx.outcome
assert ctxc is spawn_ctx.maybe_error
raise
if raise_sub_spawn_error_after:
pytest.fail(
'context block(s) in PARENT never raised?!?'
)
if not raise_sub_spawn_error_after:
# assert spawn_ctx.cancel_acked
assert spawn_ctx.cancel_acked
assert client_ctx.cancel_acked
@ -1056,4 +1189,12 @@ def test_peer_spawns_and_cancels_service_subactor(
# assert spawn_ctx.cancelled_caught
if raise_sub_spawn_error_after:
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
rae: RemoteActorError = excinfo.value
check_inner_rte(rae)
else:
trio.run(main)

View File

@ -111,7 +111,6 @@ class RemoteActorError(Exception):
reprol_fields: list[str] = [
'src_uid',
'relay_path',
# 'relay_uid',
]
def __init__(
@ -487,14 +486,11 @@ def pack_error(
else:
tb_str = traceback.format_exc()
our_uid: tuple = current_actor().uid
error_msg: dict[
error_msg: dict[ # for IPC
str,
str | tuple[str, str]
] = {
'tb_str': tb_str,
'relay_uid': our_uid,
}
] = {}
our_uid: tuple = current_actor().uid
if (
isinstance(exc, RemoteActorError)
@ -535,6 +531,11 @@ def pack_error(
[],
).append(our_uid)
# XXX NOTE: always ensure the traceback-str is from the
# locally raised error (**not** the prior relay's boxed
# content's `.msgdata`).
error_msg['tb_str'] = tb_str
pkt: dict = {'error': error_msg}
if cid:
pkt['cid'] = cid

View File

@ -715,10 +715,12 @@ class Actor:
f'|_{chan}\n'
)
try:
# send a msg loop terminate sentinel
# send msg loop terminate sentinel which
# triggers cancellation of all remotely
# started tasks.
await chan.send(None)
# XXX: do we want this?
# XXX: do we want this? no right?
# causes "[104] connection reset by peer" on other end
# await chan.aclose()
@ -1208,10 +1210,10 @@ class Actor:
# - callee self raises ctxc before caller send request,
# - callee errors prior to cancel req.
log.cancel(
'Cancel request invalid, RPC task already completed?\n'
'Cancel request invalid, RPC task already completed?\n\n'
f'<= canceller: {requesting_uid}\n\n'
f'=>{parent_chan}\n'
f' |_ctx-id: {cid}\n'
f'=> {cid}@{parent_chan.uid}\n'
f' |_{parent_chan}\n'
)
return True
@ -1510,7 +1512,6 @@ async def async_main(
):
accept_addrs = set_accept_addr_says_rent
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in