tractor/examples/open_ctx_w_nursery.py

97 lines
2.2 KiB
Python

import trio
from tractor import (
open_nursery,
context,
Context,
MsgStream,
)
async def break_channel_silently(
stream: MsgStream,
):
async for msg in stream:
await stream.send(msg)
# XXX: close the channel right after an error is raised
# purposely breaking the IPC transport to make sure the parent
# doesn't get stuck in debug. this more or less simulates an
# infinite msg-receive hang on the other end. await
await stream._ctx.chan.send(None)
assert 0
async def error_and_break_stream(
stream: MsgStream,
):
async for msg in stream:
# echo back msg
await stream.send(msg)
try:
assert 0
except Exception:
await stream._ctx.chan.send(None)
# NOTE: doing this instead causes the error to propagate
# correctly.
# await stream.aclose()
raise
@context
async def just_sleep(
ctx: Context,
**kwargs,
) -> None:
'''
Start and sleep.
'''
d = {}
await ctx.started()
try:
async with (
ctx.open_stream() as stream,
trio.open_nursery() as n,
):
for i in range(100):
await stream.send(i)
if i > 50:
n.start_soon(break_channel_silently, stream)
n.start_soon(error_and_break_stream, stream)
finally:
d['10'] = 10
async def main() -> None:
async with open_nursery(
# loglevel='info',
debug_mode=True,
) as n:
portal = await n.start_actor(
'ctx_child',
# XXX: we don't enable the current module in order
# to trigger `ModuleNotFound`.
enable_modules=[__name__],
# add a debugger test to verify this works B)
# debug_mode=True,
)
async with portal.open_context(
just_sleep, # taken from pytest parameterization
) as (ctx, sent):
async with ctx.open_stream() as stream:
await stream.send(10)
async for msg in stream:
print(msg)
if __name__ == '__main__':
trio.run(main)