509 lines
15 KiB
Python
509 lines
15 KiB
Python
'''
|
|
Advanced streaming patterns using bidirectional streams and contexts.
|
|
|
|
'''
|
|
from collections import Counter
|
|
import itertools
|
|
import platform
|
|
from typing import Type
|
|
|
|
import pytest
|
|
import trio
|
|
import tractor
|
|
|
|
|
|
def is_win():
|
|
return platform.system() == 'Windows'
|
|
|
|
|
|
_registry: dict[str, set[tractor.MsgStream]] = {
|
|
'even': set(),
|
|
'odd': set(),
|
|
}
|
|
|
|
|
|
async def publisher(
|
|
|
|
seed: int = 0,
|
|
|
|
) -> None:
|
|
|
|
global _registry
|
|
|
|
def is_even(i):
|
|
return i % 2 == 0
|
|
|
|
for val in itertools.count(seed):
|
|
|
|
sub = 'even' if is_even(val) else 'odd'
|
|
|
|
for sub_stream in _registry[sub].copy():
|
|
await sub_stream.send(val)
|
|
|
|
# throttle send rate to ~1kHz
|
|
# making it readable to a human user
|
|
await trio.sleep(1/1000)
|
|
|
|
|
|
@tractor.context
|
|
async def subscribe(
|
|
|
|
ctx: tractor.Context,
|
|
|
|
) -> None:
|
|
|
|
global _registry
|
|
|
|
# syn caller
|
|
await ctx.started(None)
|
|
|
|
async with ctx.open_stream() as stream:
|
|
|
|
# update subs list as consumer requests
|
|
async for new_subs in stream:
|
|
|
|
new_subs = set(new_subs)
|
|
remove = new_subs - _registry.keys()
|
|
|
|
print(f'setting sub to {new_subs} for {ctx.chan.uid}')
|
|
|
|
# remove old subs
|
|
for sub in remove:
|
|
_registry[sub].remove(stream)
|
|
|
|
# add new subs for consumer
|
|
for sub in new_subs:
|
|
_registry[sub].add(stream)
|
|
|
|
|
|
async def consumer(
|
|
subs: list[str],
|
|
) -> None:
|
|
|
|
uid = tractor.current_actor().uid
|
|
|
|
async with tractor.wait_for_actor('publisher') as portal:
|
|
async with portal.open_context(subscribe) as (ctx, first):
|
|
async with ctx.open_stream() as stream:
|
|
|
|
# flip between the provided subs dynamically
|
|
if len(subs) > 1:
|
|
|
|
for sub in itertools.cycle(subs):
|
|
print(f'setting dynamic sub to {sub}')
|
|
await stream.send([sub])
|
|
|
|
count = 0
|
|
async for value in stream:
|
|
print(f'{uid} got: {value}')
|
|
if count > 5:
|
|
break
|
|
count += 1
|
|
|
|
else: # static sub
|
|
|
|
await stream.send(subs)
|
|
async for value in stream:
|
|
print(f'{uid} got: {value}')
|
|
|
|
|
|
# NOTE: deliberately NOT using `@pytest.mark.timeout(...)` —
|
|
# both pytest-timeout enforcement modes break trio under
|
|
# fork-based backends:
|
|
#
|
|
# - `method='signal'` (SIGALRM): the handler synchronously
|
|
# raises `Failed` in trio's main thread mid-`epoll.poll()`,
|
|
# leaves `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest
|
|
# run got abandoned"), and EVERY subsequent `trio.run()`
|
|
# in the same pytest process bails with
|
|
# `RuntimeError: Attempted to call run() from inside a
|
|
# run()` — session-wide poison.
|
|
#
|
|
# - `method='thread'`: calls `_thread.interrupt_main()`
|
|
# raising `KeyboardInterrupt` into the main thread. Under
|
|
# fork-based backends with mid-cascade fd-juggling the KBI
|
|
# can escape trio's `KIManager` and bubble out of pytest
|
|
# itself — kills the WHOLE session.
|
|
#
|
|
# Instead we use `trio.fail_after()` INSIDE `main()` below:
|
|
# trio's own `Cancelled`/`TooSlowError` machinery handles the
|
|
# timeout, cleanly unwinds the actor nursery's cancel
|
|
# cascade, and only fails the single test (no cross-test
|
|
# state corruption either way).
|
|
#
|
|
# `pyproject.toml`'s default `timeout = 200` is still a
|
|
# last-resort safety net.
|
|
@pytest.mark.parametrize(
|
|
'expect_cancel_exc', [
|
|
KeyboardInterrupt,
|
|
trio.TooSlowError,
|
|
],
|
|
ids=lambda item:
|
|
f'expect_user_exc_raised={item.__name__}'
|
|
)
|
|
def test_dynamic_pub_sub(
|
|
reg_addr: tuple,
|
|
debug_mode: bool,
|
|
test_log: tractor.log.StackLevelAdapter,
|
|
reap_subactors_per_test: int,
|
|
expect_cancel_exc: Type[BaseException],
|
|
):
|
|
failed_to_raise_report: str = (
|
|
f'Never got a {expect_cancel_exc!r} ??'
|
|
)
|
|
|
|
global _registry
|
|
|
|
from multiprocessing import cpu_count
|
|
cpus = cpu_count()
|
|
|
|
# Hard safety cap via trio's own cancellation — see the
|
|
# module-level NOTE on why we avoid `pytest-timeout` for
|
|
# this test. Picked backend-aware: under `trio` backend
|
|
# spawn is cheap (~1s for `cpus` actors) but fork-based
|
|
# backends pay a per-spawn cost (forkserver round-trip +
|
|
# IPC peer-handshake) that can stack up over `cpus - 1`
|
|
# sequential `n.run_in_actor()` calls — especially on UDS
|
|
# under cross-pytest contention (#451 / #452). Empirically
|
|
# 12s flakes on `main_thread_forkserver`; 30s gives
|
|
# plenty of headroom while still failing-loud on a real
|
|
# hang.
|
|
from tractor.spawn import _spawn as _spawn_mod
|
|
fail_after_s: int = (
|
|
30
|
|
if _spawn_mod._spawn_method == 'main_thread_forkserver'
|
|
else 12
|
|
)
|
|
|
|
async def main():
|
|
with trio.fail_after(fail_after_s):
|
|
async with tractor.open_nursery(
|
|
registry_addrs=[reg_addr],
|
|
debug_mode=debug_mode,
|
|
) as n:
|
|
|
|
# name of this actor will be same as target func
|
|
await n.run_in_actor(publisher)
|
|
|
|
for i, sub in zip(
|
|
range(cpus - 2),
|
|
itertools.cycle(_registry.keys())
|
|
):
|
|
await n.run_in_actor(
|
|
consumer,
|
|
name=f'consumer_{sub}',
|
|
subs=[sub],
|
|
)
|
|
|
|
# make one dynamic subscriber
|
|
await n.run_in_actor(
|
|
consumer,
|
|
name='consumer_dynamic',
|
|
subs=list(_registry.keys()),
|
|
)
|
|
|
|
# block until "cancelled by user"
|
|
await trio.sleep(3)
|
|
test_log.warning(
|
|
f'Raising user cancel exc: '
|
|
f'{expect_cancel_exc!r}'
|
|
)
|
|
raise expect_cancel_exc('simulate user cancel!')
|
|
|
|
try:
|
|
trio.run(main)
|
|
pytest.fail(failed_to_raise_report)
|
|
except expect_cancel_exc:
|
|
# parent-side raised the user-cancel exc directly and
|
|
# it propagated unwrapped; clean path.
|
|
test_log.exception('Got user-cancel exc AS EXPECTED')
|
|
except BaseExceptionGroup as err:
|
|
# under fork-based backends the user-raised cancel
|
|
# can race with subactor-side stream teardown
|
|
# (`trio.EndOfChannel` from a publisher's `send()`
|
|
# whose remote half got cut). The expected exc may
|
|
# then be nested deeper in the group rather than at
|
|
# the top level. `BaseExceptionGroup.split()` walks
|
|
# the exc tree recursively (Python 3.11+).
|
|
matched, _ = err.split(expect_cancel_exc)
|
|
if matched is None:
|
|
pytest.fail(failed_to_raise_report)
|
|
|
|
test_log.exception('Got user-cancel exc AS EXPECTED')
|
|
|
|
|
|
@tractor.context
|
|
async def one_task_streams_and_one_handles_reqresp(
|
|
ctx: tractor.Context,
|
|
) -> None:
|
|
|
|
await ctx.started()
|
|
|
|
async with ctx.open_stream() as stream:
|
|
|
|
async def pingpong():
|
|
'''Run a simple req/response service.
|
|
|
|
'''
|
|
async for msg in stream:
|
|
print('rpc server ping')
|
|
assert msg == 'ping'
|
|
print('rpc server pong')
|
|
await stream.send('pong')
|
|
|
|
async with trio.open_nursery() as n:
|
|
n.start_soon(pingpong)
|
|
|
|
for _ in itertools.count():
|
|
await stream.send('yo')
|
|
await trio.sleep(0.01)
|
|
|
|
|
|
def test_reqresp_ontopof_streaming():
|
|
'''
|
|
Test a subactor that both streams with one task and
|
|
spawns another which handles a small requests-response
|
|
dialogue over the same bidir-stream.
|
|
|
|
'''
|
|
async def main():
|
|
|
|
# flat to make sure we get at least one pong
|
|
got_pong: bool = False
|
|
timeout: int = 2
|
|
|
|
if is_win(): # smh
|
|
timeout = 4
|
|
|
|
with trio.move_on_after(timeout):
|
|
async with tractor.open_nursery() as n:
|
|
|
|
# name of this actor will be same as target func
|
|
portal = await n.start_actor(
|
|
'dual_tasks',
|
|
enable_modules=[__name__]
|
|
)
|
|
|
|
async with portal.open_context(
|
|
one_task_streams_and_one_handles_reqresp,
|
|
|
|
) as (ctx, first):
|
|
|
|
assert first is None
|
|
|
|
async with ctx.open_stream() as stream:
|
|
|
|
await stream.send('ping')
|
|
|
|
async for msg in stream:
|
|
print(f'client received: {msg}')
|
|
|
|
assert msg in {'pong', 'yo'}
|
|
|
|
if msg == 'pong':
|
|
got_pong = True
|
|
await stream.send('ping')
|
|
print('client sent ping')
|
|
|
|
assert got_pong
|
|
|
|
try:
|
|
trio.run(main)
|
|
except trio.TooSlowError:
|
|
pass
|
|
|
|
|
|
async def async_gen_stream(sequence):
|
|
for i in sequence:
|
|
yield i
|
|
await trio.sleep(0.1)
|
|
|
|
|
|
@tractor.context
|
|
async def echo_ctx_stream(
|
|
ctx: tractor.Context,
|
|
) -> None:
|
|
await ctx.started()
|
|
|
|
async with ctx.open_stream() as stream:
|
|
async for msg in stream:
|
|
await stream.send(msg)
|
|
|
|
|
|
def test_sigint_both_stream_types():
|
|
'''
|
|
Verify that running a bi-directional and recv only stream
|
|
side-by-side will cancel correctly from SIGINT.
|
|
|
|
'''
|
|
timeout: float = 2
|
|
if is_win(): # smh
|
|
timeout += 1
|
|
|
|
async def main():
|
|
with trio.fail_after(timeout):
|
|
async with tractor.open_nursery() as n:
|
|
# name of this actor will be same as target func
|
|
portal = await n.start_actor(
|
|
'2_way',
|
|
enable_modules=[__name__]
|
|
)
|
|
|
|
async with portal.open_context(echo_ctx_stream) as (ctx, _):
|
|
async with ctx.open_stream() as stream:
|
|
async with portal.open_stream_from(
|
|
async_gen_stream,
|
|
sequence=list(range(1)),
|
|
) as gen_stream:
|
|
|
|
msg = await gen_stream.receive()
|
|
await stream.send(msg)
|
|
resp = await stream.receive()
|
|
assert resp == msg
|
|
raise KeyboardInterrupt
|
|
try:
|
|
trio.run(main)
|
|
assert 0, "Didn't receive KBI!?"
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
|
|
@tractor.context
|
|
async def inf_streamer(
|
|
ctx: tractor.Context,
|
|
|
|
) -> None:
|
|
'''
|
|
Stream increasing ints until terminated with a 'done' msg.
|
|
|
|
'''
|
|
await ctx.started()
|
|
|
|
async with (
|
|
ctx.open_stream() as stream,
|
|
|
|
# XXX TODO, INTERESTING CASE!!
|
|
# - if we don't collapse the eg then the embedded
|
|
# `trio.EndOfChannel` doesn't propagate directly to the above
|
|
# .open_stream() parent, resulting in it also raising instead
|
|
# of gracefully absorbing as normal.. so how to handle?
|
|
tractor.trionics.collapse_eg(),
|
|
trio.open_nursery() as tn,
|
|
):
|
|
async def close_stream_on_sentinel():
|
|
async for msg in stream:
|
|
if msg == 'done':
|
|
print(
|
|
'streamer RXed "done" sentinel msg!\n'
|
|
'CLOSING `MsgStream`!'
|
|
)
|
|
await stream.aclose()
|
|
else:
|
|
print(f'streamer received {msg}')
|
|
else:
|
|
print('streamer exited recv loop')
|
|
|
|
# start termination detector
|
|
tn.start_soon(close_stream_on_sentinel)
|
|
|
|
cap: int = 10000 # so that we don't spin forever when bug..
|
|
for val in range(cap):
|
|
try:
|
|
print(f'streamer sending {val}')
|
|
await stream.send(val)
|
|
if val > cap:
|
|
raise RuntimeError(
|
|
'Streamer never cancelled by setinel?'
|
|
)
|
|
await trio.sleep(0.001)
|
|
|
|
# close out the stream gracefully
|
|
except trio.ClosedResourceError:
|
|
print('transport closed on streamer side!')
|
|
assert stream.closed
|
|
break
|
|
else:
|
|
raise RuntimeError(
|
|
'Streamer not cancelled before finished sending?'
|
|
)
|
|
|
|
print('streamer exited .open_streamer() block')
|
|
|
|
|
|
# @pytest.mark.timeout(
|
|
# 6,
|
|
# method='signal',
|
|
# )
|
|
def test_local_task_fanout_from_stream(
|
|
reg_addr: tuple,
|
|
debug_mode: bool,
|
|
):
|
|
'''
|
|
Single stream with multiple local consumer tasks using the
|
|
``MsgStream.subscribe()` api.
|
|
|
|
Ensure all tasks receive all values after stream completes
|
|
sending.
|
|
|
|
'''
|
|
consumers: int = 22
|
|
|
|
async def main():
|
|
|
|
counts = Counter()
|
|
|
|
async with tractor.open_nursery(
|
|
debug_mode=debug_mode,
|
|
) as tn:
|
|
p: tractor.Portal = await tn.start_actor(
|
|
'inf_streamer',
|
|
enable_modules=[__name__],
|
|
)
|
|
async with (
|
|
p.open_context(inf_streamer) as (ctx, _),
|
|
ctx.open_stream() as stream,
|
|
):
|
|
async def pull_and_count(name: str):
|
|
# name = trio.lowlevel.current_task().name
|
|
async with stream.subscribe() as recver:
|
|
assert isinstance(
|
|
recver,
|
|
tractor.trionics.BroadcastReceiver
|
|
)
|
|
async for val in recver:
|
|
print(f'bx {name} rx: {val}')
|
|
counts[name] += 1
|
|
|
|
print(f'{name} bcaster ended')
|
|
|
|
print(f'{name} completed')
|
|
|
|
with trio.fail_after(3):
|
|
async with trio.open_nursery() as nurse:
|
|
for i in range(consumers):
|
|
nurse.start_soon(
|
|
pull_and_count,
|
|
i,
|
|
)
|
|
|
|
# delay to let bcast consumers pull msgs
|
|
await trio.sleep(0.5)
|
|
print('terminating nursery of bcast rxer consumers!')
|
|
await stream.send('done')
|
|
|
|
print('closed stream connection')
|
|
|
|
assert len(counts) == consumers
|
|
mx = max(counts.values())
|
|
# make sure each task received all stream values
|
|
assert all(val == mx for val in counts.values())
|
|
|
|
await p.cancel_actor()
|
|
|
|
async def w_timeout():
|
|
with trio.fail_after(6):
|
|
await main()
|
|
|
|
# trio.run(main)
|
|
trio.run(w_timeout)
|