diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py index c7322a7..9dca92b 100644 --- a/examples/advanced_faults/ipc_failure_during_stream.py +++ b/examples/advanced_faults/ipc_failure_during_stream.py @@ -6,6 +6,7 @@ been an outage) and we want to ensure that despite being in debug mode actor tree will eventually be cancelled without leaving any zombies. ''' +from contextlib import asynccontextmanager as acm from functools import partial from tractor import ( @@ -17,6 +18,7 @@ from tractor import ( _testing, ) import trio +import pytest async def break_ipc( @@ -41,6 +43,13 @@ async def break_ipc( await stream.aclose() method: str = method or def_method + print( + '#################################\n' + 'Simulating CHILD-side IPC BREAK!\n' + f'method: {method}\n' + f'pre `.aclose()`: {pre_close}\n' + '#################################\n' + ) match method: case 'trans_aclose': @@ -80,17 +89,17 @@ async def break_ipc_then_error( break_ipc_with: str|None = None, pre_close: bool = False, ): + await break_ipc( + stream=stream, + method=break_ipc_with, + pre_close=pre_close, + ) async for msg in stream: await stream.send(msg) - await break_ipc( - stream=stream, - method=break_ipc_with, - pre_close=pre_close, - ) - assert 0 + + assert 0 -# async def close_stream_and_error( async def iter_ipc_stream( stream: MsgStream, break_ipc_with: str|None = None, @@ -99,20 +108,6 @@ async def iter_ipc_stream( async for msg in stream: await stream.send(msg) - # wipe out channel right before raising - # await break_ipc( - # stream=stream, - # method=break_ipc_with, - # pre_close=pre_close, - # ) - - # send channel close msg at SC-prot level - # - # TODO: what should get raised here if anything? - # await stream.aclose() - - # assert 0 - @context async def recv_and_spawn_net_killers( @@ -134,14 +129,16 @@ async def recv_and_spawn_net_killers( async for i in stream: print(f'child echoing {i}') await stream.send(i) + if ( break_ipc_after and - i > break_ipc_after + i >= break_ipc_after ): - '#################################\n' - 'Simulating CHILD-side IPC BREAK!\n' - '#################################\n' + n.start_soon( + iter_ipc_stream, + stream, + ) n.start_soon( partial( break_ipc_then_error, @@ -149,10 +146,23 @@ async def recv_and_spawn_net_killers( pre_close=pre_close, ) ) - n.start_soon( - iter_ipc_stream, - stream, - ) + + +@acm +async def stuff_hangin_ctlc(timeout: float = 1) -> None: + + with trio.move_on_after(timeout) as cs: + yield timeout + + if cs.cancelled_caught: + # pretend to be a user seeing no streaming action + # thinking it's a hang, and then hitting ctl-c.. + print( + f"i'm a user on the PARENT side and thingz hangin " + f'after timeout={timeout} ???\n\n' + 'MASHING CTlR-C..!?\n' + ) + raise KeyboardInterrupt async def main( @@ -169,9 +179,6 @@ async def main( ) -> None: - # from tractor._state import _runtime_vars as rtv - # rtv['_debug_mode'] = debug_mode - async with ( open_nursery( start_method=start_method, @@ -190,10 +197,11 @@ async def main( ) async with ( + stuff_hangin_ctlc(timeout=2) as timeout, _testing.expect_ctxc( yay=( break_parent_ipc_after - or break_child_ipc_after, + or break_child_ipc_after ), # TODO: we CAN'T remove this right? # since we need the ctxc to bubble up from either @@ -205,12 +213,14 @@ async def main( # and KBI in an eg? reraise=True, ), + portal.open_context( recv_and_spawn_net_killers, break_ipc_after=break_child_ipc_after, pre_close=pre_close, ) as (ctx, sent), ): + rx_eoc: bool = False ipc_break_sent: bool = False async with ctx.open_stream() as stream: for i in range(1000): @@ -228,6 +238,7 @@ async def main( '#################################\n' ) + # TODO: other methods? see break func above. # await stream._ctx.chan.send(None) # await stream._ctx.chan.transport.stream.send_eof() await stream._ctx.chan.transport.stream.aclose() @@ -251,10 +262,12 @@ async def main( # TODO: is this needed or no? raise - timeout: int = 1 - print(f'Entering `stream.receive()` with timeout={timeout}\n') - with trio.move_on_after(timeout) as cs: - + # timeout: int = 1 + # with trio.move_on_after(timeout) as cs: + async with stuff_hangin_ctlc() as timeout: + print( + f'PARENT `stream.receive()` with timeout={timeout}\n' + ) # NOTE: in the parent side IPC failure case this # will raise an ``EndOfChannel`` after the child # is killed and sends a stop msg back to it's @@ -266,23 +279,30 @@ async def main( f'{rx}\n' ) except trio.EndOfChannel: + rx_eoc: bool = True print('MsgStream got EoC for PARENT') raise - if cs.cancelled_caught: - # pretend to be a user seeing no streaming action - # thinking it's a hang, and then hitting ctl-c.. - print( - f"YOO i'm a PARENT user anddd thingz hangin..\n" - f'after timeout={timeout}\n' - ) + print( + 'Streaming finished and we got Eoc.\n' + 'Canceling `.open_context()` in root with\n' + 'CTlR-C..' + ) + if rx_eoc: + assert stream.closed + try: + await stream.send(i) + pytest.fail('stream not closed?') + except ( + trio.ClosedResourceError, + trio.EndOfChannel, + ) as send_err: + if rx_eoc: + assert send_err is stream._eoc + else: + assert send_err is stream._closed - print( - "YOO i'm mad!\n" - 'The send side is dun but thingz hangin..\n' - 'MASHING CTlR-C Ctl-c..' - ) - raise KeyboardInterrupt + raise KeyboardInterrupt if __name__ == '__main__': diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py index 8b73b4c..5f73ac6 100644 --- a/tests/test_advanced_faults.py +++ b/tests/test_advanced_faults.py @@ -85,8 +85,8 @@ def test_ipc_channel_break_during_stream( ''' if spawn_backend != 'trio': - # if debug_mode: - # pytest.skip('`debug_mode` only supported on `trio` spawner') + if debug_mode: + pytest.skip('`debug_mode` only supported on `trio` spawner') # non-`trio` spawners should never hit the hang condition that # requires the user to do ctl-c to cancel the actor tree. @@ -107,7 +107,10 @@ def test_ipc_channel_break_during_stream( # AND we tell the child to call `MsgStream.aclose()`. and pre_aclose_msgstream ): - expect_final_exc = trio.EndOfChannel + # expect_final_exc = trio.EndOfChannel + # ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this + # gracefully! + expect_final_exc = KeyboardInterrupt # NOTE when ONLY the child breaks or it breaks BEFORE the # parent we expect the parent to get a closed resource error @@ -120,11 +123,25 @@ def test_ipc_channel_break_during_stream( and ipc_break['break_parent_ipc_after'] is False ): - expect_final_exc = trio.ClosedResourceError + # NOTE: we DO NOT expect this any more since + # the child side's channel will be broken silently + # and nothing on the parent side will indicate this! + # expect_final_exc = trio.ClosedResourceError - # if child calls `MsgStream.aclose()` then expect EoC. + # NOTE: child will send a 'stop' msg before it breaks + # the transport channel BUT, that will be absorbed by the + # `ctx.open_stream()` block and thus the `.open_context()` + # should hang, after which the test script simulates + # a user sending ctl-c by raising a KBI. if pre_aclose_msgstream: - expect_final_exc = trio.EndOfChannel + expect_final_exc = KeyboardInterrupt + + # XXX OLD XXX + # if child calls `MsgStream.aclose()` then expect EoC. + # ^ XXX not any more ^ since eoc is always absorbed + # gracefully and NOT bubbled to the `.open_context()` + # block! + # expect_final_exc = trio.EndOfChannel # BOTH but, CHILD breaks FIRST elif ( @@ -134,12 +151,8 @@ def test_ipc_channel_break_during_stream( > ipc_break['break_child_ipc_after'] ) ): - expect_final_exc = trio.ClosedResourceError - - # child will send a 'stop' msg before it breaks - # the transport channel. if pre_aclose_msgstream: - expect_final_exc = trio.EndOfChannel + expect_final_exc = KeyboardInterrupt # NOTE when the parent IPC side dies (even if the child's does as well # but the child fails BEFORE the parent) we always expect the @@ -160,7 +173,8 @@ def test_ipc_channel_break_during_stream( ipc_break['break_parent_ipc_after'] is not False and ( ipc_break['break_child_ipc_after'] - > ipc_break['break_parent_ipc_after'] + > + ipc_break['break_parent_ipc_after'] ) ): expect_final_exc = trio.ClosedResourceError @@ -224,25 +238,29 @@ def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages(): ''' async def main(): - async with tractor.open_nursery() as n: - portal = await n.start_actor( - 'ipc_breaker', - enable_modules=[__name__], - ) + with trio.fail_after(3): + async with tractor.open_nursery() as n: + portal = await n.start_actor( + 'ipc_breaker', + enable_modules=[__name__], + ) - with trio.move_on_after(1): - async with ( - portal.open_context( - break_ipc_after_started - ) as (ctx, sent), - ): - async with ctx.open_stream(): - await trio.sleep(0.5) + with trio.move_on_after(1): + async with ( + portal.open_context( + break_ipc_after_started + ) as (ctx, sent), + ): + async with ctx.open_stream(): + await trio.sleep(0.5) - print('parent waiting on context') + print('parent waiting on context') - print('parent exited context') - raise KeyboardInterrupt + print( + 'parent exited context\n' + 'parent raising KBI..\n' + ) + raise KeyboardInterrupt with pytest.raises(KeyboardInterrupt): trio.run(main)