forked from goodboy/tractor
Refine example to drop IPC mid-stream
Use a task nursery in the subactor to spawn tasks which cancel the IPC channel mid stream to simulate the most concurrent case we're likely to see. Make `main()` accept a `debug_mode: bool` for parametrization. Fill out detailed comments/docs on this example.ipc_failure_while_streaming
parent
7394a187e0
commit
36a83cb306
|
@ -1,3 +1,11 @@
|
||||||
|
'''
|
||||||
|
Complex edge case where during real-time streaming the IPC tranport
|
||||||
|
channels are wiped out (purposely in this example though it could have
|
||||||
|
been an outage) and we want to ensure that despite being in debug mode
|
||||||
|
(or not) the user can sent SIGINT once they notice the hang and the
|
||||||
|
actor tree will eventually be cancelled without leaving any zombies.
|
||||||
|
|
||||||
|
'''
|
||||||
import trio
|
import trio
|
||||||
from tractor import (
|
from tractor import (
|
||||||
open_nursery,
|
open_nursery,
|
||||||
|
@ -7,89 +15,90 @@ from tractor import (
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def break_channel_silently(
|
async def break_channel_silently_then_error(
|
||||||
stream: MsgStream,
|
stream: MsgStream,
|
||||||
):
|
):
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
await stream.send(msg)
|
await stream.send(msg)
|
||||||
|
|
||||||
# XXX: close the channel right after an error is raised
|
# XXX: close the channel right after an error is raised
|
||||||
# purposely breaking the IPC transport to make sure the parent
|
# purposely breaking the IPC transport to make sure the parent
|
||||||
# doesn't get stuck in debug. this more or less simulates an
|
# doesn't get stuck in debug or hang on the connection join.
|
||||||
# infinite msg-receive hang on the other end. await
|
# this more or less simulates an infinite msg-receive hang on
|
||||||
|
# the other end.
|
||||||
|
# if msg > 66:
|
||||||
await stream._ctx.chan.send(None)
|
await stream._ctx.chan.send(None)
|
||||||
assert 0
|
assert 0
|
||||||
|
|
||||||
|
|
||||||
async def error_and_break_stream(
|
async def close_stream_and_error(
|
||||||
stream: MsgStream,
|
stream: MsgStream,
|
||||||
):
|
):
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
# echo back msg
|
|
||||||
await stream.send(msg)
|
await stream.send(msg)
|
||||||
|
|
||||||
try:
|
# wipe out channel right before raising
|
||||||
assert 0
|
await stream._ctx.chan.send(None)
|
||||||
except Exception:
|
await stream.aclose()
|
||||||
await stream._ctx.chan.send(None)
|
assert 0
|
||||||
|
|
||||||
# NOTE: doing this instead causes the error to propagate
|
|
||||||
# correctly.
|
|
||||||
# await stream.aclose()
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
@context
|
@context
|
||||||
async def just_sleep(
|
async def recv_and_spawn_net_killers(
|
||||||
|
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Start and sleep.
|
Receive stream msgs and spawn some IPC killers mid-stream.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
d = {}
|
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
try:
|
async with (
|
||||||
async with (
|
ctx.open_stream() as stream,
|
||||||
ctx.open_stream() as stream,
|
trio.open_nursery() as n,
|
||||||
trio.open_nursery() as n,
|
):
|
||||||
):
|
for i in range(100):
|
||||||
for i in range(100):
|
await stream.send(i)
|
||||||
await stream.send(i)
|
if i > 80:
|
||||||
if i > 50:
|
n.start_soon(break_channel_silently_then_error, stream)
|
||||||
n.start_soon(break_channel_silently, stream)
|
n.start_soon(close_stream_and_error, stream)
|
||||||
n.start_soon(error_and_break_stream, stream)
|
|
||||||
|
|
||||||
finally:
|
|
||||||
d['10'] = 10
|
|
||||||
|
|
||||||
|
|
||||||
async def main() -> None:
|
async def main(
|
||||||
|
debug_mode: bool = True,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
|
||||||
async with open_nursery(
|
async with open_nursery(
|
||||||
# loglevel='info',
|
|
||||||
debug_mode=True,
|
# NOTE: even debugger is used we shouldn't get
|
||||||
|
# a hang since it never engages due to broken IPC
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
|
||||||
) as n:
|
) as n:
|
||||||
portal = await n.start_actor(
|
portal = await n.start_actor(
|
||||||
'ctx_child',
|
'chitty_hijo',
|
||||||
|
|
||||||
# XXX: we don't enable the current module in order
|
|
||||||
# to trigger `ModuleNotFound`.
|
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
|
|
||||||
# add a debugger test to verify this works B)
|
|
||||||
# debug_mode=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
async with portal.open_context(
|
async with portal.open_context(
|
||||||
just_sleep, # taken from pytest parameterization
|
recv_and_spawn_net_killers,
|
||||||
) as (ctx, sent):
|
) as (ctx, sent):
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
await stream.send(10)
|
for i in range(100):
|
||||||
async for msg in stream:
|
await stream.send(i)
|
||||||
print(msg)
|
|
||||||
|
with trio.move_on_after(2) as cs:
|
||||||
|
rx = await stream.receive()
|
||||||
|
print(f'I a mad user and here is what i got {rx}')
|
||||||
|
|
||||||
|
if cs.cancelled_caught:
|
||||||
|
# pretend to be a user seeing no streaming action
|
||||||
|
# thinking it's a hang, and then hitting ctl-c..
|
||||||
|
print("YOO i'm a user and, thingz hangin.. CTL-C CTRL-C..")
|
||||||
|
raise KeyboardInterrupt
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
Loading…
Reference in New Issue