diff --git a/examples/open_ctx_w_nursery.py b/examples/open_ctx_w_nursery.py index 99fc18d..8969222 100644 --- a/examples/open_ctx_w_nursery.py +++ b/examples/open_ctx_w_nursery.py @@ -1,3 +1,11 @@ +''' +Complex edge case where during real-time streaming the IPC tranport +channels are wiped out (purposely in this example though it could have +been an outage) and we want to ensure that despite being in debug mode +(or not) the user can sent SIGINT once they notice the hang and the +actor tree will eventually be cancelled without leaving any zombies. + +''' import trio from tractor import ( open_nursery, @@ -7,89 +15,90 @@ from tractor import ( ) -async def break_channel_silently( +async def break_channel_silently_then_error( stream: MsgStream, ): async for msg in stream: await stream.send(msg) + # XXX: close the channel right after an error is raised # purposely breaking the IPC transport to make sure the parent - # doesn't get stuck in debug. this more or less simulates an - # infinite msg-receive hang on the other end. await + # doesn't get stuck in debug or hang on the connection join. + # this more or less simulates an infinite msg-receive hang on + # the other end. + # if msg > 66: await stream._ctx.chan.send(None) assert 0 -async def error_and_break_stream( +async def close_stream_and_error( stream: MsgStream, ): async for msg in stream: - # echo back msg await stream.send(msg) - try: - assert 0 - except Exception: - await stream._ctx.chan.send(None) - - # NOTE: doing this instead causes the error to propagate - # correctly. - # await stream.aclose() - raise + # wipe out channel right before raising + await stream._ctx.chan.send(None) + await stream.aclose() + assert 0 @context -async def just_sleep( +async def recv_and_spawn_net_killers( ctx: Context, **kwargs, ) -> None: ''' - Start and sleep. + Receive stream msgs and spawn some IPC killers mid-stream. ''' - d = {} await ctx.started() - try: - async with ( - ctx.open_stream() as stream, - trio.open_nursery() as n, - ): - for i in range(100): - await stream.send(i) - if i > 50: - n.start_soon(break_channel_silently, stream) - n.start_soon(error_and_break_stream, stream) - - finally: - d['10'] = 10 + async with ( + ctx.open_stream() as stream, + trio.open_nursery() as n, + ): + for i in range(100): + await stream.send(i) + if i > 80: + n.start_soon(break_channel_silently_then_error, stream) + n.start_soon(close_stream_and_error, stream) -async def main() -> None: +async def main( + debug_mode: bool = True, + +) -> None: async with open_nursery( - # loglevel='info', - debug_mode=True, + + # NOTE: even debugger is used we shouldn't get + # a hang since it never engages due to broken IPC + debug_mode=debug_mode, + ) as n: portal = await n.start_actor( - 'ctx_child', - - # XXX: we don't enable the current module in order - # to trigger `ModuleNotFound`. + 'chitty_hijo', enable_modules=[__name__], - - # add a debugger test to verify this works B) - # debug_mode=True, ) async with portal.open_context( - just_sleep, # taken from pytest parameterization + recv_and_spawn_net_killers, ) as (ctx, sent): async with ctx.open_stream() as stream: - await stream.send(10) - async for msg in stream: - print(msg) + for i in range(100): + await stream.send(i) + + with trio.move_on_after(2) as cs: + rx = await stream.receive() + print(f'I a mad user and here is what i got {rx}') + + if cs.cancelled_caught: + # pretend to be a user seeing no streaming action + # thinking it's a hang, and then hitting ctl-c.. + print("YOO i'm a user and, thingz hangin.. CTL-C CTRL-C..") + raise KeyboardInterrupt if __name__ == '__main__':