tractor/tests/test_advanced_streaming.py

509 lines
15 KiB
Python
Raw Normal View History

'''
Advanced streaming patterns using bidirectional streams and contexts.
'''
from collections import Counter
import itertools
2022-01-20 13:26:30 +00:00
import platform
Use `trio.fail_after` cap in `test_dynamic_pub_sub` Drop `@pytest.mark.timeout(...)` for the per-test wall-clock cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)` inside `main()` instead. Both pytest-timeout enforcement modes are incompatible with trio under fork-based backends: - `method='signal'` (SIGALRM) synchronously raises `Failed` in trio's main thread mid-`epoll.poll()`, leaving `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got abandoned") so EVERY subsequent `trio.run()` in the same pytest process bails with `RuntimeError: Attempted to call run() from inside a run()` — full-session poison. - `method='thread'` calls `_thread.interrupt_main()` which can let the KBI escape trio's `KIManager` under fork- cascade teardown races and bubble out of pytest entirely — kills the whole session. `trio.fail_after()` keeps cancellation inside the trio loop: - Raises `TooSlowError` cleanly through the open-nursery's cancel cascade. - Doesn't disturb any out-of-band signal/thread state. - Failure stays scoped to the single test — no cross-test global state corruption either way. Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub` go from 5/10 fail (with global-state poison) to 3/10 fail (no poison, all sibling tests still pass). The ~30% remaining flake rate is a genuine fork-cancel-cascade hang — separate from this fix but no longer contaminates. Module-level NOTE comment explains the rationale so future readers don't re-introduce the bug. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-28 03:25:04 +00:00
from typing import Type
import pytest
import trio
import tractor
2022-01-20 13:26:30 +00:00
def is_win():
return platform.system() == 'Windows'
_registry: dict[str, set[tractor.MsgStream]] = {
'even': set(),
'odd': set(),
}
async def publisher(
seed: int = 0,
) -> None:
global _registry
def is_even(i):
return i % 2 == 0
for val in itertools.count(seed):
sub = 'even' if is_even(val) else 'odd'
2021-06-30 17:52:31 +00:00
for sub_stream in _registry[sub].copy():
await sub_stream.send(val)
2021-06-14 00:26:01 +00:00
# throttle send rate to ~1kHz
# making it readable to a human user
2021-06-14 00:26:01 +00:00
await trio.sleep(1/1000)
@tractor.context
async def subscribe(
ctx: tractor.Context,
) -> None:
global _registry
# syn caller
await ctx.started(None)
async with ctx.open_stream() as stream:
# update subs list as consumer requests
async for new_subs in stream:
new_subs = set(new_subs)
remove = new_subs - _registry.keys()
print(f'setting sub to {new_subs} for {ctx.chan.uid}')
# remove old subs
for sub in remove:
_registry[sub].remove(stream)
# add new subs for consumer
for sub in new_subs:
_registry[sub].add(stream)
async def consumer(
subs: list[str],
) -> None:
uid = tractor.current_actor().uid
async with tractor.wait_for_actor('publisher') as portal:
async with portal.open_context(subscribe) as (ctx, first):
async with ctx.open_stream() as stream:
# flip between the provided subs dynamically
if len(subs) > 1:
for sub in itertools.cycle(subs):
print(f'setting dynamic sub to {sub}')
await stream.send([sub])
count = 0
async for value in stream:
print(f'{uid} got: {value}')
if count > 5:
break
count += 1
else: # static sub
await stream.send(subs)
async for value in stream:
print(f'{uid} got: {value}')
Use `trio.fail_after` cap in `test_dynamic_pub_sub` Drop `@pytest.mark.timeout(...)` for the per-test wall-clock cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)` inside `main()` instead. Both pytest-timeout enforcement modes are incompatible with trio under fork-based backends: - `method='signal'` (SIGALRM) synchronously raises `Failed` in trio's main thread mid-`epoll.poll()`, leaving `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got abandoned") so EVERY subsequent `trio.run()` in the same pytest process bails with `RuntimeError: Attempted to call run() from inside a run()` — full-session poison. - `method='thread'` calls `_thread.interrupt_main()` which can let the KBI escape trio's `KIManager` under fork- cascade teardown races and bubble out of pytest entirely — kills the whole session. `trio.fail_after()` keeps cancellation inside the trio loop: - Raises `TooSlowError` cleanly through the open-nursery's cancel cascade. - Doesn't disturb any out-of-band signal/thread state. - Failure stays scoped to the single test — no cross-test global state corruption either way. Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub` go from 5/10 fail (with global-state poison) to 3/10 fail (no poison, all sibling tests still pass). The ~30% remaining flake rate is a genuine fork-cancel-cascade hang — separate from this fix but no longer contaminates. Module-level NOTE comment explains the rationale so future readers don't re-introduce the bug. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-28 03:25:04 +00:00
# NOTE: deliberately NOT using `@pytest.mark.timeout(...)` —
# both pytest-timeout enforcement modes break trio under
# fork-based backends:
#
# - `method='signal'` (SIGALRM): the handler synchronously
# raises `Failed` in trio's main thread mid-`epoll.poll()`,
# leaves `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest
# run got abandoned"), and EVERY subsequent `trio.run()`
# in the same pytest process bails with
# `RuntimeError: Attempted to call run() from inside a
# run()` — session-wide poison.
#
# - `method='thread'`: calls `_thread.interrupt_main()`
# raising `KeyboardInterrupt` into the main thread. Under
# fork-based backends with mid-cascade fd-juggling the KBI
# can escape trio's `KIManager` and bubble out of pytest
# itself — kills the WHOLE session.
#
# Instead we use `trio.fail_after()` INSIDE `main()` below:
# trio's own `Cancelled`/`TooSlowError` machinery handles the
# timeout, cleanly unwinds the actor nursery's cancel
# cascade, and only fails the single test (no cross-test
# state corruption either way).
#
# `pyproject.toml`'s default `timeout = 200` is still a
# last-resort safety net.
@pytest.mark.parametrize(
'expect_cancel_exc', [
KeyboardInterrupt,
trio.TooSlowError,
],
ids=lambda item:
f'expect_user_exc_raised={item.__name__}'
)
def test_dynamic_pub_sub(
reg_addr: tuple,
debug_mode: bool,
test_log: tractor.log.StackLevelAdapter,
Use `trio.fail_after` cap in `test_dynamic_pub_sub` Drop `@pytest.mark.timeout(...)` for the per-test wall-clock cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)` inside `main()` instead. Both pytest-timeout enforcement modes are incompatible with trio under fork-based backends: - `method='signal'` (SIGALRM) synchronously raises `Failed` in trio's main thread mid-`epoll.poll()`, leaving `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got abandoned") so EVERY subsequent `trio.run()` in the same pytest process bails with `RuntimeError: Attempted to call run() from inside a run()` — full-session poison. - `method='thread'` calls `_thread.interrupt_main()` which can let the KBI escape trio's `KIManager` under fork- cascade teardown races and bubble out of pytest entirely — kills the whole session. `trio.fail_after()` keeps cancellation inside the trio loop: - Raises `TooSlowError` cleanly through the open-nursery's cancel cascade. - Doesn't disturb any out-of-band signal/thread state. - Failure stays scoped to the single test — no cross-test global state corruption either way. Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub` go from 5/10 fail (with global-state poison) to 3/10 fail (no poison, all sibling tests still pass). The ~30% remaining flake rate is a genuine fork-cancel-cascade hang — separate from this fix but no longer contaminates. Module-level NOTE comment explains the rationale so future readers don't re-introduce the bug. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-28 03:25:04 +00:00
reap_subactors_per_test: int,
expect_cancel_exc: Type[BaseException],
):
Use `trio.fail_after` cap in `test_dynamic_pub_sub` Drop `@pytest.mark.timeout(...)` for the per-test wall-clock cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)` inside `main()` instead. Both pytest-timeout enforcement modes are incompatible with trio under fork-based backends: - `method='signal'` (SIGALRM) synchronously raises `Failed` in trio's main thread mid-`epoll.poll()`, leaving `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got abandoned") so EVERY subsequent `trio.run()` in the same pytest process bails with `RuntimeError: Attempted to call run() from inside a run()` — full-session poison. - `method='thread'` calls `_thread.interrupt_main()` which can let the KBI escape trio's `KIManager` under fork- cascade teardown races and bubble out of pytest entirely — kills the whole session. `trio.fail_after()` keeps cancellation inside the trio loop: - Raises `TooSlowError` cleanly through the open-nursery's cancel cascade. - Doesn't disturb any out-of-band signal/thread state. - Failure stays scoped to the single test — no cross-test global state corruption either way. Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub` go from 5/10 fail (with global-state poison) to 3/10 fail (no poison, all sibling tests still pass). The ~30% remaining flake rate is a genuine fork-cancel-cascade hang — separate from this fix but no longer contaminates. Module-level NOTE comment explains the rationale so future readers don't re-introduce the bug. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-28 03:25:04 +00:00
failed_to_raise_report: str = (
f'Never got a {expect_cancel_exc!r} ??'
)
global _registry
from multiprocessing import cpu_count
cpus = cpu_count()
# Hard safety cap via trio's own cancellation — see the
# module-level NOTE on why we avoid `pytest-timeout` for
# this test. Picked backend-aware: under `trio` backend
# spawn is cheap (~1s for `cpus` actors) but fork-based
# backends pay a per-spawn cost (forkserver round-trip +
# IPC peer-handshake) that can stack up over `cpus - 1`
# sequential `n.run_in_actor()` calls — especially on UDS
# under cross-pytest contention (#451 / #452). Empirically
# 12s flakes on `main_thread_forkserver`; 30s gives
# plenty of headroom while still failing-loud on a real
# hang.
from tractor.spawn import _spawn as _spawn_mod
fail_after_s: int = (
30
if _spawn_mod._spawn_method == 'main_thread_forkserver'
else 12
)
async def main():
with trio.fail_after(fail_after_s):
Use `trio.fail_after` cap in `test_dynamic_pub_sub` Drop `@pytest.mark.timeout(...)` for the per-test wall-clock cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)` inside `main()` instead. Both pytest-timeout enforcement modes are incompatible with trio under fork-based backends: - `method='signal'` (SIGALRM) synchronously raises `Failed` in trio's main thread mid-`epoll.poll()`, leaving `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got abandoned") so EVERY subsequent `trio.run()` in the same pytest process bails with `RuntimeError: Attempted to call run() from inside a run()` — full-session poison. - `method='thread'` calls `_thread.interrupt_main()` which can let the KBI escape trio's `KIManager` under fork- cascade teardown races and bubble out of pytest entirely — kills the whole session. `trio.fail_after()` keeps cancellation inside the trio loop: - Raises `TooSlowError` cleanly through the open-nursery's cancel cascade. - Doesn't disturb any out-of-band signal/thread state. - Failure stays scoped to the single test — no cross-test global state corruption either way. Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub` go from 5/10 fail (with global-state poison) to 3/10 fail (no poison, all sibling tests still pass). The ~30% remaining flake rate is a genuine fork-cancel-cascade hang — separate from this fix but no longer contaminates. Module-level NOTE comment explains the rationale so future readers don't re-introduce the bug. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-28 03:25:04 +00:00
async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as n:
Use `trio.fail_after` cap in `test_dynamic_pub_sub` Drop `@pytest.mark.timeout(...)` for the per-test wall-clock cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)` inside `main()` instead. Both pytest-timeout enforcement modes are incompatible with trio under fork-based backends: - `method='signal'` (SIGALRM) synchronously raises `Failed` in trio's main thread mid-`epoll.poll()`, leaving `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got abandoned") so EVERY subsequent `trio.run()` in the same pytest process bails with `RuntimeError: Attempted to call run() from inside a run()` — full-session poison. - `method='thread'` calls `_thread.interrupt_main()` which can let the KBI escape trio's `KIManager` under fork- cascade teardown races and bubble out of pytest entirely — kills the whole session. `trio.fail_after()` keeps cancellation inside the trio loop: - Raises `TooSlowError` cleanly through the open-nursery's cancel cascade. - Doesn't disturb any out-of-band signal/thread state. - Failure stays scoped to the single test — no cross-test global state corruption either way. Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub` go from 5/10 fail (with global-state poison) to 3/10 fail (no poison, all sibling tests still pass). The ~30% remaining flake rate is a genuine fork-cancel-cascade hang — separate from this fix but no longer contaminates. Module-level NOTE comment explains the rationale so future readers don't re-introduce the bug. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-28 03:25:04 +00:00
# name of this actor will be same as target func
await n.run_in_actor(publisher)
for i, sub in zip(
range(cpus - 2),
itertools.cycle(_registry.keys())
):
await n.run_in_actor(
consumer,
name=f'consumer_{sub}',
subs=[sub],
)
Use `trio.fail_after` cap in `test_dynamic_pub_sub` Drop `@pytest.mark.timeout(...)` for the per-test wall-clock cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)` inside `main()` instead. Both pytest-timeout enforcement modes are incompatible with trio under fork-based backends: - `method='signal'` (SIGALRM) synchronously raises `Failed` in trio's main thread mid-`epoll.poll()`, leaving `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got abandoned") so EVERY subsequent `trio.run()` in the same pytest process bails with `RuntimeError: Attempted to call run() from inside a run()` — full-session poison. - `method='thread'` calls `_thread.interrupt_main()` which can let the KBI escape trio's `KIManager` under fork- cascade teardown races and bubble out of pytest entirely — kills the whole session. `trio.fail_after()` keeps cancellation inside the trio loop: - Raises `TooSlowError` cleanly through the open-nursery's cancel cascade. - Doesn't disturb any out-of-band signal/thread state. - Failure stays scoped to the single test — no cross-test global state corruption either way. Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub` go from 5/10 fail (with global-state poison) to 3/10 fail (no poison, all sibling tests still pass). The ~30% remaining flake rate is a genuine fork-cancel-cascade hang — separate from this fix but no longer contaminates. Module-level NOTE comment explains the rationale so future readers don't re-introduce the bug. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-28 03:25:04 +00:00
# make one dynamic subscriber
await n.run_in_actor(
consumer,
Use `trio.fail_after` cap in `test_dynamic_pub_sub` Drop `@pytest.mark.timeout(...)` for the per-test wall-clock cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)` inside `main()` instead. Both pytest-timeout enforcement modes are incompatible with trio under fork-based backends: - `method='signal'` (SIGALRM) synchronously raises `Failed` in trio's main thread mid-`epoll.poll()`, leaving `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got abandoned") so EVERY subsequent `trio.run()` in the same pytest process bails with `RuntimeError: Attempted to call run() from inside a run()` — full-session poison. - `method='thread'` calls `_thread.interrupt_main()` which can let the KBI escape trio's `KIManager` under fork- cascade teardown races and bubble out of pytest entirely — kills the whole session. `trio.fail_after()` keeps cancellation inside the trio loop: - Raises `TooSlowError` cleanly through the open-nursery's cancel cascade. - Doesn't disturb any out-of-band signal/thread state. - Failure stays scoped to the single test — no cross-test global state corruption either way. Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub` go from 5/10 fail (with global-state poison) to 3/10 fail (no poison, all sibling tests still pass). The ~30% remaining flake rate is a genuine fork-cancel-cascade hang — separate from this fix but no longer contaminates. Module-level NOTE comment explains the rationale so future readers don't re-introduce the bug. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-28 03:25:04 +00:00
name='consumer_dynamic',
subs=list(_registry.keys()),
)
Use `trio.fail_after` cap in `test_dynamic_pub_sub` Drop `@pytest.mark.timeout(...)` for the per-test wall-clock cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)` inside `main()` instead. Both pytest-timeout enforcement modes are incompatible with trio under fork-based backends: - `method='signal'` (SIGALRM) synchronously raises `Failed` in trio's main thread mid-`epoll.poll()`, leaving `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got abandoned") so EVERY subsequent `trio.run()` in the same pytest process bails with `RuntimeError: Attempted to call run() from inside a run()` — full-session poison. - `method='thread'` calls `_thread.interrupt_main()` which can let the KBI escape trio's `KIManager` under fork- cascade teardown races and bubble out of pytest entirely — kills the whole session. `trio.fail_after()` keeps cancellation inside the trio loop: - Raises `TooSlowError` cleanly through the open-nursery's cancel cascade. - Doesn't disturb any out-of-band signal/thread state. - Failure stays scoped to the single test — no cross-test global state corruption either way. Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub` go from 5/10 fail (with global-state poison) to 3/10 fail (no poison, all sibling tests still pass). The ~30% remaining flake rate is a genuine fork-cancel-cascade hang — separate from this fix but no longer contaminates. Module-level NOTE comment explains the rationale so future readers don't re-introduce the bug. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-28 03:25:04 +00:00
# block until "cancelled by user"
await trio.sleep(3)
test_log.warning(
f'Raising user cancel exc: '
f'{expect_cancel_exc!r}'
)
raise expect_cancel_exc('simulate user cancel!')
try:
trio.run(main)
Use `trio.fail_after` cap in `test_dynamic_pub_sub` Drop `@pytest.mark.timeout(...)` for the per-test wall-clock cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)` inside `main()` instead. Both pytest-timeout enforcement modes are incompatible with trio under fork-based backends: - `method='signal'` (SIGALRM) synchronously raises `Failed` in trio's main thread mid-`epoll.poll()`, leaving `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got abandoned") so EVERY subsequent `trio.run()` in the same pytest process bails with `RuntimeError: Attempted to call run() from inside a run()` — full-session poison. - `method='thread'` calls `_thread.interrupt_main()` which can let the KBI escape trio's `KIManager` under fork- cascade teardown races and bubble out of pytest entirely — kills the whole session. `trio.fail_after()` keeps cancellation inside the trio loop: - Raises `TooSlowError` cleanly through the open-nursery's cancel cascade. - Doesn't disturb any out-of-band signal/thread state. - Failure stays scoped to the single test — no cross-test global state corruption either way. Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub` go from 5/10 fail (with global-state poison) to 3/10 fail (no poison, all sibling tests still pass). The ~30% remaining flake rate is a genuine fork-cancel-cascade hang — separate from this fix but no longer contaminates. Module-level NOTE comment explains the rationale so future readers don't re-introduce the bug. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-28 03:25:04 +00:00
pytest.fail(failed_to_raise_report)
except expect_cancel_exc:
# parent-side raised the user-cancel exc directly and
# it propagated unwrapped; clean path.
test_log.exception('Got user-cancel exc AS EXPECTED')
except BaseExceptionGroup as err:
# under fork-based backends the user-raised cancel
# can race with subactor-side stream teardown
# (`trio.EndOfChannel` from a publisher's `send()`
# whose remote half got cut). The expected exc may
# then be nested deeper in the group rather than at
# the top level. `BaseExceptionGroup.split()` walks
# the exc tree recursively (Python 3.11+).
matched, _ = err.split(expect_cancel_exc)
if matched is None:
pytest.fail(failed_to_raise_report)
test_log.exception('Got user-cancel exc AS EXPECTED')
2021-06-10 18:00:09 +00:00
@tractor.context
async def one_task_streams_and_one_handles_reqresp(
ctx: tractor.Context,
) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
async def pingpong():
'''Run a simple req/response service.
'''
async for msg in stream:
print('rpc server ping')
assert msg == 'ping'
print('rpc server pong')
await stream.send('pong')
async with trio.open_nursery() as n:
n.start_soon(pingpong)
for _ in itertools.count():
await stream.send('yo')
await trio.sleep(0.01)
def test_reqresp_ontopof_streaming():
2022-01-20 13:26:30 +00:00
'''
Test a subactor that both streams with one task and
2021-06-10 18:00:09 +00:00
spawns another which handles a small requests-response
dialogue over the same bidir-stream.
'''
async def main():
2022-01-20 13:26:30 +00:00
# flat to make sure we get at least one pong
got_pong: bool = False
timeout: int = 2
if is_win(): # smh
timeout = 4
with trio.move_on_after(timeout):
2021-06-10 18:00:09 +00:00
async with tractor.open_nursery() as n:
# name of this actor will be same as target func
portal = await n.start_actor(
'dual_tasks',
enable_modules=[__name__]
)
async with portal.open_context(
one_task_streams_and_one_handles_reqresp,
) as (ctx, first):
assert first is None
async with ctx.open_stream() as stream:
await stream.send('ping')
async for msg in stream:
print(f'client received: {msg}')
assert msg in {'pong', 'yo'}
if msg == 'pong':
got_pong = True
await stream.send('ping')
print('client sent ping')
assert got_pong
try:
trio.run(main)
except trio.TooSlowError:
pass
async def async_gen_stream(sequence):
for i in sequence:
yield i
await trio.sleep(0.1)
@tractor.context
async def echo_ctx_stream(
ctx: tractor.Context,
) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
async for msg in stream:
await stream.send(msg)
def test_sigint_both_stream_types():
'''
Verify that running a bi-directional and recv only stream
side-by-side will cancel correctly from SIGINT.
'''
2022-01-20 13:26:30 +00:00
timeout: float = 2
if is_win(): # smh
timeout += 1
async def main():
2022-01-20 13:26:30 +00:00
with trio.fail_after(timeout):
async with tractor.open_nursery() as n:
# name of this actor will be same as target func
portal = await n.start_actor(
'2_way',
enable_modules=[__name__]
)
async with portal.open_context(echo_ctx_stream) as (ctx, _):
async with ctx.open_stream() as stream:
async with portal.open_stream_from(
async_gen_stream,
sequence=list(range(1)),
) as gen_stream:
msg = await gen_stream.receive()
await stream.send(msg)
resp = await stream.receive()
assert resp == msg
raise KeyboardInterrupt
try:
trio.run(main)
assert 0, "Didn't receive KBI!?"
except KeyboardInterrupt:
pass
@tractor.context
async def inf_streamer(
ctx: tractor.Context,
) -> None:
'''
Stream increasing ints until terminated with a 'done' msg.
'''
await ctx.started()
async with (
ctx.open_stream() as stream,
# XXX TODO, INTERESTING CASE!!
# - if we don't collapse the eg then the embedded
# `trio.EndOfChannel` doesn't propagate directly to the above
# .open_stream() parent, resulting in it also raising instead
# of gracefully absorbing as normal.. so how to handle?
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
async def close_stream_on_sentinel():
async for msg in stream:
if msg == 'done':
print(
'streamer RXed "done" sentinel msg!\n'
'CLOSING `MsgStream`!'
)
await stream.aclose()
else:
print(f'streamer received {msg}')
else:
print('streamer exited recv loop')
# start termination detector
tn.start_soon(close_stream_on_sentinel)
cap: int = 10000 # so that we don't spin forever when bug..
for val in range(cap):
try:
print(f'streamer sending {val}')
await stream.send(val)
if val > cap:
raise RuntimeError(
'Streamer never cancelled by setinel?'
)
await trio.sleep(0.001)
# close out the stream gracefully
except trio.ClosedResourceError:
print('transport closed on streamer side!')
assert stream.closed
break
else:
raise RuntimeError(
'Streamer not cancelled before finished sending?'
)
print('streamer exited .open_streamer() block')
Use `trio.fail_after` cap in `test_dynamic_pub_sub` Drop `@pytest.mark.timeout(...)` for the per-test wall-clock cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)` inside `main()` instead. Both pytest-timeout enforcement modes are incompatible with trio under fork-based backends: - `method='signal'` (SIGALRM) synchronously raises `Failed` in trio's main thread mid-`epoll.poll()`, leaving `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got abandoned") so EVERY subsequent `trio.run()` in the same pytest process bails with `RuntimeError: Attempted to call run() from inside a run()` — full-session poison. - `method='thread'` calls `_thread.interrupt_main()` which can let the KBI escape trio's `KIManager` under fork- cascade teardown races and bubble out of pytest entirely — kills the whole session. `trio.fail_after()` keeps cancellation inside the trio loop: - Raises `TooSlowError` cleanly through the open-nursery's cancel cascade. - Doesn't disturb any out-of-band signal/thread state. - Failure stays scoped to the single test — no cross-test global state corruption either way. Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub` go from 5/10 fail (with global-state poison) to 3/10 fail (no poison, all sibling tests still pass). The ~30% remaining flake rate is a genuine fork-cancel-cascade hang — separate from this fix but no longer contaminates. Module-level NOTE comment explains the rationale so future readers don't re-introduce the bug. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-28 03:25:04 +00:00
@pytest.mark.timeout(
6,
method='signal',
)
def test_local_task_fanout_from_stream(
Use `trio.fail_after` cap in `test_dynamic_pub_sub` Drop `@pytest.mark.timeout(...)` for the per-test wall-clock cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)` inside `main()` instead. Both pytest-timeout enforcement modes are incompatible with trio under fork-based backends: - `method='signal'` (SIGALRM) synchronously raises `Failed` in trio's main thread mid-`epoll.poll()`, leaving `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got abandoned") so EVERY subsequent `trio.run()` in the same pytest process bails with `RuntimeError: Attempted to call run() from inside a run()` — full-session poison. - `method='thread'` calls `_thread.interrupt_main()` which can let the KBI escape trio's `KIManager` under fork- cascade teardown races and bubble out of pytest entirely — kills the whole session. `trio.fail_after()` keeps cancellation inside the trio loop: - Raises `TooSlowError` cleanly through the open-nursery's cancel cascade. - Doesn't disturb any out-of-band signal/thread state. - Failure stays scoped to the single test — no cross-test global state corruption either way. Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub` go from 5/10 fail (with global-state poison) to 3/10 fail (no poison, all sibling tests still pass). The ~30% remaining flake rate is a genuine fork-cancel-cascade hang — separate from this fix but no longer contaminates. Module-level NOTE comment explains the rationale so future readers don't re-introduce the bug. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-28 03:25:04 +00:00
reg_addr: tuple,
debug_mode: bool,
):
'''
Single stream with multiple local consumer tasks using the
``MsgStream.subscribe()` api.
Ensure all tasks receive all values after stream completes
sending.
'''
consumers: int = 22
async def main():
counts = Counter()
async with tractor.open_nursery(
debug_mode=debug_mode,
) as tn:
p: tractor.Portal = await tn.start_actor(
'inf_streamer',
enable_modules=[__name__],
)
async with (
p.open_context(inf_streamer) as (ctx, _),
ctx.open_stream() as stream,
):
async def pull_and_count(name: str):
# name = trio.lowlevel.current_task().name
async with stream.subscribe() as recver:
assert isinstance(
recver,
tractor.trionics.BroadcastReceiver
)
async for val in recver:
print(f'bx {name} rx: {val}')
counts[name] += 1
print(f'{name} bcaster ended')
print(f'{name} completed')
with trio.fail_after(3):
async with trio.open_nursery() as nurse:
for i in range(consumers):
nurse.start_soon(
pull_and_count,
i,
)
# delay to let bcast consumers pull msgs
await trio.sleep(0.5)
print('terminating nursery of bcast rxer consumers!')
await stream.send('done')
print('closed stream connection')
assert len(counts) == consumers
mx = max(counts.values())
# make sure each task received all stream values
assert all(val == mx for val in counts.values())
await p.cancel_actor()
Use `trio.fail_after` cap in `test_dynamic_pub_sub` Drop `@pytest.mark.timeout(...)` for the per-test wall-clock cap on `test_dynamic_pub_sub`; rely on `trio.fail_after(12)` inside `main()` instead. Both pytest-timeout enforcement modes are incompatible with trio under fork-based backends: - `method='signal'` (SIGALRM) synchronously raises `Failed` in trio's main thread mid-`epoll.poll()`, leaving `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest run got abandoned") so EVERY subsequent `trio.run()` in the same pytest process bails with `RuntimeError: Attempted to call run() from inside a run()` — full-session poison. - `method='thread'` calls `_thread.interrupt_main()` which can let the KBI escape trio's `KIManager` under fork- cascade teardown races and bubble out of pytest entirely — kills the whole session. `trio.fail_after()` keeps cancellation inside the trio loop: - Raises `TooSlowError` cleanly through the open-nursery's cancel cascade. - Doesn't disturb any out-of-band signal/thread state. - Failure stays scoped to the single test — no cross-test global state corruption either way. Verified empirically: 10 hammer-runs of `test_dynamic_pub_sub` go from 5/10 fail (with global-state poison) to 3/10 fail (no poison, all sibling tests still pass). The ~30% remaining flake rate is a genuine fork-cancel-cascade hang — separate from this fix but no longer contaminates. Module-level NOTE comment explains the rationale so future readers don't re-introduce the bug. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-28 03:25:04 +00:00
async def w_timeout():
with trio.fail_after(6):
await main()
# trio.run(main)
trio.run(w_timeout)