diff --git a/examples/open_ctx_w_nursery.py b/examples/open_ctx_w_nursery.py new file mode 100644 index 0000000..99fc18d --- /dev/null +++ b/examples/open_ctx_w_nursery.py @@ -0,0 +1,96 @@ +import trio +from tractor import ( + open_nursery, + context, + Context, + MsgStream, +) + + +async def break_channel_silently( + stream: MsgStream, +): + async for msg in stream: + await stream.send(msg) + # XXX: close the channel right after an error is raised + # purposely breaking the IPC transport to make sure the parent + # doesn't get stuck in debug. this more or less simulates an + # infinite msg-receive hang on the other end. await + await stream._ctx.chan.send(None) + assert 0 + + +async def error_and_break_stream( + stream: MsgStream, +): + async for msg in stream: + # echo back msg + await stream.send(msg) + + try: + assert 0 + except Exception: + await stream._ctx.chan.send(None) + + # NOTE: doing this instead causes the error to propagate + # correctly. + # await stream.aclose() + raise + + +@context +async def just_sleep( + + ctx: Context, + **kwargs, + +) -> None: + ''' + Start and sleep. + + ''' + d = {} + await ctx.started() + try: + async with ( + ctx.open_stream() as stream, + trio.open_nursery() as n, + ): + for i in range(100): + await stream.send(i) + if i > 50: + n.start_soon(break_channel_silently, stream) + n.start_soon(error_and_break_stream, stream) + + finally: + d['10'] = 10 + + +async def main() -> None: + + async with open_nursery( + # loglevel='info', + debug_mode=True, + ) as n: + portal = await n.start_actor( + 'ctx_child', + + # XXX: we don't enable the current module in order + # to trigger `ModuleNotFound`. + enable_modules=[__name__], + + # add a debugger test to verify this works B) + # debug_mode=True, + ) + + async with portal.open_context( + just_sleep, # taken from pytest parameterization + ) as (ctx, sent): + async with ctx.open_stream() as stream: + await stream.send(10) + async for msg in stream: + print(msg) + + +if __name__ == '__main__': + trio.run(main)