forked from goodboy/tractor
Extend inter-peer cancel tests for "inceptions"
Use new `RemoteActorError` fields in various assertions particularly ensuring that an RTE relayed through the spawner from the little_bro shows up at the client with the right number of entries in the `.relay_path` and that the error is raised in the client as desired in the original use case from `modden`'s remote spawn spawn request API (which was kinda the whole original motivation to finally get all this multi-actor error relay stuff workin). Case extensions: - RTE relayed from little_bro through spawner to client when `raise_sub_spawn_error_after` is set; in this case test should raise the relayed and RAE boxed RTE right up to the `trio.run()`. -> ensure the `rae.src_uid`, `.relay_uid` are set correctly. -> ensure ctx cancels are no acked. - use `expect_ctxc()` around root's `tell_little_bro()` usage. - do `debug_mode` assertions when enabled by test harness in each actor layer. - obvi use new `.src_type`/`.boxed_type` for final error propagation assertions.mv_to_new_trio_py3.11
parent
8ab5e08830
commit
d5e5174d97
|
@ -16,6 +16,11 @@ from tractor import ( # typing
|
||||||
Portal,
|
Portal,
|
||||||
Context,
|
Context,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
|
RemoteActorError,
|
||||||
|
)
|
||||||
|
from tractor._testing import (
|
||||||
|
# tractor_test,
|
||||||
|
expect_ctxc,
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX TODO cases:
|
# XXX TODO cases:
|
||||||
|
@ -156,10 +161,11 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
|
||||||
):
|
):
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
assert excinfo.value.type == TypeError
|
rae = excinfo.value
|
||||||
|
assert rae.boxed_type == TypeError
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
@ -739,14 +745,16 @@ def test_peer_canceller(
|
||||||
with pytest.raises(ContextCancelled) as excinfo:
|
with pytest.raises(ContextCancelled) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
assert excinfo.value.type == ContextCancelled
|
assert excinfo.value.boxed_type == ContextCancelled
|
||||||
assert excinfo.value.canceller[0] == 'canceller'
|
assert excinfo.value.canceller[0] == 'canceller'
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def basic_echo_server(
|
async def basic_echo_server(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
peer_name: str = 'stepbro',
|
peer_name: str = 'wittle_bruv',
|
||||||
|
|
||||||
|
err_after: int|None = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -774,17 +782,31 @@ async def basic_echo_server(
|
||||||
# assert 0
|
# assert 0
|
||||||
await ipc.send(resp)
|
await ipc.send(resp)
|
||||||
|
|
||||||
|
if (
|
||||||
|
err_after
|
||||||
|
and i > err_after
|
||||||
|
):
|
||||||
|
raise RuntimeError(
|
||||||
|
f'Simulated error in `{peer_name}`'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def serve_subactors(
|
async def serve_subactors(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
peer_name: str,
|
peer_name: str,
|
||||||
|
debug_mode: bool,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
async with open_nursery() as an:
|
async with open_nursery() as an:
|
||||||
|
|
||||||
|
# sanity
|
||||||
|
if debug_mode:
|
||||||
|
assert tractor._state.debug_mode()
|
||||||
|
|
||||||
await ctx.started(peer_name)
|
await ctx.started(peer_name)
|
||||||
async with ctx.open_stream() as reqs:
|
async with ctx.open_stream() as ipc:
|
||||||
async for msg in reqs:
|
async for msg in ipc:
|
||||||
peer_name: str = msg
|
peer_name: str = msg
|
||||||
peer: Portal = await an.start_actor(
|
peer: Portal = await an.start_actor(
|
||||||
name=peer_name,
|
name=peer_name,
|
||||||
|
@ -795,7 +817,7 @@ async def serve_subactors(
|
||||||
f'{peer_name}\n'
|
f'{peer_name}\n'
|
||||||
f'|_{peer}\n'
|
f'|_{peer}\n'
|
||||||
)
|
)
|
||||||
await reqs.send((
|
await ipc.send((
|
||||||
peer.chan.uid,
|
peer.chan.uid,
|
||||||
peer.chan.raddr,
|
peer.chan.raddr,
|
||||||
))
|
))
|
||||||
|
@ -807,14 +829,20 @@ async def serve_subactors(
|
||||||
async def client_req_subactor(
|
async def client_req_subactor(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
peer_name: str,
|
peer_name: str,
|
||||||
|
debug_mode: bool,
|
||||||
|
|
||||||
# used to simulate a user causing an error to be raised
|
# used to simulate a user causing an error to be raised
|
||||||
# directly in thread (like a KBI) to better replicate the
|
# directly in thread (like a KBI) to better replicate the
|
||||||
# case where a `modden` CLI client would hang afer requesting
|
# case where a `modden` CLI client would hang afer requesting
|
||||||
# a `Context.cancel()` to `bigd`'s wks spawner.
|
# a `Context.cancel()` to `bigd`'s wks spawner.
|
||||||
reraise_on_cancel: str|None = None,
|
reraise_on_cancel: str|None = None,
|
||||||
|
sub_err_after: int|None = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
# sanity
|
||||||
|
if debug_mode:
|
||||||
|
assert tractor._state.debug_mode()
|
||||||
|
|
||||||
# TODO: other cases to do with sub lifetimes:
|
# TODO: other cases to do with sub lifetimes:
|
||||||
# -[ ] test that we can have the server spawn a sub
|
# -[ ] test that we can have the server spawn a sub
|
||||||
# that lives longer then ctx with this client.
|
# that lives longer then ctx with this client.
|
||||||
|
@ -836,6 +864,7 @@ async def client_req_subactor(
|
||||||
spawner.open_context(
|
spawner.open_context(
|
||||||
serve_subactors,
|
serve_subactors,
|
||||||
peer_name=peer_name,
|
peer_name=peer_name,
|
||||||
|
debug_mode=debug_mode,
|
||||||
) as (spawner_ctx, first),
|
) as (spawner_ctx, first),
|
||||||
):
|
):
|
||||||
assert first == peer_name
|
assert first == peer_name
|
||||||
|
@ -857,6 +886,7 @@ async def client_req_subactor(
|
||||||
await tell_little_bro(
|
await tell_little_bro(
|
||||||
actor_name=sub_uid[0],
|
actor_name=sub_uid[0],
|
||||||
caller='client',
|
caller='client',
|
||||||
|
err_after=sub_err_after,
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: test different scope-layers of
|
# TODO: test different scope-layers of
|
||||||
|
@ -868,9 +898,7 @@ async def client_req_subactor(
|
||||||
# TODO: would be super nice to have a special injected
|
# TODO: would be super nice to have a special injected
|
||||||
# cancel type here (maybe just our ctxc) but using
|
# cancel type here (maybe just our ctxc) but using
|
||||||
# some native mechanism in `trio` :p
|
# some native mechanism in `trio` :p
|
||||||
except (
|
except trio.Cancelled as err:
|
||||||
trio.Cancelled
|
|
||||||
) as err:
|
|
||||||
_err = err
|
_err = err
|
||||||
if reraise_on_cancel:
|
if reraise_on_cancel:
|
||||||
errtype = globals()['__builtins__'][reraise_on_cancel]
|
errtype = globals()['__builtins__'][reraise_on_cancel]
|
||||||
|
@ -897,7 +925,9 @@ async def client_req_subactor(
|
||||||
|
|
||||||
async def tell_little_bro(
|
async def tell_little_bro(
|
||||||
actor_name: str,
|
actor_name: str,
|
||||||
caller: str = ''
|
|
||||||
|
caller: str = '',
|
||||||
|
err_after: int|None = None,
|
||||||
):
|
):
|
||||||
# contact target actor, do a stream dialog.
|
# contact target actor, do a stream dialog.
|
||||||
async with (
|
async with (
|
||||||
|
@ -906,10 +936,12 @@ async def tell_little_bro(
|
||||||
) as lb,
|
) as lb,
|
||||||
lb.open_context(
|
lb.open_context(
|
||||||
basic_echo_server,
|
basic_echo_server,
|
||||||
|
|
||||||
|
# XXX proxy any delayed err condition
|
||||||
|
err_after=err_after,
|
||||||
) as (sub_ctx, first),
|
) as (sub_ctx, first),
|
||||||
sub_ctx.open_stream(
|
|
||||||
basic_echo_server,
|
sub_ctx.open_stream() as echo_ipc,
|
||||||
) as echo_ipc,
|
|
||||||
):
|
):
|
||||||
actor: Actor = current_actor()
|
actor: Actor = current_actor()
|
||||||
uid: tuple = actor.uid
|
uid: tuple = actor.uid
|
||||||
|
@ -936,10 +968,15 @@ async def tell_little_bro(
|
||||||
'raise_client_error',
|
'raise_client_error',
|
||||||
[None, 'KeyboardInterrupt'],
|
[None, 'KeyboardInterrupt'],
|
||||||
)
|
)
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'raise_sub_spawn_error_after',
|
||||||
|
[None, 50],
|
||||||
|
)
|
||||||
def test_peer_spawns_and_cancels_service_subactor(
|
def test_peer_spawns_and_cancels_service_subactor(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
raise_client_error: str,
|
raise_client_error: str,
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
|
raise_sub_spawn_error_after: int|None,
|
||||||
):
|
):
|
||||||
# NOTE: this tests for the modden `mod wks open piker` bug
|
# NOTE: this tests for the modden `mod wks open piker` bug
|
||||||
# discovered as part of implementing workspace ctx
|
# discovered as part of implementing workspace ctx
|
||||||
|
@ -953,6 +990,16 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
# and the server's spawned child should cancel and terminate!
|
# and the server's spawned child should cancel and terminate!
|
||||||
peer_name: str = 'little_bro'
|
peer_name: str = 'little_bro'
|
||||||
|
|
||||||
|
def check_inner_rte(rae: RemoteActorError):
|
||||||
|
'''
|
||||||
|
Validate the little_bro's relayed inception!
|
||||||
|
|
||||||
|
'''
|
||||||
|
assert rae.boxed_type is RemoteActorError
|
||||||
|
assert rae.src_type is RuntimeError
|
||||||
|
assert 'client' in rae.relay_uid
|
||||||
|
assert peer_name in rae.src_uid
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
# NOTE: to halt the peer tasks on ctxc, uncomment this.
|
# NOTE: to halt the peer tasks on ctxc, uncomment this.
|
||||||
|
@ -976,14 +1023,24 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
server.open_context(
|
server.open_context(
|
||||||
serve_subactors,
|
serve_subactors,
|
||||||
peer_name=peer_name,
|
peer_name=peer_name,
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
|
||||||
) as (spawn_ctx, first),
|
) as (spawn_ctx, first),
|
||||||
|
|
||||||
client.open_context(
|
client.open_context(
|
||||||
client_req_subactor,
|
client_req_subactor,
|
||||||
peer_name=peer_name,
|
peer_name=peer_name,
|
||||||
|
debug_mode=debug_mode,
|
||||||
reraise_on_cancel=raise_client_error,
|
reraise_on_cancel=raise_client_error,
|
||||||
|
|
||||||
|
# trigger for error condition in sub
|
||||||
|
# during streaming.
|
||||||
|
sub_err_after=raise_sub_spawn_error_after,
|
||||||
|
|
||||||
) as (client_ctx, client_says),
|
) as (client_ctx, client_says),
|
||||||
):
|
):
|
||||||
|
root: Actor = current_actor()
|
||||||
|
spawner_uid: tuple = spawn_ctx.chan.uid
|
||||||
print(
|
print(
|
||||||
f'Server says: {first}\n'
|
f'Server says: {first}\n'
|
||||||
f'Client says: {client_says}\n'
|
f'Client says: {client_says}\n'
|
||||||
|
@ -993,6 +1050,7 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
# (grandchild of this root actor) "little_bro"
|
# (grandchild of this root actor) "little_bro"
|
||||||
# and ensure we can also use it as an echo
|
# and ensure we can also use it as an echo
|
||||||
# server.
|
# server.
|
||||||
|
sub: Portal
|
||||||
async with tractor.wait_for_actor(
|
async with tractor.wait_for_actor(
|
||||||
name=peer_name,
|
name=peer_name,
|
||||||
) as sub:
|
) as sub:
|
||||||
|
@ -1004,56 +1062,139 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
f'.uid: {sub.actor.uid}\n'
|
f'.uid: {sub.actor.uid}\n'
|
||||||
f'chan.raddr: {sub.chan.raddr}\n'
|
f'chan.raddr: {sub.chan.raddr}\n'
|
||||||
)
|
)
|
||||||
await tell_little_bro(
|
|
||||||
actor_name=peer_name,
|
|
||||||
caller='root',
|
|
||||||
)
|
|
||||||
|
|
||||||
# signal client to raise a KBI
|
async with expect_ctxc(
|
||||||
await client_ctx.cancel()
|
yay=raise_sub_spawn_error_after,
|
||||||
print('root cancelled client, checking that sub-spawn is down')
|
reraise=False,
|
||||||
|
):
|
||||||
|
await tell_little_bro(
|
||||||
|
actor_name=peer_name,
|
||||||
|
caller='root',
|
||||||
|
)
|
||||||
|
|
||||||
async with tractor.find_actor(
|
if not raise_sub_spawn_error_after:
|
||||||
name=peer_name,
|
|
||||||
) as sub:
|
|
||||||
assert not sub
|
|
||||||
|
|
||||||
print('root cancelling server/client sub-actors')
|
# signal client to cancel and maybe raise a KBI
|
||||||
|
await client_ctx.cancel()
|
||||||
|
print(
|
||||||
|
'-> root cancelling client,\n'
|
||||||
|
'-> root checking `client_ctx.result()`,\n'
|
||||||
|
f'-> checking that sub-spawn {peer_name} is down\n'
|
||||||
|
)
|
||||||
|
# else:
|
||||||
|
|
||||||
# await tractor.pause()
|
try:
|
||||||
res = await client_ctx.result(hide_tb=False)
|
res = await client_ctx.result(hide_tb=False)
|
||||||
assert isinstance(res, ContextCancelled)
|
|
||||||
assert client_ctx.cancel_acked
|
# in remote (relayed inception) error
|
||||||
assert res.canceller == current_actor().uid
|
# case, we should error on the line above!
|
||||||
|
if raise_sub_spawn_error_after:
|
||||||
|
pytest.fail(
|
||||||
|
'Never rxed proxied `RemoteActorError[RuntimeError]` !?'
|
||||||
|
)
|
||||||
|
|
||||||
|
assert isinstance(res, ContextCancelled)
|
||||||
|
assert client_ctx.cancel_acked
|
||||||
|
assert res.canceller == root.uid
|
||||||
|
|
||||||
|
except RemoteActorError as rae:
|
||||||
|
_err = rae
|
||||||
|
assert raise_sub_spawn_error_after
|
||||||
|
|
||||||
|
# since this is a "relayed error" via the client
|
||||||
|
# sub-actor, it is expected to be
|
||||||
|
# a `RemoteActorError` boxing another
|
||||||
|
# `RemoteActorError` otherwise known as
|
||||||
|
# an "inception" (from `trio`'s parlance)
|
||||||
|
# ((or maybe a "Matryoshka" and/or "matron"
|
||||||
|
# in our own working parlance)) which
|
||||||
|
# contains the source error from the
|
||||||
|
# little_bro: a `RuntimeError`.
|
||||||
|
#
|
||||||
|
check_inner_rte(rae)
|
||||||
|
assert rae.relay_uid == client.chan.uid
|
||||||
|
assert rae.src_uid == sub.chan.uid
|
||||||
|
|
||||||
|
assert not client_ctx.cancel_acked
|
||||||
|
assert (
|
||||||
|
client_ctx.maybe_error
|
||||||
|
is client_ctx.outcome
|
||||||
|
is rae
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
# await tractor.pause()
|
||||||
|
|
||||||
|
else:
|
||||||
|
assert not raise_sub_spawn_error_after
|
||||||
|
|
||||||
|
# cancelling the spawner sub should
|
||||||
|
# transitively cancel it's sub, the little
|
||||||
|
# bruv.
|
||||||
|
print('root cancelling server/client sub-actors')
|
||||||
|
await spawn_ctx.cancel()
|
||||||
|
async with tractor.find_actor(
|
||||||
|
name=peer_name,
|
||||||
|
) as sub:
|
||||||
|
assert not sub
|
||||||
|
|
||||||
await spawn_ctx.cancel()
|
|
||||||
# await server.cancel_actor()
|
# await server.cancel_actor()
|
||||||
|
|
||||||
|
except RemoteActorError as rae:
|
||||||
|
# XXX more-or-less same as above handler
|
||||||
|
# this is just making sure the error bubbles out
|
||||||
|
# of the
|
||||||
|
_err = rae
|
||||||
|
assert raise_sub_spawn_error_after
|
||||||
|
raise
|
||||||
|
|
||||||
# since we called `.cancel_actor()`, `.cancel_ack`
|
# since we called `.cancel_actor()`, `.cancel_ack`
|
||||||
# will not be set on the ctx bc `ctx.cancel()` was not
|
# will not be set on the ctx bc `ctx.cancel()` was not
|
||||||
# called directly fot this confext.
|
# called directly fot this confext.
|
||||||
except ContextCancelled as ctxc:
|
except ContextCancelled as ctxc:
|
||||||
print('caught ctxc from contexts!')
|
_ctxc = ctxc
|
||||||
assert ctxc.canceller == current_actor().uid
|
print(
|
||||||
|
f'{root.uid} caught ctxc from ctx with {client_ctx.chan.uid}\n'
|
||||||
|
f'{repr(ctxc)}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
if not raise_sub_spawn_error_after:
|
||||||
|
assert ctxc.canceller == root.uid
|
||||||
|
else:
|
||||||
|
assert ctxc.canceller == spawner_uid
|
||||||
|
|
||||||
assert ctxc is spawn_ctx.outcome
|
assert ctxc is spawn_ctx.outcome
|
||||||
assert ctxc is spawn_ctx.maybe_error
|
assert ctxc is spawn_ctx.maybe_error
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# assert spawn_ctx.cancel_acked
|
if raise_sub_spawn_error_after:
|
||||||
assert spawn_ctx.cancel_acked
|
pytest.fail(
|
||||||
assert client_ctx.cancel_acked
|
'context block(s) in PARENT never raised?!?'
|
||||||
|
)
|
||||||
|
|
||||||
await client.cancel_actor()
|
if not raise_sub_spawn_error_after:
|
||||||
await server.cancel_actor()
|
# assert spawn_ctx.cancel_acked
|
||||||
|
assert spawn_ctx.cancel_acked
|
||||||
|
assert client_ctx.cancel_acked
|
||||||
|
|
||||||
# WOA WOA WOA! we need this to close..!!!??
|
await client.cancel_actor()
|
||||||
# that's super bad XD
|
await server.cancel_actor()
|
||||||
|
|
||||||
# TODO: why isn't this working!?!?
|
# WOA WOA WOA! we need this to close..!!!??
|
||||||
# we're now outside the `.open_context()` block so
|
# that's super bad XD
|
||||||
# the internal `Context._scope: CancelScope` should be
|
|
||||||
# gracefully "closed" ;)
|
|
||||||
|
|
||||||
# assert spawn_ctx.cancelled_caught
|
# TODO: why isn't this working!?!?
|
||||||
|
# we're now outside the `.open_context()` block so
|
||||||
|
# the internal `Context._scope: CancelScope` should be
|
||||||
|
# gracefully "closed" ;)
|
||||||
|
|
||||||
trio.run(main)
|
# assert spawn_ctx.cancelled_caught
|
||||||
|
|
||||||
|
if raise_sub_spawn_error_after:
|
||||||
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
rae: RemoteActorError = excinfo.value
|
||||||
|
check_inner_rte(rae)
|
||||||
|
|
||||||
|
else:
|
||||||
|
trio.run(main)
|
||||||
|
|
Loading…
Reference in New Issue