Start a new `._testing.fault_simulation`

Since I needed the `break_ipc()` helper from the
`examples/advanced_faults/ipc_failure_during_stream.py` used in the
`test_advanced_faults` suite, might as well move it into a pkg-wide
importable module. Also changed the default break method to be
`socket_close` which just calls `Stream.socket.close()` underneath in
`trio`.

Also tweak that example to not keep sending after the stream has been
broken since with new `trio` that will raise `ClosedResourceError` and
in the wrapping test we generally speaking want to see a hang and then
cancel via simulated user sent SIGINT/ctl-c.
runtime_to_msgspec
Tyler Goodlet 2024-04-03 09:50:22 -04:00
parent 70ab60ce7c
commit 0fcd424d57
3 changed files with 112 additions and 69 deletions

View File

@ -21,75 +21,12 @@ import trio
import pytest import pytest
async def break_ipc(
stream: MsgStream,
method: str|None = None,
pre_close: bool = False,
def_method: str = 'eof',
) -> None:
'''
XXX: close the channel right after an error is raised
purposely breaking the IPC transport to make sure the parent
doesn't get stuck in debug or hang on the connection join.
this more or less simulates an infinite msg-receive hang on
the other end.
'''
# close channel via IPC prot msging before
# any transport breakage
if pre_close:
await stream.aclose()
method: str = method or def_method
print(
'#################################\n'
'Simulating CHILD-side IPC BREAK!\n'
f'method: {method}\n'
f'pre `.aclose()`: {pre_close}\n'
'#################################\n'
)
match method:
case 'trans_aclose':
await stream._ctx.chan.transport.stream.aclose()
case 'eof':
await stream._ctx.chan.transport.stream.send_eof()
case 'msg':
await stream._ctx.chan.send(None)
# TODO: the actual real-world simulated cases like
# transport layer hangs and/or lower layer 2-gens type
# scenarios..
#
# -[ ] already have some issues for this general testing
# area:
# - https://github.com/goodboy/tractor/issues/97
# - https://github.com/goodboy/tractor/issues/124
# - PR from @guille:
# https://github.com/goodboy/tractor/pull/149
# case 'hang':
# TODO: framework research:
#
# - https://github.com/GuoTengda1993/pynetem
# - https://github.com/shopify/toxiproxy
# - https://manpages.ubuntu.com/manpages/trusty/man1/wirefilter.1.html
case _:
raise RuntimeError(
f'IPC break method unsupported: {method}'
)
async def break_ipc_then_error( async def break_ipc_then_error(
stream: MsgStream, stream: MsgStream,
break_ipc_with: str|None = None, break_ipc_with: str|None = None,
pre_close: bool = False, pre_close: bool = False,
): ):
await break_ipc( await _testing.break_ipc(
stream=stream, stream=stream,
method=break_ipc_with, method=break_ipc_with,
pre_close=pre_close, pre_close=pre_close,
@ -121,6 +58,7 @@ async def recv_and_spawn_net_killers(
Receive stream msgs and spawn some IPC killers mid-stream. Receive stream msgs and spawn some IPC killers mid-stream.
''' '''
broke_ipc: bool = False
await ctx.started() await ctx.started()
async with ( async with (
ctx.open_stream() as stream, ctx.open_stream() as stream,
@ -128,13 +66,17 @@ async def recv_and_spawn_net_killers(
): ):
async for i in stream: async for i in stream:
print(f'child echoing {i}') print(f'child echoing {i}')
await stream.send(i) if not broke_ipc:
await stream.send(i)
else:
await trio.sleep(0.01)
if ( if (
break_ipc_after break_ipc_after
and and
i >= break_ipc_after i >= break_ipc_after
): ):
broke_ipc = True
n.start_soon( n.start_soon(
iter_ipc_stream, iter_ipc_stream,
stream, stream,
@ -242,14 +184,13 @@ async def main(
# await stream._ctx.chan.send(None) # await stream._ctx.chan.send(None)
# await stream._ctx.chan.transport.stream.send_eof() # await stream._ctx.chan.transport.stream.send_eof()
await stream._ctx.chan.transport.stream.aclose() await stream._ctx.chan.transport.stream.aclose()
ipc_break_sent = True ipc_break_sent = True
# it actually breaks right here in the # it actually breaks right here in the
# mp_spawn/forkserver backends and thus the zombie # mp_spawn/forkserver backends and thus the
# reaper never even kicks in? # zombie reaper never even kicks in?
print(f'parent sending {i}')
try: try:
print(f'parent sending {i}')
await stream.send(i) await stream.send(i)
except ContextCancelled as ctxc: except ContextCancelled as ctxc:
print( print(
@ -262,6 +203,13 @@ async def main(
# TODO: is this needed or no? # TODO: is this needed or no?
raise raise
except trio.ClosedResourceError:
# NOTE: don't send if we already broke the
# connection to avoid raising a closed-error
# such that we drop through to the ctl-c
# mashing by user.
await trio.sleep(0.01)
# timeout: int = 1 # timeout: int = 1
# with trio.move_on_after(timeout) as cs: # with trio.move_on_after(timeout) as cs:
async with stuff_hangin_ctlc() as timeout: async with stuff_hangin_ctlc() as timeout:

View File

@ -26,6 +26,9 @@ import tractor
from .pytest import ( from .pytest import (
tractor_test as tractor_test tractor_test as tractor_test
) )
from .fault_simulation import (
break_ipc as break_ipc,
)
def repodir() -> pathlib.Path: def repodir() -> pathlib.Path:

View File

@ -0,0 +1,92 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
`pytest` utils helpers and plugins for testing `tractor`'s runtime
and applications.
'''
from tractor import (
MsgStream,
)
async def break_ipc(
stream: MsgStream,
method: str|None = None,
pre_close: bool = False,
def_method: str = 'socket_close',
) -> None:
'''
XXX: close the channel right after an error is raised
purposely breaking the IPC transport to make sure the parent
doesn't get stuck in debug or hang on the connection join.
this more or less simulates an infinite msg-receive hang on
the other end.
'''
# close channel via IPC prot msging before
# any transport breakage
if pre_close:
await stream.aclose()
method: str = method or def_method
print(
'#################################\n'
'Simulating CHILD-side IPC BREAK!\n'
f'method: {method}\n'
f'pre `.aclose()`: {pre_close}\n'
'#################################\n'
)
match method:
case 'socket_close':
await stream._ctx.chan.transport.stream.aclose()
case 'socket_eof':
# NOTE: `trio` does the following underneath this
# call in `src/trio/_highlevel_socket.py`:
# `Stream.socket.shutdown(tsocket.SHUT_WR)`
await stream._ctx.chan.transport.stream.send_eof()
# TODO: remove since now this will be invalid with our
# new typed msg spec?
# case 'msg':
# await stream._ctx.chan.send(None)
# TODO: the actual real-world simulated cases like
# transport layer hangs and/or lower layer 2-gens type
# scenarios..
#
# -[ ] already have some issues for this general testing
# area:
# - https://github.com/goodboy/tractor/issues/97
# - https://github.com/goodboy/tractor/issues/124
# - PR from @guille:
# https://github.com/goodboy/tractor/pull/149
# case 'hang':
# TODO: framework research:
#
# - https://github.com/GuoTengda1993/pynetem
# - https://github.com/shopify/toxiproxy
# - https://manpages.ubuntu.com/manpages/trusty/man1/wirefilter.1.html
case _:
raise RuntimeError(
f'IPC break method unsupported: {method}'
)