Add WIP example of silent IPC breaks while streaming
parent
97d5f7233b
commit
158569adae
|
@ -0,0 +1,96 @@
|
||||||
|
import trio
|
||||||
|
from tractor import (
|
||||||
|
open_nursery,
|
||||||
|
context,
|
||||||
|
Context,
|
||||||
|
MsgStream,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def break_channel_silently(
|
||||||
|
stream: MsgStream,
|
||||||
|
):
|
||||||
|
async for msg in stream:
|
||||||
|
await stream.send(msg)
|
||||||
|
# XXX: close the channel right after an error is raised
|
||||||
|
# purposely breaking the IPC transport to make sure the parent
|
||||||
|
# doesn't get stuck in debug. this more or less simulates an
|
||||||
|
# infinite msg-receive hang on the other end. await
|
||||||
|
await stream._ctx.chan.send(None)
|
||||||
|
assert 0
|
||||||
|
|
||||||
|
|
||||||
|
async def error_and_break_stream(
|
||||||
|
stream: MsgStream,
|
||||||
|
):
|
||||||
|
async for msg in stream:
|
||||||
|
# echo back msg
|
||||||
|
await stream.send(msg)
|
||||||
|
|
||||||
|
try:
|
||||||
|
assert 0
|
||||||
|
except Exception:
|
||||||
|
await stream._ctx.chan.send(None)
|
||||||
|
|
||||||
|
# NOTE: doing this instead causes the error to propagate
|
||||||
|
# correctly.
|
||||||
|
# await stream.aclose()
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
@context
|
||||||
|
async def just_sleep(
|
||||||
|
|
||||||
|
ctx: Context,
|
||||||
|
**kwargs,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Start and sleep.
|
||||||
|
|
||||||
|
'''
|
||||||
|
d = {}
|
||||||
|
await ctx.started()
|
||||||
|
try:
|
||||||
|
async with (
|
||||||
|
ctx.open_stream() as stream,
|
||||||
|
trio.open_nursery() as n,
|
||||||
|
):
|
||||||
|
for i in range(100):
|
||||||
|
await stream.send(i)
|
||||||
|
if i > 50:
|
||||||
|
n.start_soon(break_channel_silently, stream)
|
||||||
|
n.start_soon(error_and_break_stream, stream)
|
||||||
|
|
||||||
|
finally:
|
||||||
|
d['10'] = 10
|
||||||
|
|
||||||
|
|
||||||
|
async def main() -> None:
|
||||||
|
|
||||||
|
async with open_nursery(
|
||||||
|
# loglevel='info',
|
||||||
|
debug_mode=True,
|
||||||
|
) as n:
|
||||||
|
portal = await n.start_actor(
|
||||||
|
'ctx_child',
|
||||||
|
|
||||||
|
# XXX: we don't enable the current module in order
|
||||||
|
# to trigger `ModuleNotFound`.
|
||||||
|
enable_modules=[__name__],
|
||||||
|
|
||||||
|
# add a debugger test to verify this works B)
|
||||||
|
# debug_mode=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
async with portal.open_context(
|
||||||
|
just_sleep, # taken from pytest parameterization
|
||||||
|
) as (ctx, sent):
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
await stream.send(10)
|
||||||
|
async for msg in stream:
|
||||||
|
print(msg)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
trio.run(main)
|
Loading…
Reference in New Issue