Compare commits
No commits in common. "8e66f45e2386efacb344329a05c341343f9c0659" and "668016d37b89d3e140d45e11914d606bc371f4f0" have entirely different histories.
8e66f45e23
...
668016d37b
|
@ -6,7 +6,6 @@ been an outage) and we want to ensure that despite being in debug mode
|
||||||
actor tree will eventually be cancelled without leaving any zombies.
|
actor tree will eventually be cancelled without leaving any zombies.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from contextlib import asynccontextmanager as acm
|
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
|
||||||
from tractor import (
|
from tractor import (
|
||||||
|
@ -18,7 +17,6 @@ from tractor import (
|
||||||
_testing,
|
_testing,
|
||||||
)
|
)
|
||||||
import trio
|
import trio
|
||||||
import pytest
|
|
||||||
|
|
||||||
|
|
||||||
async def break_ipc(
|
async def break_ipc(
|
||||||
|
@ -43,13 +41,6 @@ async def break_ipc(
|
||||||
await stream.aclose()
|
await stream.aclose()
|
||||||
|
|
||||||
method: str = method or def_method
|
method: str = method or def_method
|
||||||
print(
|
|
||||||
'#################################\n'
|
|
||||||
'Simulating CHILD-side IPC BREAK!\n'
|
|
||||||
f'method: {method}\n'
|
|
||||||
f'pre `.aclose()`: {pre_close}\n'
|
|
||||||
'#################################\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
match method:
|
match method:
|
||||||
case 'trans_aclose':
|
case 'trans_aclose':
|
||||||
|
@ -89,17 +80,17 @@ async def break_ipc_then_error(
|
||||||
break_ipc_with: str|None = None,
|
break_ipc_with: str|None = None,
|
||||||
pre_close: bool = False,
|
pre_close: bool = False,
|
||||||
):
|
):
|
||||||
|
async for msg in stream:
|
||||||
|
await stream.send(msg)
|
||||||
await break_ipc(
|
await break_ipc(
|
||||||
stream=stream,
|
stream=stream,
|
||||||
method=break_ipc_with,
|
method=break_ipc_with,
|
||||||
pre_close=pre_close,
|
pre_close=pre_close,
|
||||||
)
|
)
|
||||||
async for msg in stream:
|
|
||||||
await stream.send(msg)
|
|
||||||
|
|
||||||
assert 0
|
assert 0
|
||||||
|
|
||||||
|
|
||||||
|
# async def close_stream_and_error(
|
||||||
async def iter_ipc_stream(
|
async def iter_ipc_stream(
|
||||||
stream: MsgStream,
|
stream: MsgStream,
|
||||||
break_ipc_with: str|None = None,
|
break_ipc_with: str|None = None,
|
||||||
|
@ -108,6 +99,20 @@ async def iter_ipc_stream(
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
await stream.send(msg)
|
await stream.send(msg)
|
||||||
|
|
||||||
|
# wipe out channel right before raising
|
||||||
|
# await break_ipc(
|
||||||
|
# stream=stream,
|
||||||
|
# method=break_ipc_with,
|
||||||
|
# pre_close=pre_close,
|
||||||
|
# )
|
||||||
|
|
||||||
|
# send channel close msg at SC-prot level
|
||||||
|
#
|
||||||
|
# TODO: what should get raised here if anything?
|
||||||
|
# await stream.aclose()
|
||||||
|
|
||||||
|
# assert 0
|
||||||
|
|
||||||
|
|
||||||
@context
|
@context
|
||||||
async def recv_and_spawn_net_killers(
|
async def recv_and_spawn_net_killers(
|
||||||
|
@ -129,16 +134,14 @@ async def recv_and_spawn_net_killers(
|
||||||
async for i in stream:
|
async for i in stream:
|
||||||
print(f'child echoing {i}')
|
print(f'child echoing {i}')
|
||||||
await stream.send(i)
|
await stream.send(i)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
break_ipc_after
|
break_ipc_after
|
||||||
and
|
and
|
||||||
i >= break_ipc_after
|
i > break_ipc_after
|
||||||
):
|
):
|
||||||
n.start_soon(
|
'#################################\n'
|
||||||
iter_ipc_stream,
|
'Simulating CHILD-side IPC BREAK!\n'
|
||||||
stream,
|
'#################################\n'
|
||||||
)
|
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
partial(
|
partial(
|
||||||
break_ipc_then_error,
|
break_ipc_then_error,
|
||||||
|
@ -146,23 +149,10 @@ async def recv_and_spawn_net_killers(
|
||||||
pre_close=pre_close,
|
pre_close=pre_close,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
n.start_soon(
|
||||||
|
iter_ipc_stream,
|
||||||
@acm
|
stream,
|
||||||
async def stuff_hangin_ctlc(timeout: float = 1) -> None:
|
|
||||||
|
|
||||||
with trio.move_on_after(timeout) as cs:
|
|
||||||
yield timeout
|
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
|
||||||
# pretend to be a user seeing no streaming action
|
|
||||||
# thinking it's a hang, and then hitting ctl-c..
|
|
||||||
print(
|
|
||||||
f"i'm a user on the PARENT side and thingz hangin "
|
|
||||||
f'after timeout={timeout} ???\n\n'
|
|
||||||
'MASHING CTlR-C..!?\n'
|
|
||||||
)
|
)
|
||||||
raise KeyboardInterrupt
|
|
||||||
|
|
||||||
|
|
||||||
async def main(
|
async def main(
|
||||||
|
@ -179,6 +169,9 @@ async def main(
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
|
# from tractor._state import _runtime_vars as rtv
|
||||||
|
# rtv['_debug_mode'] = debug_mode
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
open_nursery(
|
open_nursery(
|
||||||
start_method=start_method,
|
start_method=start_method,
|
||||||
|
@ -197,11 +190,10 @@ async def main(
|
||||||
)
|
)
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
stuff_hangin_ctlc(timeout=2) as timeout,
|
|
||||||
_testing.expect_ctxc(
|
_testing.expect_ctxc(
|
||||||
yay=(
|
yay=(
|
||||||
break_parent_ipc_after
|
break_parent_ipc_after
|
||||||
or break_child_ipc_after
|
or break_child_ipc_after,
|
||||||
),
|
),
|
||||||
# TODO: we CAN'T remove this right?
|
# TODO: we CAN'T remove this right?
|
||||||
# since we need the ctxc to bubble up from either
|
# since we need the ctxc to bubble up from either
|
||||||
|
@ -213,14 +205,12 @@ async def main(
|
||||||
# and KBI in an eg?
|
# and KBI in an eg?
|
||||||
reraise=True,
|
reraise=True,
|
||||||
),
|
),
|
||||||
|
|
||||||
portal.open_context(
|
portal.open_context(
|
||||||
recv_and_spawn_net_killers,
|
recv_and_spawn_net_killers,
|
||||||
break_ipc_after=break_child_ipc_after,
|
break_ipc_after=break_child_ipc_after,
|
||||||
pre_close=pre_close,
|
pre_close=pre_close,
|
||||||
) as (ctx, sent),
|
) as (ctx, sent),
|
||||||
):
|
):
|
||||||
rx_eoc: bool = False
|
|
||||||
ipc_break_sent: bool = False
|
ipc_break_sent: bool = False
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
for i in range(1000):
|
for i in range(1000):
|
||||||
|
@ -238,7 +228,6 @@ async def main(
|
||||||
'#################################\n'
|
'#################################\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: other methods? see break func above.
|
|
||||||
# await stream._ctx.chan.send(None)
|
# await stream._ctx.chan.send(None)
|
||||||
# await stream._ctx.chan.transport.stream.send_eof()
|
# await stream._ctx.chan.transport.stream.send_eof()
|
||||||
await stream._ctx.chan.transport.stream.aclose()
|
await stream._ctx.chan.transport.stream.aclose()
|
||||||
|
@ -262,12 +251,10 @@ async def main(
|
||||||
# TODO: is this needed or no?
|
# TODO: is this needed or no?
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# timeout: int = 1
|
timeout: int = 1
|
||||||
# with trio.move_on_after(timeout) as cs:
|
print(f'Entering `stream.receive()` with timeout={timeout}\n')
|
||||||
async with stuff_hangin_ctlc() as timeout:
|
with trio.move_on_after(timeout) as cs:
|
||||||
print(
|
|
||||||
f'PARENT `stream.receive()` with timeout={timeout}\n'
|
|
||||||
)
|
|
||||||
# NOTE: in the parent side IPC failure case this
|
# NOTE: in the parent side IPC failure case this
|
||||||
# will raise an ``EndOfChannel`` after the child
|
# will raise an ``EndOfChannel`` after the child
|
||||||
# is killed and sends a stop msg back to it's
|
# is killed and sends a stop msg back to it's
|
||||||
|
@ -279,29 +266,22 @@ async def main(
|
||||||
f'{rx}\n'
|
f'{rx}\n'
|
||||||
)
|
)
|
||||||
except trio.EndOfChannel:
|
except trio.EndOfChannel:
|
||||||
rx_eoc: bool = True
|
|
||||||
print('MsgStream got EoC for PARENT')
|
print('MsgStream got EoC for PARENT')
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
if cs.cancelled_caught:
|
||||||
|
# pretend to be a user seeing no streaming action
|
||||||
|
# thinking it's a hang, and then hitting ctl-c..
|
||||||
print(
|
print(
|
||||||
'Streaming finished and we got Eoc.\n'
|
f"YOO i'm a PARENT user anddd thingz hangin..\n"
|
||||||
'Canceling `.open_context()` in root with\n'
|
f'after timeout={timeout}\n'
|
||||||
'CTlR-C..'
|
|
||||||
)
|
)
|
||||||
if rx_eoc:
|
|
||||||
assert stream.closed
|
|
||||||
try:
|
|
||||||
await stream.send(i)
|
|
||||||
pytest.fail('stream not closed?')
|
|
||||||
except (
|
|
||||||
trio.ClosedResourceError,
|
|
||||||
trio.EndOfChannel,
|
|
||||||
) as send_err:
|
|
||||||
if rx_eoc:
|
|
||||||
assert send_err is stream._eoc
|
|
||||||
else:
|
|
||||||
assert send_err is stream._closed
|
|
||||||
|
|
||||||
|
print(
|
||||||
|
"YOO i'm mad!\n"
|
||||||
|
'The send side is dun but thingz hangin..\n'
|
||||||
|
'MASHING CTlR-C Ctl-c..'
|
||||||
|
)
|
||||||
raise KeyboardInterrupt
|
raise KeyboardInterrupt
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -85,8 +85,8 @@ def test_ipc_channel_break_during_stream(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if spawn_backend != 'trio':
|
if spawn_backend != 'trio':
|
||||||
if debug_mode:
|
# if debug_mode:
|
||||||
pytest.skip('`debug_mode` only supported on `trio` spawner')
|
# pytest.skip('`debug_mode` only supported on `trio` spawner')
|
||||||
|
|
||||||
# non-`trio` spawners should never hit the hang condition that
|
# non-`trio` spawners should never hit the hang condition that
|
||||||
# requires the user to do ctl-c to cancel the actor tree.
|
# requires the user to do ctl-c to cancel the actor tree.
|
||||||
|
@ -107,10 +107,7 @@ def test_ipc_channel_break_during_stream(
|
||||||
# AND we tell the child to call `MsgStream.aclose()`.
|
# AND we tell the child to call `MsgStream.aclose()`.
|
||||||
and pre_aclose_msgstream
|
and pre_aclose_msgstream
|
||||||
):
|
):
|
||||||
# expect_final_exc = trio.EndOfChannel
|
expect_final_exc = trio.EndOfChannel
|
||||||
# ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
|
|
||||||
# gracefully!
|
|
||||||
expect_final_exc = KeyboardInterrupt
|
|
||||||
|
|
||||||
# NOTE when ONLY the child breaks or it breaks BEFORE the
|
# NOTE when ONLY the child breaks or it breaks BEFORE the
|
||||||
# parent we expect the parent to get a closed resource error
|
# parent we expect the parent to get a closed resource error
|
||||||
|
@ -123,25 +120,11 @@ def test_ipc_channel_break_during_stream(
|
||||||
and
|
and
|
||||||
ipc_break['break_parent_ipc_after'] is False
|
ipc_break['break_parent_ipc_after'] is False
|
||||||
):
|
):
|
||||||
# NOTE: we DO NOT expect this any more since
|
expect_final_exc = trio.ClosedResourceError
|
||||||
# the child side's channel will be broken silently
|
|
||||||
# and nothing on the parent side will indicate this!
|
|
||||||
# expect_final_exc = trio.ClosedResourceError
|
|
||||||
|
|
||||||
# NOTE: child will send a 'stop' msg before it breaks
|
|
||||||
# the transport channel BUT, that will be absorbed by the
|
|
||||||
# `ctx.open_stream()` block and thus the `.open_context()`
|
|
||||||
# should hang, after which the test script simulates
|
|
||||||
# a user sending ctl-c by raising a KBI.
|
|
||||||
if pre_aclose_msgstream:
|
|
||||||
expect_final_exc = KeyboardInterrupt
|
|
||||||
|
|
||||||
# XXX OLD XXX
|
|
||||||
# if child calls `MsgStream.aclose()` then expect EoC.
|
# if child calls `MsgStream.aclose()` then expect EoC.
|
||||||
# ^ XXX not any more ^ since eoc is always absorbed
|
if pre_aclose_msgstream:
|
||||||
# gracefully and NOT bubbled to the `.open_context()`
|
expect_final_exc = trio.EndOfChannel
|
||||||
# block!
|
|
||||||
# expect_final_exc = trio.EndOfChannel
|
|
||||||
|
|
||||||
# BOTH but, CHILD breaks FIRST
|
# BOTH but, CHILD breaks FIRST
|
||||||
elif (
|
elif (
|
||||||
|
@ -151,8 +134,12 @@ def test_ipc_channel_break_during_stream(
|
||||||
> ipc_break['break_child_ipc_after']
|
> ipc_break['break_child_ipc_after']
|
||||||
)
|
)
|
||||||
):
|
):
|
||||||
|
expect_final_exc = trio.ClosedResourceError
|
||||||
|
|
||||||
|
# child will send a 'stop' msg before it breaks
|
||||||
|
# the transport channel.
|
||||||
if pre_aclose_msgstream:
|
if pre_aclose_msgstream:
|
||||||
expect_final_exc = KeyboardInterrupt
|
expect_final_exc = trio.EndOfChannel
|
||||||
|
|
||||||
# NOTE when the parent IPC side dies (even if the child's does as well
|
# NOTE when the parent IPC side dies (even if the child's does as well
|
||||||
# but the child fails BEFORE the parent) we always expect the
|
# but the child fails BEFORE the parent) we always expect the
|
||||||
|
@ -173,8 +160,7 @@ def test_ipc_channel_break_during_stream(
|
||||||
ipc_break['break_parent_ipc_after'] is not False
|
ipc_break['break_parent_ipc_after'] is not False
|
||||||
and (
|
and (
|
||||||
ipc_break['break_child_ipc_after']
|
ipc_break['break_child_ipc_after']
|
||||||
>
|
> ipc_break['break_parent_ipc_after']
|
||||||
ipc_break['break_parent_ipc_after']
|
|
||||||
)
|
)
|
||||||
):
|
):
|
||||||
expect_final_exc = trio.ClosedResourceError
|
expect_final_exc = trio.ClosedResourceError
|
||||||
|
@ -238,7 +224,6 @@ def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages():
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
with trio.fail_after(3):
|
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as n:
|
||||||
portal = await n.start_actor(
|
portal = await n.start_actor(
|
||||||
'ipc_breaker',
|
'ipc_breaker',
|
||||||
|
@ -256,10 +241,7 @@ def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages():
|
||||||
|
|
||||||
print('parent waiting on context')
|
print('parent waiting on context')
|
||||||
|
|
||||||
print(
|
print('parent exited context')
|
||||||
'parent exited context\n'
|
|
||||||
'parent raising KBI..\n'
|
|
||||||
)
|
|
||||||
raise KeyboardInterrupt
|
raise KeyboardInterrupt
|
||||||
|
|
||||||
with pytest.raises(KeyboardInterrupt):
|
with pytest.raises(KeyboardInterrupt):
|
||||||
|
|
|
@ -16,11 +16,6 @@ from tractor import ( # typing
|
||||||
Portal,
|
Portal,
|
||||||
Context,
|
Context,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
RemoteActorError,
|
|
||||||
)
|
|
||||||
from tractor._testing import (
|
|
||||||
# tractor_test,
|
|
||||||
expect_ctxc,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX TODO cases:
|
# XXX TODO cases:
|
||||||
|
@ -161,11 +156,10 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
|
||||||
):
|
):
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
with pytest.raises(RemoteActorError) as excinfo:
|
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
rae = excinfo.value
|
assert excinfo.value.type == TypeError
|
||||||
assert rae.boxed_type == TypeError
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
@ -745,16 +739,14 @@ def test_peer_canceller(
|
||||||
with pytest.raises(ContextCancelled) as excinfo:
|
with pytest.raises(ContextCancelled) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
assert excinfo.value.boxed_type == ContextCancelled
|
assert excinfo.value.type == ContextCancelled
|
||||||
assert excinfo.value.canceller[0] == 'canceller'
|
assert excinfo.value.canceller[0] == 'canceller'
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def basic_echo_server(
|
async def basic_echo_server(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
peer_name: str = 'wittle_bruv',
|
peer_name: str = 'stepbro',
|
||||||
|
|
||||||
err_after: int|None = None,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -782,31 +774,17 @@ async def basic_echo_server(
|
||||||
# assert 0
|
# assert 0
|
||||||
await ipc.send(resp)
|
await ipc.send(resp)
|
||||||
|
|
||||||
if (
|
|
||||||
err_after
|
|
||||||
and i > err_after
|
|
||||||
):
|
|
||||||
raise RuntimeError(
|
|
||||||
f'Simulated error in `{peer_name}`'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def serve_subactors(
|
async def serve_subactors(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
peer_name: str,
|
peer_name: str,
|
||||||
debug_mode: bool,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
async with open_nursery() as an:
|
async with open_nursery() as an:
|
||||||
|
|
||||||
# sanity
|
|
||||||
if debug_mode:
|
|
||||||
assert tractor._state.debug_mode()
|
|
||||||
|
|
||||||
await ctx.started(peer_name)
|
await ctx.started(peer_name)
|
||||||
async with ctx.open_stream() as ipc:
|
async with ctx.open_stream() as reqs:
|
||||||
async for msg in ipc:
|
async for msg in reqs:
|
||||||
peer_name: str = msg
|
peer_name: str = msg
|
||||||
peer: Portal = await an.start_actor(
|
peer: Portal = await an.start_actor(
|
||||||
name=peer_name,
|
name=peer_name,
|
||||||
|
@ -817,7 +795,7 @@ async def serve_subactors(
|
||||||
f'{peer_name}\n'
|
f'{peer_name}\n'
|
||||||
f'|_{peer}\n'
|
f'|_{peer}\n'
|
||||||
)
|
)
|
||||||
await ipc.send((
|
await reqs.send((
|
||||||
peer.chan.uid,
|
peer.chan.uid,
|
||||||
peer.chan.raddr,
|
peer.chan.raddr,
|
||||||
))
|
))
|
||||||
|
@ -829,20 +807,14 @@ async def serve_subactors(
|
||||||
async def client_req_subactor(
|
async def client_req_subactor(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
peer_name: str,
|
peer_name: str,
|
||||||
debug_mode: bool,
|
|
||||||
|
|
||||||
# used to simulate a user causing an error to be raised
|
# used to simulate a user causing an error to be raised
|
||||||
# directly in thread (like a KBI) to better replicate the
|
# directly in thread (like a KBI) to better replicate the
|
||||||
# case where a `modden` CLI client would hang afer requesting
|
# case where a `modden` CLI client would hang afer requesting
|
||||||
# a `Context.cancel()` to `bigd`'s wks spawner.
|
# a `Context.cancel()` to `bigd`'s wks spawner.
|
||||||
reraise_on_cancel: str|None = None,
|
reraise_on_cancel: str|None = None,
|
||||||
sub_err_after: int|None = None,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# sanity
|
|
||||||
if debug_mode:
|
|
||||||
assert tractor._state.debug_mode()
|
|
||||||
|
|
||||||
# TODO: other cases to do with sub lifetimes:
|
# TODO: other cases to do with sub lifetimes:
|
||||||
# -[ ] test that we can have the server spawn a sub
|
# -[ ] test that we can have the server spawn a sub
|
||||||
# that lives longer then ctx with this client.
|
# that lives longer then ctx with this client.
|
||||||
|
@ -864,7 +836,6 @@ async def client_req_subactor(
|
||||||
spawner.open_context(
|
spawner.open_context(
|
||||||
serve_subactors,
|
serve_subactors,
|
||||||
peer_name=peer_name,
|
peer_name=peer_name,
|
||||||
debug_mode=debug_mode,
|
|
||||||
) as (spawner_ctx, first),
|
) as (spawner_ctx, first),
|
||||||
):
|
):
|
||||||
assert first == peer_name
|
assert first == peer_name
|
||||||
|
@ -886,7 +857,6 @@ async def client_req_subactor(
|
||||||
await tell_little_bro(
|
await tell_little_bro(
|
||||||
actor_name=sub_uid[0],
|
actor_name=sub_uid[0],
|
||||||
caller='client',
|
caller='client',
|
||||||
err_after=sub_err_after,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: test different scope-layers of
|
# TODO: test different scope-layers of
|
||||||
|
@ -898,7 +868,9 @@ async def client_req_subactor(
|
||||||
# TODO: would be super nice to have a special injected
|
# TODO: would be super nice to have a special injected
|
||||||
# cancel type here (maybe just our ctxc) but using
|
# cancel type here (maybe just our ctxc) but using
|
||||||
# some native mechanism in `trio` :p
|
# some native mechanism in `trio` :p
|
||||||
except trio.Cancelled as err:
|
except (
|
||||||
|
trio.Cancelled
|
||||||
|
) as err:
|
||||||
_err = err
|
_err = err
|
||||||
if reraise_on_cancel:
|
if reraise_on_cancel:
|
||||||
errtype = globals()['__builtins__'][reraise_on_cancel]
|
errtype = globals()['__builtins__'][reraise_on_cancel]
|
||||||
|
@ -925,9 +897,7 @@ async def client_req_subactor(
|
||||||
|
|
||||||
async def tell_little_bro(
|
async def tell_little_bro(
|
||||||
actor_name: str,
|
actor_name: str,
|
||||||
|
caller: str = ''
|
||||||
caller: str = '',
|
|
||||||
err_after: int|None = None,
|
|
||||||
):
|
):
|
||||||
# contact target actor, do a stream dialog.
|
# contact target actor, do a stream dialog.
|
||||||
async with (
|
async with (
|
||||||
|
@ -936,12 +906,10 @@ async def tell_little_bro(
|
||||||
) as lb,
|
) as lb,
|
||||||
lb.open_context(
|
lb.open_context(
|
||||||
basic_echo_server,
|
basic_echo_server,
|
||||||
|
|
||||||
# XXX proxy any delayed err condition
|
|
||||||
err_after=err_after,
|
|
||||||
) as (sub_ctx, first),
|
) as (sub_ctx, first),
|
||||||
|
sub_ctx.open_stream(
|
||||||
sub_ctx.open_stream() as echo_ipc,
|
basic_echo_server,
|
||||||
|
) as echo_ipc,
|
||||||
):
|
):
|
||||||
actor: Actor = current_actor()
|
actor: Actor = current_actor()
|
||||||
uid: tuple = actor.uid
|
uid: tuple = actor.uid
|
||||||
|
@ -968,15 +936,10 @@ async def tell_little_bro(
|
||||||
'raise_client_error',
|
'raise_client_error',
|
||||||
[None, 'KeyboardInterrupt'],
|
[None, 'KeyboardInterrupt'],
|
||||||
)
|
)
|
||||||
@pytest.mark.parametrize(
|
|
||||||
'raise_sub_spawn_error_after',
|
|
||||||
[None, 50],
|
|
||||||
)
|
|
||||||
def test_peer_spawns_and_cancels_service_subactor(
|
def test_peer_spawns_and_cancels_service_subactor(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
raise_client_error: str,
|
raise_client_error: str,
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
raise_sub_spawn_error_after: int|None,
|
|
||||||
):
|
):
|
||||||
# NOTE: this tests for the modden `mod wks open piker` bug
|
# NOTE: this tests for the modden `mod wks open piker` bug
|
||||||
# discovered as part of implementing workspace ctx
|
# discovered as part of implementing workspace ctx
|
||||||
|
@ -990,16 +953,6 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
# and the server's spawned child should cancel and terminate!
|
# and the server's spawned child should cancel and terminate!
|
||||||
peer_name: str = 'little_bro'
|
peer_name: str = 'little_bro'
|
||||||
|
|
||||||
def check_inner_rte(rae: RemoteActorError):
|
|
||||||
'''
|
|
||||||
Validate the little_bro's relayed inception!
|
|
||||||
|
|
||||||
'''
|
|
||||||
assert rae.boxed_type is RemoteActorError
|
|
||||||
assert rae.src_type is RuntimeError
|
|
||||||
assert 'client' in rae.relay_uid
|
|
||||||
assert peer_name in rae.src_uid
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
# NOTE: to halt the peer tasks on ctxc, uncomment this.
|
# NOTE: to halt the peer tasks on ctxc, uncomment this.
|
||||||
|
@ -1023,24 +976,14 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
server.open_context(
|
server.open_context(
|
||||||
serve_subactors,
|
serve_subactors,
|
||||||
peer_name=peer_name,
|
peer_name=peer_name,
|
||||||
debug_mode=debug_mode,
|
|
||||||
|
|
||||||
) as (spawn_ctx, first),
|
) as (spawn_ctx, first),
|
||||||
|
|
||||||
client.open_context(
|
client.open_context(
|
||||||
client_req_subactor,
|
client_req_subactor,
|
||||||
peer_name=peer_name,
|
peer_name=peer_name,
|
||||||
debug_mode=debug_mode,
|
|
||||||
reraise_on_cancel=raise_client_error,
|
reraise_on_cancel=raise_client_error,
|
||||||
|
|
||||||
# trigger for error condition in sub
|
|
||||||
# during streaming.
|
|
||||||
sub_err_after=raise_sub_spawn_error_after,
|
|
||||||
|
|
||||||
) as (client_ctx, client_says),
|
) as (client_ctx, client_says),
|
||||||
):
|
):
|
||||||
root: Actor = current_actor()
|
|
||||||
spawner_uid: tuple = spawn_ctx.chan.uid
|
|
||||||
print(
|
print(
|
||||||
f'Server says: {first}\n'
|
f'Server says: {first}\n'
|
||||||
f'Client says: {client_says}\n'
|
f'Client says: {client_says}\n'
|
||||||
|
@ -1050,7 +993,6 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
# (grandchild of this root actor) "little_bro"
|
# (grandchild of this root actor) "little_bro"
|
||||||
# and ensure we can also use it as an echo
|
# and ensure we can also use it as an echo
|
||||||
# server.
|
# server.
|
||||||
sub: Portal
|
|
||||||
async with tractor.wait_for_actor(
|
async with tractor.wait_for_actor(
|
||||||
name=peer_name,
|
name=peer_name,
|
||||||
) as sub:
|
) as sub:
|
||||||
|
@ -1062,116 +1004,41 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
f'.uid: {sub.actor.uid}\n'
|
f'.uid: {sub.actor.uid}\n'
|
||||||
f'chan.raddr: {sub.chan.raddr}\n'
|
f'chan.raddr: {sub.chan.raddr}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
async with expect_ctxc(
|
|
||||||
yay=raise_sub_spawn_error_after,
|
|
||||||
reraise=False,
|
|
||||||
):
|
|
||||||
await tell_little_bro(
|
await tell_little_bro(
|
||||||
actor_name=peer_name,
|
actor_name=peer_name,
|
||||||
caller='root',
|
caller='root',
|
||||||
)
|
)
|
||||||
|
|
||||||
if not raise_sub_spawn_error_after:
|
# signal client to raise a KBI
|
||||||
|
|
||||||
# signal client to cancel and maybe raise a KBI
|
|
||||||
await client_ctx.cancel()
|
await client_ctx.cancel()
|
||||||
print(
|
print('root cancelled client, checking that sub-spawn is down')
|
||||||
'-> root cancelling client,\n'
|
|
||||||
'-> root checking `client_ctx.result()`,\n'
|
|
||||||
f'-> checking that sub-spawn {peer_name} is down\n'
|
|
||||||
)
|
|
||||||
# else:
|
|
||||||
|
|
||||||
try:
|
|
||||||
res = await client_ctx.result(hide_tb=False)
|
|
||||||
|
|
||||||
# in remote (relayed inception) error
|
|
||||||
# case, we should error on the line above!
|
|
||||||
if raise_sub_spawn_error_after:
|
|
||||||
pytest.fail(
|
|
||||||
'Never rxed proxied `RemoteActorError[RuntimeError]` !?'
|
|
||||||
)
|
|
||||||
|
|
||||||
assert isinstance(res, ContextCancelled)
|
|
||||||
assert client_ctx.cancel_acked
|
|
||||||
assert res.canceller == root.uid
|
|
||||||
|
|
||||||
except RemoteActorError as rae:
|
|
||||||
_err = rae
|
|
||||||
assert raise_sub_spawn_error_after
|
|
||||||
|
|
||||||
# since this is a "relayed error" via the client
|
|
||||||
# sub-actor, it is expected to be
|
|
||||||
# a `RemoteActorError` boxing another
|
|
||||||
# `RemoteActorError` otherwise known as
|
|
||||||
# an "inception" (from `trio`'s parlance)
|
|
||||||
# ((or maybe a "Matryoshka" and/or "matron"
|
|
||||||
# in our own working parlance)) which
|
|
||||||
# contains the source error from the
|
|
||||||
# little_bro: a `RuntimeError`.
|
|
||||||
#
|
|
||||||
check_inner_rte(rae)
|
|
||||||
assert rae.relay_uid == client.chan.uid
|
|
||||||
assert rae.src_uid == sub.chan.uid
|
|
||||||
|
|
||||||
assert not client_ctx.cancel_acked
|
|
||||||
assert (
|
|
||||||
client_ctx.maybe_error
|
|
||||||
is client_ctx.outcome
|
|
||||||
is rae
|
|
||||||
)
|
|
||||||
raise
|
|
||||||
# await tractor.pause()
|
|
||||||
|
|
||||||
else:
|
|
||||||
assert not raise_sub_spawn_error_after
|
|
||||||
|
|
||||||
# cancelling the spawner sub should
|
|
||||||
# transitively cancel it's sub, the little
|
|
||||||
# bruv.
|
|
||||||
print('root cancelling server/client sub-actors')
|
|
||||||
await spawn_ctx.cancel()
|
|
||||||
async with tractor.find_actor(
|
async with tractor.find_actor(
|
||||||
name=peer_name,
|
name=peer_name,
|
||||||
) as sub:
|
) as sub:
|
||||||
assert not sub
|
assert not sub
|
||||||
|
|
||||||
# await server.cancel_actor()
|
print('root cancelling server/client sub-actors')
|
||||||
|
|
||||||
except RemoteActorError as rae:
|
# await tractor.pause()
|
||||||
# XXX more-or-less same as above handler
|
res = await client_ctx.result(hide_tb=False)
|
||||||
# this is just making sure the error bubbles out
|
assert isinstance(res, ContextCancelled)
|
||||||
# of the
|
assert client_ctx.cancel_acked
|
||||||
_err = rae
|
assert res.canceller == current_actor().uid
|
||||||
assert raise_sub_spawn_error_after
|
|
||||||
raise
|
await spawn_ctx.cancel()
|
||||||
|
# await server.cancel_actor()
|
||||||
|
|
||||||
# since we called `.cancel_actor()`, `.cancel_ack`
|
# since we called `.cancel_actor()`, `.cancel_ack`
|
||||||
# will not be set on the ctx bc `ctx.cancel()` was not
|
# will not be set on the ctx bc `ctx.cancel()` was not
|
||||||
# called directly fot this confext.
|
# called directly fot this confext.
|
||||||
except ContextCancelled as ctxc:
|
except ContextCancelled as ctxc:
|
||||||
_ctxc = ctxc
|
print('caught ctxc from contexts!')
|
||||||
print(
|
assert ctxc.canceller == current_actor().uid
|
||||||
f'{root.uid} caught ctxc from ctx with {client_ctx.chan.uid}\n'
|
|
||||||
f'{repr(ctxc)}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
if not raise_sub_spawn_error_after:
|
|
||||||
assert ctxc.canceller == root.uid
|
|
||||||
else:
|
|
||||||
assert ctxc.canceller == spawner_uid
|
|
||||||
|
|
||||||
assert ctxc is spawn_ctx.outcome
|
assert ctxc is spawn_ctx.outcome
|
||||||
assert ctxc is spawn_ctx.maybe_error
|
assert ctxc is spawn_ctx.maybe_error
|
||||||
raise
|
raise
|
||||||
|
|
||||||
if raise_sub_spawn_error_after:
|
|
||||||
pytest.fail(
|
|
||||||
'context block(s) in PARENT never raised?!?'
|
|
||||||
)
|
|
||||||
|
|
||||||
if not raise_sub_spawn_error_after:
|
|
||||||
# assert spawn_ctx.cancel_acked
|
# assert spawn_ctx.cancel_acked
|
||||||
assert spawn_ctx.cancel_acked
|
assert spawn_ctx.cancel_acked
|
||||||
assert client_ctx.cancel_acked
|
assert client_ctx.cancel_acked
|
||||||
|
@ -1189,12 +1056,4 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
|
|
||||||
# assert spawn_ctx.cancelled_caught
|
# assert spawn_ctx.cancelled_caught
|
||||||
|
|
||||||
if raise_sub_spawn_error_after:
|
|
||||||
with pytest.raises(RemoteActorError) as excinfo:
|
|
||||||
trio.run(main)
|
|
||||||
|
|
||||||
rae: RemoteActorError = excinfo.value
|
|
||||||
check_inner_rte(rae)
|
|
||||||
|
|
||||||
else:
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
|
@ -111,6 +111,7 @@ class RemoteActorError(Exception):
|
||||||
reprol_fields: list[str] = [
|
reprol_fields: list[str] = [
|
||||||
'src_uid',
|
'src_uid',
|
||||||
'relay_path',
|
'relay_path',
|
||||||
|
# 'relay_uid',
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
|
@ -486,11 +487,14 @@ def pack_error(
|
||||||
else:
|
else:
|
||||||
tb_str = traceback.format_exc()
|
tb_str = traceback.format_exc()
|
||||||
|
|
||||||
error_msg: dict[ # for IPC
|
our_uid: tuple = current_actor().uid
|
||||||
|
error_msg: dict[
|
||||||
str,
|
str,
|
||||||
str | tuple[str, str]
|
str | tuple[str, str]
|
||||||
] = {}
|
] = {
|
||||||
our_uid: tuple = current_actor().uid
|
'tb_str': tb_str,
|
||||||
|
'relay_uid': our_uid,
|
||||||
|
}
|
||||||
|
|
||||||
if (
|
if (
|
||||||
isinstance(exc, RemoteActorError)
|
isinstance(exc, RemoteActorError)
|
||||||
|
@ -531,11 +535,6 @@ def pack_error(
|
||||||
[],
|
[],
|
||||||
).append(our_uid)
|
).append(our_uid)
|
||||||
|
|
||||||
# XXX NOTE: always ensure the traceback-str is from the
|
|
||||||
# locally raised error (**not** the prior relay's boxed
|
|
||||||
# content's `.msgdata`).
|
|
||||||
error_msg['tb_str'] = tb_str
|
|
||||||
|
|
||||||
pkt: dict = {'error': error_msg}
|
pkt: dict = {'error': error_msg}
|
||||||
if cid:
|
if cid:
|
||||||
pkt['cid'] = cid
|
pkt['cid'] = cid
|
||||||
|
|
|
@ -715,12 +715,10 @@ class Actor:
|
||||||
f'|_{chan}\n'
|
f'|_{chan}\n'
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
# send msg loop terminate sentinel which
|
# send a msg loop terminate sentinel
|
||||||
# triggers cancellation of all remotely
|
|
||||||
# started tasks.
|
|
||||||
await chan.send(None)
|
await chan.send(None)
|
||||||
|
|
||||||
# XXX: do we want this? no right?
|
# XXX: do we want this?
|
||||||
# causes "[104] connection reset by peer" on other end
|
# causes "[104] connection reset by peer" on other end
|
||||||
# await chan.aclose()
|
# await chan.aclose()
|
||||||
|
|
||||||
|
@ -1210,10 +1208,10 @@ class Actor:
|
||||||
# - callee self raises ctxc before caller send request,
|
# - callee self raises ctxc before caller send request,
|
||||||
# - callee errors prior to cancel req.
|
# - callee errors prior to cancel req.
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Cancel request invalid, RPC task already completed?\n\n'
|
'Cancel request invalid, RPC task already completed?\n'
|
||||||
f'<= canceller: {requesting_uid}\n\n'
|
f'<= canceller: {requesting_uid}\n\n'
|
||||||
f'=> {cid}@{parent_chan.uid}\n'
|
f'=>{parent_chan}\n'
|
||||||
f' |_{parent_chan}\n'
|
f' |_ctx-id: {cid}\n'
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -1512,6 +1510,7 @@ async def async_main(
|
||||||
):
|
):
|
||||||
accept_addrs = set_accept_addr_says_rent
|
accept_addrs = set_accept_addr_says_rent
|
||||||
|
|
||||||
|
|
||||||
# The "root" nursery ensures the channel with the immediate
|
# The "root" nursery ensures the channel with the immediate
|
||||||
# parent is kept alive as a resilient service until
|
# parent is kept alive as a resilient service until
|
||||||
# cancellation steps have (mostly) occurred in
|
# cancellation steps have (mostly) occurred in
|
||||||
|
|
Loading…
Reference in New Issue