Tweak broadcast fanout test to never inf loop

Since a bug in the new `MsgStream.aclose()` impl's drain block logic was
triggering an actual inf loop (by not ever canceller the streamer child
actor), make sure we put a loop limit on the `inf_streamer`()` XD

Also add a bit more deats to the test `print()`s in each actor and toss
in `debug_mode` fixture support.
modden_spawn_from_client_req
Tyler Goodlet 2024-02-22 14:41:28 -05:00
parent e244747bc3
commit 5ea112699d
1 changed files with 44 additions and 15 deletions

View File

@ -298,52 +298,77 @@ async def inf_streamer(
async with (
ctx.open_stream() as stream,
trio.open_nursery() as n,
trio.open_nursery() as tn,
):
async def bail_on_sentinel():
async def close_stream_on_sentinel():
async for msg in stream:
if msg == 'done':
print(
'streamer RXed "done" sentinel msg!\n'
'CLOSING `MsgStream`!'
)
await stream.aclose()
else:
print(f'streamer received {msg}')
else:
print('streamer exited recv loop')
# start termination detector
n.start_soon(bail_on_sentinel)
tn.start_soon(close_stream_on_sentinel)
for val in itertools.count():
cap: int = 10000 # so that we don't spin forever when bug..
for val in range(cap):
try:
print(f'streamer sending {val}')
await stream.send(val)
if val > cap:
raise RuntimeError(
'Streamer never cancelled by setinel?'
)
await trio.sleep(0.001)
# close out the stream gracefully
except trio.ClosedResourceError:
# close out the stream gracefully
print('msgstream closed on streamer side!')
assert stream.closed
break
else:
raise RuntimeError(
'Streamer not cancelled before finished sending?'
)
print('terminating streamer')
print('streamer exited .open_streamer() block')
def test_local_task_fanout_from_stream():
def test_local_task_fanout_from_stream(
debug_mode: bool,
):
'''
Single stream with multiple local consumer tasks using the
``MsgStream.subscribe()` api.
Ensure all tasks receive all values after stream completes sending.
Ensure all tasks receive all values after stream completes
sending.
'''
consumers = 22
consumers: int = 22
async def main():
counts = Counter()
async with tractor.open_nursery() as tn:
p = await tn.start_actor(
async with tractor.open_nursery(
debug_mode=debug_mode,
) as tn:
p: tractor.Portal = await tn.start_actor(
'inf_streamer',
enable_modules=[__name__],
)
# with trio.fail_after(3):
async with (
p.open_context(inf_streamer) as (ctx, _),
ctx.open_stream() as stream,
):
async def pull_and_count(name: str):
# name = trio.lowlevel.current_task().name
async with stream.subscribe() as recver:
@ -352,7 +377,7 @@ def test_local_task_fanout_from_stream():
tractor.trionics.BroadcastReceiver
)
async for val in recver:
# print(f'{name}: {val}')
print(f'bx {name} rx: {val}')
counts[name] += 1
print(f'{name} bcaster ended')
@ -362,10 +387,14 @@ def test_local_task_fanout_from_stream():
with trio.fail_after(3):
async with trio.open_nursery() as nurse:
for i in range(consumers):
nurse.start_soon(pull_and_count, i)
nurse.start_soon(
pull_and_count,
i,
)
# delay to let bcast consumers pull msgs
await trio.sleep(0.5)
print('\nterminating')
print('terminating nursery of bcast rxer consumers!')
await stream.send('done')
print('closed stream connection')