diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py index 9dca92b..60b28c3 100644 --- a/examples/advanced_faults/ipc_failure_during_stream.py +++ b/examples/advanced_faults/ipc_failure_during_stream.py @@ -21,75 +21,12 @@ import trio import pytest -async def break_ipc( - stream: MsgStream, - method: str|None = None, - pre_close: bool = False, - - def_method: str = 'eof', - -) -> None: - ''' - XXX: close the channel right after an error is raised - purposely breaking the IPC transport to make sure the parent - doesn't get stuck in debug or hang on the connection join. - this more or less simulates an infinite msg-receive hang on - the other end. - - ''' - # close channel via IPC prot msging before - # any transport breakage - if pre_close: - await stream.aclose() - - method: str = method or def_method - print( - '#################################\n' - 'Simulating CHILD-side IPC BREAK!\n' - f'method: {method}\n' - f'pre `.aclose()`: {pre_close}\n' - '#################################\n' - ) - - match method: - case 'trans_aclose': - await stream._ctx.chan.transport.stream.aclose() - - case 'eof': - await stream._ctx.chan.transport.stream.send_eof() - - case 'msg': - await stream._ctx.chan.send(None) - - # TODO: the actual real-world simulated cases like - # transport layer hangs and/or lower layer 2-gens type - # scenarios.. - # - # -[ ] already have some issues for this general testing - # area: - # - https://github.com/goodboy/tractor/issues/97 - # - https://github.com/goodboy/tractor/issues/124 - # - PR from @guille: - # https://github.com/goodboy/tractor/pull/149 - # case 'hang': - # TODO: framework research: - # - # - https://github.com/GuoTengda1993/pynetem - # - https://github.com/shopify/toxiproxy - # - https://manpages.ubuntu.com/manpages/trusty/man1/wirefilter.1.html - - case _: - raise RuntimeError( - f'IPC break method unsupported: {method}' - ) - - async def break_ipc_then_error( stream: MsgStream, break_ipc_with: str|None = None, pre_close: bool = False, ): - await break_ipc( + await _testing.break_ipc( stream=stream, method=break_ipc_with, pre_close=pre_close, @@ -121,6 +58,7 @@ async def recv_and_spawn_net_killers( Receive stream msgs and spawn some IPC killers mid-stream. ''' + broke_ipc: bool = False await ctx.started() async with ( ctx.open_stream() as stream, @@ -128,13 +66,17 @@ async def recv_and_spawn_net_killers( ): async for i in stream: print(f'child echoing {i}') - await stream.send(i) + if not broke_ipc: + await stream.send(i) + else: + await trio.sleep(0.01) if ( break_ipc_after and i >= break_ipc_after ): + broke_ipc = True n.start_soon( iter_ipc_stream, stream, @@ -242,14 +184,13 @@ async def main( # await stream._ctx.chan.send(None) # await stream._ctx.chan.transport.stream.send_eof() await stream._ctx.chan.transport.stream.aclose() - ipc_break_sent = True # it actually breaks right here in the - # mp_spawn/forkserver backends and thus the zombie - # reaper never even kicks in? - print(f'parent sending {i}') + # mp_spawn/forkserver backends and thus the + # zombie reaper never even kicks in? try: + print(f'parent sending {i}') await stream.send(i) except ContextCancelled as ctxc: print( @@ -262,6 +203,13 @@ async def main( # TODO: is this needed or no? raise + except trio.ClosedResourceError: + # NOTE: don't send if we already broke the + # connection to avoid raising a closed-error + # such that we drop through to the ctl-c + # mashing by user. + await trio.sleep(0.01) + # timeout: int = 1 # with trio.move_on_after(timeout) as cs: async with stuff_hangin_ctlc() as timeout: diff --git a/tractor/_testing/__init__.py b/tractor/_testing/__init__.py index 876c87e..fd79fe2 100644 --- a/tractor/_testing/__init__.py +++ b/tractor/_testing/__init__.py @@ -26,6 +26,9 @@ import tractor from .pytest import ( tractor_test as tractor_test ) +from .fault_simulation import ( + break_ipc as break_ipc, +) def repodir() -> pathlib.Path: diff --git a/tractor/_testing/fault_simulation.py b/tractor/_testing/fault_simulation.py new file mode 100644 index 0000000..fbd97bf --- /dev/null +++ b/tractor/_testing/fault_simulation.py @@ -0,0 +1,92 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +`pytest` utils helpers and plugins for testing `tractor`'s runtime +and applications. + +''' + +from tractor import ( + MsgStream, +) + +async def break_ipc( + stream: MsgStream, + method: str|None = None, + pre_close: bool = False, + + def_method: str = 'socket_close', + +) -> None: + ''' + XXX: close the channel right after an error is raised + purposely breaking the IPC transport to make sure the parent + doesn't get stuck in debug or hang on the connection join. + this more or less simulates an infinite msg-receive hang on + the other end. + + ''' + # close channel via IPC prot msging before + # any transport breakage + if pre_close: + await stream.aclose() + + method: str = method or def_method + print( + '#################################\n' + 'Simulating CHILD-side IPC BREAK!\n' + f'method: {method}\n' + f'pre `.aclose()`: {pre_close}\n' + '#################################\n' + ) + + match method: + case 'socket_close': + await stream._ctx.chan.transport.stream.aclose() + + case 'socket_eof': + # NOTE: `trio` does the following underneath this + # call in `src/trio/_highlevel_socket.py`: + # `Stream.socket.shutdown(tsocket.SHUT_WR)` + await stream._ctx.chan.transport.stream.send_eof() + + # TODO: remove since now this will be invalid with our + # new typed msg spec? + # case 'msg': + # await stream._ctx.chan.send(None) + + # TODO: the actual real-world simulated cases like + # transport layer hangs and/or lower layer 2-gens type + # scenarios.. + # + # -[ ] already have some issues for this general testing + # area: + # - https://github.com/goodboy/tractor/issues/97 + # - https://github.com/goodboy/tractor/issues/124 + # - PR from @guille: + # https://github.com/goodboy/tractor/pull/149 + # case 'hang': + # TODO: framework research: + # + # - https://github.com/GuoTengda1993/pynetem + # - https://github.com/shopify/toxiproxy + # - https://manpages.ubuntu.com/manpages/trusty/man1/wirefilter.1.html + + case _: + raise RuntimeError( + f'IPC break method unsupported: {method}' + )