tractor/tests/test_advanced_streaming.py

'''
Advanced streaming patterns using bidirectional streams and contexts.
'''
from collections import Counter
import itertools
import platform
from typing import Type
import pytest
import trio
import tractor
def is_win():
return platform.system() == 'Windows'
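# topic-name -> set of currently subscribed bidir msg streams;
# mutated by `subscribe()` and fanned-out to by `publisher()` below.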
_registry: dict[str, set[tractor.MsgStream]] = {
'even': set(),
'odd': set(),
}
async def publisher(
seed: int = 0,
) -> None:
global _registry
def is_even(i):
return i % 2 == 0
for val in itertools.count(seed):
sub = 'even' if is_even(val) else 'odd'
for sub_stream in _registry[sub].copy():
await sub_stream.send(val)
        # throttle the send rate to ~1kHz; slow enough to keep the
        # console output (somewhat) readable to a human user
        await trio.sleep(1/1000)
@tractor.context
async def subscribe(
ctx: tractor.Context,
) -> None:
global _registry
    # sync with the caller
await ctx.started(None)
async with ctx.open_stream() as stream:
# update subs list as consumer requests
async for new_subs in stream:
            new_subs = set(new_subs)
            # subs currently registered but no longer requested
            remove = _registry.keys() - new_subs
            print(f'setting sub to {new_subs} for {ctx.chan.uid}')
            # drop old subs; `.discard()` avoids a `KeyError` when
            # this stream was never in a given topic's set
            for sub in remove:
                _registry[sub].discard(stream)
# add new subs for consumer
for sub in new_subs:
_registry[sub].add(stream)
async def consumer(
subs: list[str],
) -> None:
uid = tractor.current_actor().uid
async with tractor.wait_for_actor('publisher') as portal:
async with portal.open_context(subscribe) as (ctx, first):
async with ctx.open_stream() as stream:
# flip between the provided subs dynamically
if len(subs) > 1:
for sub in itertools.cycle(subs):
print(f'setting dynamic sub to {sub}')
await stream.send([sub])
count = 0
async for value in stream:
print(f'{uid} got: {value}')
if count > 5:
break
count += 1
else: # static sub
await stream.send(subs)
async for value in stream:
print(f'{uid} got: {value}')
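# NOTE on the (implicit) wire protocol above: a consumer sends a
# list of topic names (e.g. `await stream.send(['even'])`) and from
# then on receives only values published to those topics, until it
# sends a replacement list.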
# NOTE: deliberately NOT using `@pytest.mark.timeout(...)` —
# both pytest-timeout enforcement modes break trio under
# fork-based backends:
#
# - `method='signal'` (SIGALRM): the handler synchronously
# raises `Failed` in trio's main thread mid-`epoll.poll()`,
# leaves `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest
# run got abandoned"), and EVERY subsequent `trio.run()`
# in the same pytest process bails with
# `RuntimeError: Attempted to call run() from inside a
# run()` — session-wide poison.
#
# - `method='thread'`: calls `_thread.interrupt_main()`
# raising `KeyboardInterrupt` into the main thread. Under
# fork-based backends with mid-cascade fd-juggling the KBI
# can escape trio's `KIManager` and bubble out of pytest
# itself — kills the WHOLE session.
#
# Instead we use `trio.fail_after()` INSIDE `main()` below:
# trio's own `Cancelled`/`TooSlowError` machinery handles the
# timeout, cleanly unwinds the actor nursery's cancel
# cascade, and only fails the single test (no cross-test
# state corruption either way).
#
# `pyproject.toml`'s default `timeout = 200` is still a
# last-resort safety net.
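#
# A minimal sketch of that trio-native pattern (illustrative only;
# the real usage is inside `main()` in the test below):
#
#   async def _main():
#       with trio.fail_after(5):
#           await trio.sleep_forever()  # stand-in for test body
#
#   try:
#       trio.run(_main)  # raises `trio.TooSlowError` in-run..
#   except trio.TooSlowError:
#       pass  # ..leaving the runtime reusable by later tests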
@pytest.mark.parametrize(
'expect_cancel_exc', [
KeyboardInterrupt,
trio.TooSlowError,
],
ids=lambda item:
f'expect_user_exc_raised={item.__name__}'
)
def test_dynamic_pub_sub(
reg_addr: tuple,
debug_mode: bool,
test_log: tractor.log.StackLevelAdapter,
reap_subactors_per_test: int,
expect_cancel_exc: Type[BaseException],
is_forking_spawner: bool,
set_fork_aware_capture,
):
failed_to_raise_report: str = (
f'Never got a {expect_cancel_exc!r} ??'
)
global _registry
from multiprocessing import cpu_count
cpus = cpu_count()
# Hard safety cap via trio's own cancellation. NOTE see the
# module-level note on why we avoid `pytest-timeout` for this
# test. Picked backend-aware: under `trio` backend spawn is
# cheap (~1s for `cpus` actors) but fork-based backends pay
# a per-spawn cost (forkserver round-trip + IPC peer-handshake)
# that can stack up over `cpus - 1` sequential `n.run_in_actor()`
# calls — especially on UDS under cross-pytest contention
# (#451 / #452). Empirically a flat 15s flakes on
# `main_thread_forkserver` for many-cpu hosts (a single bad
# spawn-stack puts total run-time at ~15.5s, just over);
# 30s gives plenty of headroom while still failing-loud on
# a real hang.
#
# XXX caveat: this is an *inner* `trio.fail_after` — its
# `Cancelled` cannot reach a task parked in a shielded `await`
# (e.g. inside actor-nursery teardown). When the in-band cancel
# path is itself buggy (the bug-class-3 `raise KBI` swallow we're
# currently chasing) this guard does NOT fire and the test sits
# forever until external SIGINT. The `_DIAG_CAP_S` outer guard
# below is the AFK-safety counterpart.
    fail_after_s: int = (
        30
        if is_forking_spawner
        else 12
    )
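    # e.g. (illustrative) how a shielded await defeats the inner
    # guard:
    #
    #   with trio.fail_after(1):                 # deadline fires, but..
    #       with trio.CancelScope(shield=True):  # ..delivery is blocked
    #           await trio.sleep_forever()       # -> hangs indefinitely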
# outer guard: when the inner fail_after fails to fire because of
# a shielded-await deadlock, this cap *aborts the trio run via
# signal.alarm → KBI* so AFK runs don't sit for >20min on the
# bug-class-3 hang. Slightly larger than `fail_after_s` so the
# trio-native path always wins when it works.
_DIAG_CAP_S: int = fail_after_s + 5
async def main():
# bug-class-3 breadcrumb: tag each level of the cancel path
# so when the run hangs and we capture cancel-level logs, the
# *last* breadcrumb that fired names the swallow point.
test_log.cancel('test_dynamic_pub_sub: enter main()')
try:
with trio.fail_after(fail_after_s):
test_log.cancel(
f'test_dynamic_pub_sub: '
f'enter `trio.fail_after({fail_after_s})` scope'
)
try:
async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as n:
test_log.cancel(
'test_dynamic_pub_sub: '
'actor nursery opened'
)
# name of this actor will be same as target func
await n.run_in_actor(publisher)
for i, sub in zip(
range(cpus - 2),
itertools.cycle(_registry.keys())
):
await n.run_in_actor(
consumer,
name=f'consumer_{sub}',
subs=[sub],
)
# make one dynamic subscriber
await n.run_in_actor(
consumer,
name='consumer_dynamic',
subs=list(_registry.keys()),
)
                        # let the pub-sub system run briefly
                        # before simulating a user cancel
                        await trio.sleep(3)
test_log.warning(
f'Raising user cancel exc: '
f'{expect_cancel_exc!r}'
)
test_log.cancel(
f'test_dynamic_pub_sub: '
f'ABOUT TO RAISE {expect_cancel_exc!r}'
)
raise expect_cancel_exc('simulate user cancel!')
finally:
test_log.cancel(
'test_dynamic_pub_sub: '
'actor nursery `__aexit__` returned'
)
test_log.cancel(
'test_dynamic_pub_sub: `fail_after` scope exited'
)
finally:
test_log.cancel(
'test_dynamic_pub_sub: leaving `main()`'
)
# outer signal-based guard — survives a shielded-await deadlock
# since `signal.alarm` raises in the main thread regardless of
# trio's scope state. ONLY armed under fork-based backends since
# the bug we're chasing is MTF-specific.
import signal
armed_alarm: bool = bool(is_forking_spawner)
if armed_alarm:
signal.alarm(_DIAG_CAP_S)
try:
try:
trio.run(main)
pytest.fail(failed_to_raise_report)
except expect_cancel_exc:
# parent-side raised the user-cancel exc directly and
# it propagated unwrapped; clean path.
test_log.exception('Got user-cancel exc AS EXPECTED')
except BaseExceptionGroup as err:
# under fork-based backends the user-raised cancel
# can race with subactor-side stream teardown
# (`trio.EndOfChannel` from a publisher's `send()`
# whose remote half got cut). The expected exc may
# then be nested deeper in the group rather than at
# the top level. `BaseExceptionGroup.split()` walks
# the exc tree recursively (Python 3.11+).
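            # e.g. (illustrative):
            #   eg = BaseExceptionGroup(
            #       'eg', [KeyboardInterrupt(), ValueError()]
            #   )
            #   matched, rest = eg.split(KeyboardInterrupt)
            # -> `matched` holds only the KBI leaf, `rest` the
            # `ValueError`; either side is `None` when empty.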
matched, _ = err.split(expect_cancel_exc)
if matched is None:
pytest.fail(failed_to_raise_report)
test_log.exception('Got user-cancel exc AS EXPECTED')
finally:
# always disarm so a passing test doesn't get killed
# post-trio.run by a stale alarm.
if armed_alarm:
signal.alarm(0)
@tractor.context
async def one_task_streams_and_one_handles_reqresp(
ctx: tractor.Context,
) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
async def pingpong():
'''Run a simple req/response service.
'''
async for msg in stream:
print('rpc server ping')
assert msg == 'ping'
print('rpc server pong')
await stream.send('pong')
async with trio.open_nursery() as n:
n.start_soon(pingpong)
for _ in itertools.count():
await stream.send('yo')
await trio.sleep(0.01)
def test_reqresp_ontopof_streaming():
'''
Test a subactor that both streams with one task and
    spawns another which handles a small request-response
dialogue over the same bidir-stream.
'''
async def main():
        # flag to make sure we get at least one pong
got_pong: bool = False
timeout: int = 2
if is_win(): # smh
timeout = 4
with trio.move_on_after(timeout):
async with tractor.open_nursery() as n:
# name of this actor will be same as target func
portal = await n.start_actor(
'dual_tasks',
enable_modules=[__name__]
)
async with portal.open_context(
one_task_streams_and_one_handles_reqresp,
) as (ctx, first):
assert first is None
async with ctx.open_stream() as stream:
await stream.send('ping')
async for msg in stream:
print(f'client received: {msg}')
assert msg in {'pong', 'yo'}
if msg == 'pong':
got_pong = True
await stream.send('ping')
print('client sent ping')
assert got_pong
try:
trio.run(main)
except trio.TooSlowError:
pass
async def async_gen_stream(sequence):
for i in sequence:
yield i
await trio.sleep(0.1)
@tractor.context
async def echo_ctx_stream(
ctx: tractor.Context,
) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
async for msg in stream:
await stream.send(msg)
def test_sigint_both_stream_types():
'''
    Verify that running a bi-directional and a recv-only stream
    side-by-side will cancel correctly from SIGINT.
'''
timeout: float = 2
if is_win(): # smh
timeout += 1
async def main():
with trio.fail_after(timeout):
async with tractor.open_nursery() as n:
# name of this actor will be same as target func
portal = await n.start_actor(
'2_way',
enable_modules=[__name__]
)
async with portal.open_context(echo_ctx_stream) as (ctx, _):
async with ctx.open_stream() as stream:
async with portal.open_stream_from(
async_gen_stream,
sequence=list(range(1)),
) as gen_stream:
msg = await gen_stream.receive()
await stream.send(msg)
resp = await stream.receive()
assert resp == msg
raise KeyboardInterrupt
# TODO, use pytest.raises() here instead?
# (why weren't we originally?)
try:
trio.run(main)
pytest.fail("Didn't receive KBI!?")
except KeyboardInterrupt:
pass
@tractor.context
async def inf_streamer(
ctx: tractor.Context,
) -> None:
'''
Stream increasing ints until terminated with a 'done' msg.
'''
await ctx.started()
async with (
ctx.open_stream() as stream,
# XXX TODO, INTERESTING CASE!!
# - if we don't collapse the eg then the embedded
# `trio.EndOfChannel` doesn't propagate directly to the above
# .open_stream() parent, resulting in it also raising instead
# of gracefully absorbing as normal.. so how to handle?
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
async def close_stream_on_sentinel():
async for msg in stream:
if msg == 'done':
print(
'streamer RXed "done" sentinel msg!\n'
'CLOSING `MsgStream`!'
)
await stream.aclose()
else:
print(f'streamer received {msg}')
else:
print('streamer exited recv loop')
# start termination detector
tn.start_soon(close_stream_on_sentinel)
cap: int = 10000 # so that we don't spin forever when bug..
for val in range(cap):
try:
print(f'streamer sending {val}')
await stream.send(val)
                # (the `for-else` below raises if the sentinel
                # never triggers a stream close before `cap`)
await trio.sleep(0.001)
# close out the stream gracefully
except trio.ClosedResourceError:
print('transport closed on streamer side!')
assert stream.closed
break
else:
raise RuntimeError(
'Streamer not cancelled before finished sending?'
)
    print('streamer exited .open_stream() block')
# NOTE: again, no `@pytest.mark.timeout()` here; see the
# module-level comment above on why signal/thread based timeouts
# break trio.
def test_local_task_fanout_from_stream(
reg_addr: tuple,
debug_mode: bool,
):
'''
Single stream with multiple local consumer tasks using the
    ``MsgStream.subscribe()`` api.
Ensure all tasks receive all values after stream completes
sending.
'''
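    # `MsgStream.subscribe()` wraps the stream in a broadcast
    # channel: each subscribing task gets its own
    # `BroadcastReceiver` and (lag policy aside) sees every msg,
    # which the counts-equality assert below verifies.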
consumers: int = 22
async def main():
counts = Counter()
async with tractor.open_nursery(
debug_mode=debug_mode,
) as tn:
p: tractor.Portal = await tn.start_actor(
'inf_streamer',
enable_modules=[__name__],
)
async with (
p.open_context(inf_streamer) as (ctx, _),
ctx.open_stream() as stream,
):
async def pull_and_count(name: str):
# name = trio.lowlevel.current_task().name
async with stream.subscribe() as recver:
assert isinstance(
recver,
tractor.trionics.BroadcastReceiver
)
async for val in recver:
print(f'bx {name} rx: {val}')
counts[name] += 1
print(f'{name} bcaster ended')
print(f'{name} completed')
with trio.fail_after(3):
async with trio.open_nursery() as nurse:
for i in range(consumers):
nurse.start_soon(
pull_and_count,
                                str(i),  # match the `name: str` annotation
)
# delay to let bcast consumers pull msgs
await trio.sleep(0.5)
print('terminating nursery of bcast rxer consumers!')
await stream.send('done')
print('closed stream connection')
assert len(counts) == consumers
mx = max(counts.values())
# make sure each task received all stream values
assert all(val == mx for val in counts.values())
await p.cancel_actor()
    async def w_timeout():
        # trio-native timeout wrapper (see the module-level note
        # on avoiding `pytest-timeout`)
        with trio.fail_after(6):
            await main()

    trio.run(w_timeout)