Add xfail test for `_Cache.run_ctx` teardown race
Reproduce the piker `open_cached_client('kraken')` scenario: identical
`ctx_key` callers share one cached resource, and a new task re-enters
during `__aexit__` — hitting `assert not resources.get()` bc `values`
was popped but `resources` wasn't yet.
Deats,
- `test_moc_reentry_during_teardown` uses an `in_aexit` event to
deterministically land in the teardown window.
- marked `xfail(raises=AssertionError)` against unpatched code (fix in
`9e49eddd` or wtv lands on the `maybe_open_ctx_locking` or thereafter
patch branch).
Also, add prompt-io log for the session.
(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
Prompt-IO: ai/prompt-io/claude/20260406T193125Z_85f9c5d_prompt_io.md
maybe_open_ctx_locking
parent
85f9c5df6f
commit
cab366cd65
|
|
@ -0,0 +1,57 @@
|
||||||
|
---
|
||||||
|
model: claude-opus-4-6
|
||||||
|
service: claude
|
||||||
|
session: (ad-hoc, not tracked via conf.toml)
|
||||||
|
timestamp: 2026-04-06T19:31:25Z
|
||||||
|
git_ref: 85f9c5d
|
||||||
|
scope: tests
|
||||||
|
substantive: true
|
||||||
|
raw_file: 20260406T193125Z_85f9c5d_prompt_io.raw.md
|
||||||
|
---
|
||||||
|
|
||||||
|
## Prompt
|
||||||
|
|
||||||
|
User asked to reproduce the exact `assert not resources.get(ctx_key)`
|
||||||
|
crash originally seen in piker's `brokerd.kraken` backend via
|
||||||
|
`open_cached_client('kraken')`. Key constraints from user:
|
||||||
|
|
||||||
|
- In piker, kwargs were the **same** (empty) so all callers
|
||||||
|
share one `ctx_key = (fid, ())`
|
||||||
|
- The root issue is `_Cache.locks` being indexed by `fid`
|
||||||
|
rather than `ctx_key`, plus the race window between
|
||||||
|
`values.pop()` and `resources.pop()` in `_Cache.run_ctx`
|
||||||
|
(the acm `__aexit__` has checkpoints in between)
|
||||||
|
- Mark the test `@pytest.mark.xfail` so it can be committed
|
||||||
|
against the unpatched code and later adjusted once the fix
|
||||||
|
from `9e49eddd` is applied
|
||||||
|
|
||||||
|
User pointed to piker's `open_cached_client('kraken')` usage
|
||||||
|
as the real-world pattern to model.
|
||||||
|
|
||||||
|
## Response summary
|
||||||
|
|
||||||
|
Added `test_moc_reentry_during_teardown` which deterministically
|
||||||
|
reproduces the teardown race:
|
||||||
|
|
||||||
|
1. A `cached_client()` acm (no params, like
|
||||||
|
`kraken.api.get_client()`) signals via `in_aexit` event
|
||||||
|
when its `__aexit__` starts, then sleeps to hold the
|
||||||
|
window open
|
||||||
|
2. `use_and_exit` task enters+exits the cached resource
|
||||||
|
3. `reenter_during_teardown` waits on `in_aexit` then
|
||||||
|
re-enters — landing exactly in the window where
|
||||||
|
`values` is popped but `resources` still exists
|
||||||
|
4. The old `assert not resources.get(ctx_key)` fires
|
||||||
|
5. `@pytest.mark.xfail(raises=AssertionError)` catches it
|
||||||
|
|
||||||
|
Test confirmed: `XFAIL` with the exact traceback at
|
||||||
|
`_mngrs.py:327`.
|
||||||
|
|
||||||
|
## Files changed
|
||||||
|
|
||||||
|
- `tests/test_resource_cache.py` — add
|
||||||
|
`test_moc_reentry_during_teardown` xfail test fn
|
||||||
|
|
||||||
|
## Human edits
|
||||||
|
|
||||||
|
None — committed as generated (pending user review).
|
||||||
|
|
@ -537,3 +537,93 @@ def test_per_ctx_key_resource_lifecycle(
|
||||||
a_exit.set()
|
a_exit.set()
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.xfail(
|
||||||
|
reason=(
|
||||||
|
'Demonstrates the `_Cache.run_ctx` teardown race: '
|
||||||
|
'a re-entering task hits '
|
||||||
|
'`assert not resources.get(ctx_key)` because '
|
||||||
|
'`values` was popped but `resources` was not yet '
|
||||||
|
'(acm `__aexit__` checkpoint in between). '
|
||||||
|
'Fixed by per-`ctx_key` locking in 9e49eddd.'
|
||||||
|
),
|
||||||
|
raises=AssertionError,
|
||||||
|
)
|
||||||
|
def test_moc_reentry_during_teardown(
|
||||||
|
debug_mode: bool,
|
||||||
|
loglevel: str,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Reproduce the piker `open_cached_client('kraken')` race:
|
||||||
|
|
||||||
|
- same `acm_func`, NO kwargs (identical `ctx_key`)
|
||||||
|
- multiple tasks share the cached resource
|
||||||
|
- all users exit -> teardown starts
|
||||||
|
- a NEW task enters during `_Cache.run_ctx.__aexit__`
|
||||||
|
- `values[ctx_key]` is gone (popped in inner finally)
|
||||||
|
but `resources[ctx_key]` still exists (outer finally
|
||||||
|
hasn't run yet bc the acm cleanup has checkpoints)
|
||||||
|
- old code: `assert not resources.get(ctx_key)` FIRES
|
||||||
|
|
||||||
|
This models the real-world scenario where `brokerd.kraken`
|
||||||
|
tasks concurrently call `open_cached_client('kraken')`
|
||||||
|
(same `acm_func`, empty kwargs, shared `ctx_key`) and
|
||||||
|
the teardown/re-entry race triggers intermittently.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async def main():
|
||||||
|
in_aexit = trio.Event()
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def cached_client():
|
||||||
|
'''
|
||||||
|
Simulates `kraken.api.get_client()`:
|
||||||
|
- no params (all callers share one `ctx_key`)
|
||||||
|
- slow-ish cleanup to widen the race window
|
||||||
|
between `values.pop()` and `resources.pop()`
|
||||||
|
inside `_Cache.run_ctx`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
yield 'the-client'
|
||||||
|
# Signal that we're in __aexit__ — at this
|
||||||
|
# point `values` has already been popped by
|
||||||
|
# `run_ctx`'s inner finally, but `resources`
|
||||||
|
# is still alive (outer finally hasn't run).
|
||||||
|
in_aexit.set()
|
||||||
|
await trio.sleep(10)
|
||||||
|
|
||||||
|
first_done = trio.Event()
|
||||||
|
|
||||||
|
async def use_and_exit():
|
||||||
|
async with maybe_open_context(
|
||||||
|
cached_client,
|
||||||
|
) as (cache_hit, value):
|
||||||
|
assert value == 'the-client'
|
||||||
|
first_done.set()
|
||||||
|
|
||||||
|
async def reenter_during_teardown():
|
||||||
|
'''
|
||||||
|
Wait for the acm's `__aexit__` to start (meaning
|
||||||
|
`values` is popped but `resources` still exists),
|
||||||
|
then re-enter — triggering the assert.
|
||||||
|
|
||||||
|
'''
|
||||||
|
await in_aexit.wait()
|
||||||
|
async with maybe_open_context(
|
||||||
|
cached_client,
|
||||||
|
) as (cache_hit, value):
|
||||||
|
assert value == 'the-client'
|
||||||
|
|
||||||
|
with trio.fail_after(5):
|
||||||
|
async with (
|
||||||
|
tractor.open_root_actor(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
loglevel=loglevel,
|
||||||
|
),
|
||||||
|
trio.open_nursery() as tn,
|
||||||
|
):
|
||||||
|
tn.start_soon(use_and_exit)
|
||||||
|
tn.start_soon(reenter_during_teardown)
|
||||||
|
|
||||||
|
trio.run(main)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue