Compare commits


No commits in common. "8e66f45e2386efacb344329a05c341343f9c0659" and "668016d37b89d3e140d45e11914d606bc371f4f0" have entirely different histories.

5 changed files with 138 additions and 319 deletions

View File

@@ -6,7 +6,6 @@ been an outage) and we want to ensure that despite being in debug mode
actor tree will eventually be cancelled without leaving any zombies.
'''
from contextlib import asynccontextmanager as acm
from functools import partial
from tractor import (
@@ -18,7 +17,6 @@ from tractor import (
_testing,
)
import trio
import pytest
async def break_ipc(
@@ -43,13 +41,6 @@ async def break_ipc(
await stream.aclose()
method: str = method or def_method
print(
'#################################\n'
'Simulating CHILD-side IPC BREAK!\n'
f'method: {method}\n'
f'pre `.aclose()`: {pre_close}\n'
'#################################\n'
)
match method:
case 'trans_aclose':
@@ -89,17 +80,17 @@ async def break_ipc_then_error(
break_ipc_with: str|None = None,
pre_close: bool = False,
):
await break_ipc(
stream=stream,
method=break_ipc_with,
pre_close=pre_close,
)
async for msg in stream:
await stream.send(msg)
assert 0
await break_ipc(
stream=stream,
method=break_ipc_with,
pre_close=pre_close,
)
assert 0
# async def close_stream_and_error(
async def iter_ipc_stream(
stream: MsgStream,
break_ipc_with: str|None = None,
@@ -108,6 +99,20 @@ async def iter_ipc_stream(
async for msg in stream:
await stream.send(msg)
# wipe out channel right before raising
# await break_ipc(
# stream=stream,
# method=break_ipc_with,
# pre_close=pre_close,
# )
# send channel close msg at SC-prot level
#
# TODO: what should get raised here if anything?
# await stream.aclose()
# assert 0
@context
async def recv_and_spawn_net_killers(
@@ -129,16 +134,14 @@ async def recv_and_spawn_net_killers(
async for i in stream:
print(f'child echoing {i}')
await stream.send(i)
if (
break_ipc_after
and
i >= break_ipc_after
i > break_ipc_after
):
n.start_soon(
iter_ipc_stream,
stream,
)
'#################################\n'
'Simulating CHILD-side IPC BREAK!\n'
'#################################\n'
n.start_soon(
partial(
break_ipc_then_error,
@@ -146,23 +149,10 @@ async def recv_and_spawn_net_killers(
pre_close=pre_close,
)
)
@acm
async def stuff_hangin_ctlc(timeout: float = 1) -> None:
with trio.move_on_after(timeout) as cs:
yield timeout
if cs.cancelled_caught:
# pretend to be a user seeing no streaming action
# thinking it's a hang, and then hitting ctl-c..
print(
f"i'm a user on the PARENT side and thingz hangin "
f'after timeout={timeout} ???\n\n'
'MASHING CTlR-C..!?\n'
)
raise KeyboardInterrupt
n.start_soon(
iter_ipc_stream,
stream,
)
async def main(
@@ -179,6 +169,9 @@ async def main(
) -> None:
# from tractor._state import _runtime_vars as rtv
# rtv['_debug_mode'] = debug_mode
async with (
open_nursery(
start_method=start_method,
@@ -197,11 +190,10 @@ async def main(
)
async with (
stuff_hangin_ctlc(timeout=2) as timeout,
_testing.expect_ctxc(
yay=(
break_parent_ipc_after
or break_child_ipc_after
or break_child_ipc_after,
),
# TODO: we CAN'T remove this right?
# since we need the ctxc to bubble up from either
@@ -213,14 +205,12 @@ async def main(
# and KBI in an eg?
reraise=True,
),
portal.open_context(
recv_and_spawn_net_killers,
break_ipc_after=break_child_ipc_after,
pre_close=pre_close,
) as (ctx, sent),
):
rx_eoc: bool = False
ipc_break_sent: bool = False
async with ctx.open_stream() as stream:
for i in range(1000):
@@ -238,7 +228,6 @@ async def main(
'#################################\n'
)
# TODO: other methods? see break func above.
# await stream._ctx.chan.send(None)
# await stream._ctx.chan.transport.stream.send_eof()
await stream._ctx.chan.transport.stream.aclose()
@@ -262,12 +251,10 @@ async def main(
# TODO: is this needed or no?
raise
# timeout: int = 1
# with trio.move_on_after(timeout) as cs:
async with stuff_hangin_ctlc() as timeout:
print(
f'PARENT `stream.receive()` with timeout={timeout}\n'
)
timeout: int = 1
print(f'Entering `stream.receive()` with timeout={timeout}\n')
with trio.move_on_after(timeout) as cs:
# NOTE: in the parent side IPC failure case this
# will raise an ``EndOfChannel`` after the child
# is killed and sends a stop msg back to it's
@@ -279,30 +266,23 @@ async def main(
f'{rx}\n'
)
except trio.EndOfChannel:
rx_eoc: bool = True
print('MsgStream got EoC for PARENT')
raise
print(
'Streaming finished and we got Eoc.\n'
'Canceling `.open_context()` in root with\n'
'CTlR-C..'
)
if rx_eoc:
assert stream.closed
try:
await stream.send(i)
pytest.fail('stream not closed?')
except (
trio.ClosedResourceError,
trio.EndOfChannel,
) as send_err:
if rx_eoc:
assert send_err is stream._eoc
else:
assert send_err is stream._closed
if cs.cancelled_caught:
# pretend to be a user seeing no streaming action
# thinking it's a hang, and then hitting ctl-c..
print(
f"YOO i'm a PARENT user anddd thingz hangin..\n"
f'after timeout={timeout}\n'
)
raise KeyboardInterrupt
print(
"YOO i'm mad!\n"
'The send side is dun but thingz hangin..\n'
'MASHING CTlR-C Ctl-c..'
)
raise KeyboardInterrupt
if __name__ == '__main__':
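For reference, the hunks above include a `stuff_hangin_ctlc()` helper: an async context manager that fakes an impatient user hitting ctrl-c when streaming appears hung. A minimal, self-contained sketch of that pattern follows (only `trio` is assumed; the `_demo()` task is illustrative and not part of this diff):

from contextlib import asynccontextmanager as acm

import trio


@acm
async def stuff_hangin_ctlc(timeout: float = 1):
    # bound the wrapped body with a timeout'd cancel scope
    with trio.move_on_after(timeout) as cs:
        yield timeout

    if cs.cancelled_caught:
        # pretend to be a user who saw no streaming action,
        # decided it was a hang, and mashed ctl-c..
        print(f'hangin after timeout={timeout}, MASHING CTL-C..')
        raise KeyboardInterrupt


async def _demo() -> None:
    try:
        async with stuff_hangin_ctlc(timeout=0.1):
            await trio.sleep_forever()  # pretend the stream never delivers
    except KeyboardInterrupt:
        print('caught the simulated ctl-c')


if __name__ == '__main__':
    trio.run(_demo)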

View File

@@ -85,8 +85,8 @@ def test_ipc_channel_break_during_stream(
'''
if spawn_backend != 'trio':
if debug_mode:
pytest.skip('`debug_mode` only supported on `trio` spawner')
# if debug_mode:
# pytest.skip('`debug_mode` only supported on `trio` spawner')
# non-`trio` spawners should never hit the hang condition that
# requires the user to do ctl-c to cancel the actor tree.
@@ -107,10 +107,7 @@ def test_ipc_channel_break_during_stream(
# AND we tell the child to call `MsgStream.aclose()`.
and pre_aclose_msgstream
):
# expect_final_exc = trio.EndOfChannel
# ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
# gracefully!
expect_final_exc = KeyboardInterrupt
expect_final_exc = trio.EndOfChannel
# NOTE when ONLY the child breaks or it breaks BEFORE the
# parent we expect the parent to get a closed resource error
@@ -123,25 +120,11 @@ def test_ipc_channel_break_during_stream(
and
ipc_break['break_parent_ipc_after'] is False
):
# NOTE: we DO NOT expect this any more since
# the child side's channel will be broken silently
# and nothing on the parent side will indicate this!
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = trio.ClosedResourceError
# NOTE: child will send a 'stop' msg before it breaks
# the transport channel BUT, that will be absorbed by the
# `ctx.open_stream()` block and thus the `.open_context()`
# should hang, after which the test script simulates
# a user sending ctl-c by raising a KBI.
# if child calls `MsgStream.aclose()` then expect EoC.
if pre_aclose_msgstream:
expect_final_exc = KeyboardInterrupt
# XXX OLD XXX
# if child calls `MsgStream.aclose()` then expect EoC.
# ^ XXX not any more ^ since eoc is always absorbed
# gracefully and NOT bubbled to the `.open_context()`
# block!
# expect_final_exc = trio.EndOfChannel
expect_final_exc = trio.EndOfChannel
# BOTH but, CHILD breaks FIRST
elif (
@@ -151,8 +134,12 @@ def test_ipc_channel_break_during_stream(
> ipc_break['break_child_ipc_after']
)
):
expect_final_exc = trio.ClosedResourceError
# child will send a 'stop' msg before it breaks
# the transport channel.
if pre_aclose_msgstream:
expect_final_exc = KeyboardInterrupt
expect_final_exc = trio.EndOfChannel
# NOTE when the parent IPC side dies (even if the child's does as well
# but the child fails BEFORE the parent) we always expect the
@@ -173,8 +160,7 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_parent_ipc_after'] is not False
and (
ipc_break['break_child_ipc_after']
>
ipc_break['break_parent_ipc_after']
> ipc_break['break_parent_ipc_after']
)
):
expect_final_exc = trio.ClosedResourceError
@@ -238,29 +224,25 @@ def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages():
'''
async def main():
with trio.fail_after(3):
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'ipc_breaker',
enable_modules=[__name__],
)
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'ipc_breaker',
enable_modules=[__name__],
)
with trio.move_on_after(1):
async with (
portal.open_context(
break_ipc_after_started
) as (ctx, sent),
):
async with ctx.open_stream():
await trio.sleep(0.5)
with trio.move_on_after(1):
async with (
portal.open_context(
break_ipc_after_started
) as (ctx, sent),
):
async with ctx.open_stream():
await trio.sleep(0.5)
print('parent waiting on context')
print('parent waiting on context')
print(
'parent exited context\n'
'parent raising KBI..\n'
)
raise KeyboardInterrupt
print('parent exited context')
raise KeyboardInterrupt
with pytest.raises(KeyboardInterrupt):
trio.run(main)

View File

@@ -16,11 +16,6 @@ from tractor import ( # typing
Portal,
Context,
ContextCancelled,
RemoteActorError,
)
from tractor._testing import (
# tractor_test,
expect_ctxc,
)
# XXX TODO cases:
@@ -161,11 +156,10 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
):
await trio.sleep_forever()
with pytest.raises(RemoteActorError) as excinfo:
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
rae = excinfo.value
assert rae.boxed_type == TypeError
assert excinfo.value.type == TypeError
@tractor.context
@@ -745,16 +739,14 @@ def test_peer_canceller(
with pytest.raises(ContextCancelled) as excinfo:
trio.run(main)
assert excinfo.value.boxed_type == ContextCancelled
assert excinfo.value.type == ContextCancelled
assert excinfo.value.canceller[0] == 'canceller'
@tractor.context
async def basic_echo_server(
ctx: Context,
peer_name: str = 'wittle_bruv',
err_after: int|None = None,
peer_name: str = 'stepbro',
) -> None:
'''
@@ -782,31 +774,17 @@ async def basic_echo_server(
# assert 0
await ipc.send(resp)
if (
err_after
and i > err_after
):
raise RuntimeError(
f'Simulated error in `{peer_name}`'
)
@tractor.context
async def serve_subactors(
ctx: Context,
peer_name: str,
debug_mode: bool,
) -> None:
async with open_nursery() as an:
# sanity
if debug_mode:
assert tractor._state.debug_mode()
await ctx.started(peer_name)
async with ctx.open_stream() as ipc:
async for msg in ipc:
async with ctx.open_stream() as reqs:
async for msg in reqs:
peer_name: str = msg
peer: Portal = await an.start_actor(
name=peer_name,
@@ -817,7 +795,7 @@ async def serve_subactors(
f'{peer_name}\n'
f'|_{peer}\n'
)
await ipc.send((
await reqs.send((
peer.chan.uid,
peer.chan.raddr,
))
@@ -829,20 +807,14 @@
async def client_req_subactor(
ctx: Context,
peer_name: str,
debug_mode: bool,
# used to simulate a user causing an error to be raised
# directly in thread (like a KBI) to better replicate the
# case where a `modden` CLI client would hang afer requesting
# a `Context.cancel()` to `bigd`'s wks spawner.
reraise_on_cancel: str|None = None,
sub_err_after: int|None = None,
) -> None:
# sanity
if debug_mode:
assert tractor._state.debug_mode()
# TODO: other cases to do with sub lifetimes:
# -[ ] test that we can have the server spawn a sub
# that lives longer then ctx with this client.
@@ -864,7 +836,6 @@ async def client_req_subactor(
spawner.open_context(
serve_subactors,
peer_name=peer_name,
debug_mode=debug_mode,
) as (spawner_ctx, first),
):
assert first == peer_name
@@ -886,7 +857,6 @@ async def client_req_subactor(
await tell_little_bro(
actor_name=sub_uid[0],
caller='client',
err_after=sub_err_after,
)
# TODO: test different scope-layers of
@@ -898,7 +868,9 @@ async def client_req_subactor(
# TODO: would be super nice to have a special injected
# cancel type here (maybe just our ctxc) but using
# some native mechanism in `trio` :p
except trio.Cancelled as err:
except (
trio.Cancelled
) as err:
_err = err
if reraise_on_cancel:
errtype = globals()['__builtins__'][reraise_on_cancel]
@@ -925,9 +897,7 @@ async def client_req_subactor(
async def tell_little_bro(
actor_name: str,
caller: str = '',
err_after: int|None = None,
caller: str = ''
):
# contact target actor, do a stream dialog.
async with (
@@ -936,12 +906,10 @@ async def tell_little_bro(
) as lb,
lb.open_context(
basic_echo_server,
# XXX proxy any delayed err condition
err_after=err_after,
) as (sub_ctx, first),
sub_ctx.open_stream() as echo_ipc,
sub_ctx.open_stream(
basic_echo_server,
) as echo_ipc,
):
actor: Actor = current_actor()
uid: tuple = actor.uid
@@ -968,15 +936,10 @@ async def tell_little_bro(
'raise_client_error',
[None, 'KeyboardInterrupt'],
)
@pytest.mark.parametrize(
'raise_sub_spawn_error_after',
[None, 50],
)
def test_peer_spawns_and_cancels_service_subactor(
debug_mode: bool,
raise_client_error: str,
reg_addr: tuple[str, int],
raise_sub_spawn_error_after: int|None,
):
# NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx
@@ -990,16 +953,6 @@ def test_peer_spawns_and_cancels_service_subactor(
# and the server's spawned child should cancel and terminate!
peer_name: str = 'little_bro'
def check_inner_rte(rae: RemoteActorError):
'''
Validate the little_bro's relayed inception!
'''
assert rae.boxed_type is RemoteActorError
assert rae.src_type is RuntimeError
assert 'client' in rae.relay_uid
assert peer_name in rae.src_uid
async def main():
async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this.
@@ -1023,24 +976,14 @@ def test_peer_spawns_and_cancels_service_subactor(
server.open_context(
serve_subactors,
peer_name=peer_name,
debug_mode=debug_mode,
) as (spawn_ctx, first),
client.open_context(
client_req_subactor,
peer_name=peer_name,
debug_mode=debug_mode,
reraise_on_cancel=raise_client_error,
# trigger for error condition in sub
# during streaming.
sub_err_after=raise_sub_spawn_error_after,
) as (client_ctx, client_says),
):
root: Actor = current_actor()
spawner_uid: tuple = spawn_ctx.chan.uid
print(
f'Server says: {first}\n'
f'Client says: {client_says}\n'
@@ -1050,7 +993,6 @@ def test_peer_spawns_and_cancels_service_subactor(
# (grandchild of this root actor) "little_bro"
# and ensure we can also use it as an echo
# server.
sub: Portal
async with tractor.wait_for_actor(
name=peer_name,
) as sub:
@@ -1062,139 +1004,56 @@ def test_peer_spawns_and_cancels_service_subactor(
f'.uid: {sub.actor.uid}\n'
f'chan.raddr: {sub.chan.raddr}\n'
)
await tell_little_bro(
actor_name=peer_name,
caller='root',
)
async with expect_ctxc(
yay=raise_sub_spawn_error_after,
reraise=False,
):
await tell_little_bro(
actor_name=peer_name,
caller='root',
)
# signal client to raise a KBI
await client_ctx.cancel()
print('root cancelled client, checking that sub-spawn is down')
if not raise_sub_spawn_error_after:
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
# signal client to cancel and maybe raise a KBI
await client_ctx.cancel()
print(
'-> root cancelling client,\n'
'-> root checking `client_ctx.result()`,\n'
f'-> checking that sub-spawn {peer_name} is down\n'
)
# else:
print('root cancelling server/client sub-actors')
try:
res = await client_ctx.result(hide_tb=False)
# in remote (relayed inception) error
# case, we should error on the line above!
if raise_sub_spawn_error_after:
pytest.fail(
'Never rxed proxied `RemoteActorError[RuntimeError]` !?'
)
assert isinstance(res, ContextCancelled)
assert client_ctx.cancel_acked
assert res.canceller == root.uid
except RemoteActorError as rae:
_err = rae
assert raise_sub_spawn_error_after
# since this is a "relayed error" via the client
# sub-actor, it is expected to be
# a `RemoteActorError` boxing another
# `RemoteActorError` otherwise known as
# an "inception" (from `trio`'s parlance)
# ((or maybe a "Matryoshka" and/or "matron"
# in our own working parlance)) which
# contains the source error from the
# little_bro: a `RuntimeError`.
#
check_inner_rte(rae)
assert rae.relay_uid == client.chan.uid
assert rae.src_uid == sub.chan.uid
assert not client_ctx.cancel_acked
assert (
client_ctx.maybe_error
is client_ctx.outcome
is rae
)
raise
# await tractor.pause()
else:
assert not raise_sub_spawn_error_after
# cancelling the spawner sub should
# transitively cancel it's sub, the little
# bruv.
print('root cancelling server/client sub-actors')
await spawn_ctx.cancel()
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
# await tractor.pause()
res = await client_ctx.result(hide_tb=False)
assert isinstance(res, ContextCancelled)
assert client_ctx.cancel_acked
assert res.canceller == current_actor().uid
await spawn_ctx.cancel()
# await server.cancel_actor()
except RemoteActorError as rae:
# XXX more-or-less same as above handler
# this is just making sure the error bubbles out
# of the
_err = rae
assert raise_sub_spawn_error_after
raise
# since we called `.cancel_actor()`, `.cancel_ack`
# will not be set on the ctx bc `ctx.cancel()` was not
# called directly fot this confext.
except ContextCancelled as ctxc:
_ctxc = ctxc
print(
f'{root.uid} caught ctxc from ctx with {client_ctx.chan.uid}\n'
f'{repr(ctxc)}\n'
)
if not raise_sub_spawn_error_after:
assert ctxc.canceller == root.uid
else:
assert ctxc.canceller == spawner_uid
print('caught ctxc from contexts!')
assert ctxc.canceller == current_actor().uid
assert ctxc is spawn_ctx.outcome
assert ctxc is spawn_ctx.maybe_error
raise
if raise_sub_spawn_error_after:
pytest.fail(
'context block(s) in PARENT never raised?!?'
)
# assert spawn_ctx.cancel_acked
assert spawn_ctx.cancel_acked
assert client_ctx.cancel_acked
if not raise_sub_spawn_error_after:
# assert spawn_ctx.cancel_acked
assert spawn_ctx.cancel_acked
assert client_ctx.cancel_acked
await client.cancel_actor()
await server.cancel_actor()
await client.cancel_actor()
await server.cancel_actor()
# WOA WOA WOA! we need this to close..!!!??
# that's super bad XD
# WOA WOA WOA! we need this to close..!!!??
# that's super bad XD
# TODO: why isn't this working!?!?
# we're now outside the `.open_context()` block so
# the internal `Context._scope: CancelScope` should be
# gracefully "closed" ;)
# TODO: why isn't this working!?!?
# we're now outside the `.open_context()` block so
# the internal `Context._scope: CancelScope` should be
# gracefully "closed" ;)
# assert spawn_ctx.cancelled_caught
# assert spawn_ctx.cancelled_caught
if raise_sub_spawn_error_after:
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
rae: RemoteActorError = excinfo.value
check_inner_rte(rae)
else:
trio.run(main)
trio.run(main)
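The tests above lean on `expect_ctxc()` from `tractor._testing` to assert whether a `ContextCancelled` should (or should not) bubble out of a block. A sketch of how such a helper can be written follows; this is an illustration only, not tractor's actual implementation, and the `_sketch` suffix marks the name as hypothetical:

from contextlib import asynccontextmanager as acm

from tractor import ContextCancelled


@acm
async def expect_ctxc_sketch(
    yay: bool,
    reraise: bool = False,
):
    # when `yay` is truthy a ctxc MUST bubble out of the body
    if yay:
        try:
            yield
            raise RuntimeError('Expected a ctxc but the body completed?')
        except ContextCancelled:
            print('got expected ctxc!')
            if reraise:
                raise
    # otherwise the body must complete without one
    else:
        yield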

View File

@@ -111,6 +111,7 @@ class RemoteActorError(Exception):
reprol_fields: list[str] = [
'src_uid',
'relay_path',
# 'relay_uid',
]
def __init__(
@@ -486,11 +487,14 @@ def pack_error(
else:
tb_str = traceback.format_exc()
error_msg: dict[ # for IPC
our_uid: tuple = current_actor().uid
error_msg: dict[
str,
str | tuple[str, str]
] = {}
our_uid: tuple = current_actor().uid
] = {
'tb_str': tb_str,
'relay_uid': our_uid,
}
if (
isinstance(exc, RemoteActorError)
@@ -531,11 +535,6 @@ def pack_error(
[],
).append(our_uid)
# XXX NOTE: always ensure the traceback-str is from the
# locally raised error (**not** the prior relay's boxed
# content's `.msgdata`).
error_msg['tb_str'] = tb_str
pkt: dict = {'error': error_msg}
if cid:
pkt['cid'] = cid
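The `pack_error()` hunks above record which actor (re)packed a boxed error and accumulate a per-hop relay path. A small illustrative sketch of that bookkeeping pattern with plain dicts follows; all names here are made up for the example and do not reflect tractor's real wire format:

def relay_error_msg(
    error_msg: dict,
    our_uid: tuple[str, str],
) -> dict:
    # record which actor is doing the (re)packing on this hop
    error_msg['relay_uid'] = our_uid

    # append this hop to the accumulated relay path, creating
    # the list on the first relay
    error_msg.setdefault('relay_path', []).append(our_uid)
    return error_msg


if __name__ == '__main__':
    msg: dict = {'tb_str': 'Traceback (most recent call last): ...'}
    for hop in [('client', 'uuid-1'), ('root', 'uuid-2')]:
        relay_error_msg(msg, hop)
    assert msg['relay_path'] == [('client', 'uuid-1'), ('root', 'uuid-2')]
    print(msg)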

View File

@@ -715,12 +715,10 @@ class Actor:
f'|_{chan}\n'
)
try:
# send msg loop terminate sentinel which
# triggers cancellation of all remotely
# started tasks.
# send a msg loop terminate sentinel
await chan.send(None)
# XXX: do we want this? no right?
# XXX: do we want this?
# causes "[104] connection reset by peer" on other end
# await chan.aclose()
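The hunk above has the actor send a `None` "terminate sentinel" down the channel so the remote msg loop shuts down cleanly. A toy sketch of that sentinel idea using a `trio` memory channel follows; the channel types here only stand in for tractor's real IPC transport:

import trio


async def msg_loop(rx: trio.MemoryReceiveChannel) -> None:
    async for msg in rx:
        if msg is None:
            # terminate sentinel: stop processing remote msgs
            print('msg loop terminating')
            return
        print(f'handling {msg!r}')


async def main() -> None:
    tx, rx = trio.open_memory_channel(10)
    async with trio.open_nursery() as n:
        n.start_soon(msg_loop, rx)
        await tx.send('hello')
        await tx.send(None)  # request msg loop shutdown


if __name__ == '__main__':
    trio.run(main)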
@@ -1210,10 +1208,10 @@ class Actor:
# - callee self raises ctxc before caller send request,
# - callee errors prior to cancel req.
log.cancel(
'Cancel request invalid, RPC task already completed?\n\n'
'Cancel request invalid, RPC task already completed?\n'
f'<= canceller: {requesting_uid}\n\n'
f'=> {cid}@{parent_chan.uid}\n'
f' |_{parent_chan}\n'
f'=>{parent_chan}\n'
f' |_ctx-id: {cid}\n'
)
return True
@@ -1512,6 +1510,7 @@ async def async_main(
):
accept_addrs = set_accept_addr_says_rent
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in