Use per-key locking+user tracking in `maybe_open_context()`
(Hopefully!) solving a long-run bug with the `brokerd.kraken` backend in `piker`.. - Track `_Cache.users` per `ctx_key` via a `defaultdict[..., int]` instead of a single global counter; fix premature teardown when multiple ctx keys are active simultaneously. - Key `_Cache.locks` on `ctx_key` (not bare `fid`) so different kwarg sets for the same `acm_func` get independent `StrictFIFOLock`s. - Add `_UnresolvedCtx` sentinel class to replace bare `None` check; avoid false-positive teardown when a wrapped acm legitimately yields `None`. - Swap resource-exists `assert` for detailed `RuntimeError`. Also, - fix "whih" typo. - add debug logging for lock acquire/release lifecycle. (this commit-msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
maybe_open_ctx_locking
parent
cab366cd65
commit
f086222d74
|
|
@ -19,6 +19,7 @@ Async context manager primitives with hard ``trio``-aware semantics
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
from collections import defaultdict
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
|
|
@ -39,6 +40,7 @@ from typing import (
|
||||||
import trio
|
import trio
|
||||||
from tractor.runtime._state import current_actor
|
from tractor.runtime._state import current_actor
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
import tractor
|
||||||
# from ._beg import collapse_eg
|
# from ._beg import collapse_eg
|
||||||
# from ._taskc import (
|
# from ._taskc import (
|
||||||
# maybe_raise_from_masking_exc,
|
# maybe_raise_from_masking_exc,
|
||||||
|
|
@ -135,7 +137,7 @@ async def gather_contexts(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
seed: int = id(mngrs)
|
seed: int = id(mngrs)
|
||||||
unwrapped: dict[int, T | None] = {}.fromkeys(
|
unwrapped: dict[int, T|None] = {}.fromkeys(
|
||||||
(id(mngr) for mngr in mngrs),
|
(id(mngr) for mngr in mngrs),
|
||||||
seed,
|
seed,
|
||||||
)
|
)
|
||||||
|
|
@ -205,7 +207,10 @@ class _Cache:
|
||||||
'''
|
'''
|
||||||
service_tn: trio.Nursery|None = None
|
service_tn: trio.Nursery|None = None
|
||||||
locks: dict[Hashable, trio.Lock] = {}
|
locks: dict[Hashable, trio.Lock] = {}
|
||||||
users: int = 0
|
users: defaultdict[
|
||||||
|
tuple|Hashable,
|
||||||
|
int,
|
||||||
|
] = defaultdict(int)
|
||||||
values: dict[Any, Any] = {}
|
values: dict[Any, Any] = {}
|
||||||
resources: dict[
|
resources: dict[
|
||||||
Hashable,
|
Hashable,
|
||||||
|
|
@ -233,18 +238,32 @@ class _Cache:
|
||||||
value = cls.values.pop(ctx_key)
|
value = cls.values.pop(ctx_key)
|
||||||
finally:
|
finally:
|
||||||
# discard nursery ref so it won't be re-used (an error)?
|
# discard nursery ref so it won't be re-used (an error)?
|
||||||
cls.resources.pop(ctx_key)
|
_rsrcs = cls.resources.pop(ctx_key)
|
||||||
|
log.error(
|
||||||
|
f'Popping ctx resources\n'
|
||||||
|
f'{_rsrcs}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class _UnresolvedCtx:
|
||||||
|
'''
|
||||||
|
Placeholder for the maybe-value delivered from some `acm_func`,
|
||||||
|
once (first) entered by a `maybe_open_context()` task.
|
||||||
|
|
||||||
|
Enables internal teardown logic conditioned on whether the
|
||||||
|
context was actually entered successfully vs. cancelled prior.
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def maybe_open_context(
|
async def maybe_open_context(
|
||||||
|
|
||||||
acm_func: Callable[..., AsyncContextManager[T]],
|
acm_func: Callable[..., AsyncContextManager[T]],
|
||||||
|
|
||||||
# XXX: used as cache key after conversion to tuple
|
# XXX: used as cache key after conversion to tuple
|
||||||
# and all embedded values must also be hashable
|
# and all embedded values must also be hashable
|
||||||
kwargs: dict = {},
|
kwargs: dict = {},
|
||||||
key: Hashable | Callable[..., Hashable] = None,
|
key: Hashable|Callable[..., Hashable] = None,
|
||||||
|
|
||||||
# caller can provide their own scope
|
# caller can provide their own scope
|
||||||
tn: trio.Nursery|None = None,
|
tn: trio.Nursery|None = None,
|
||||||
|
|
@ -257,25 +276,59 @@ async def maybe_open_context(
|
||||||
Return the `_Cached` instance on a _Cache hit.
|
Return the `_Cached` instance on a _Cache hit.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
fid = id(acm_func)
|
fid: int = id(acm_func)
|
||||||
|
|
||||||
if inspect.isfunction(key):
|
if inspect.isfunction(key):
|
||||||
ctx_key = (fid, key(**kwargs))
|
ctx_key = (
|
||||||
|
fid,
|
||||||
|
key(**kwargs)
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
ctx_key = (fid, key or tuple(kwargs.items()))
|
ctx_key = (
|
||||||
|
fid,
|
||||||
|
key or tuple(kwargs.items())
|
||||||
|
)
|
||||||
|
|
||||||
# yielded output
|
# yielded output
|
||||||
yielded: Any = None
|
# sentinel = object()
|
||||||
|
yielded: Any = _UnresolvedCtx
|
||||||
lock_registered: bool = False
|
lock_registered: bool = False
|
||||||
|
|
||||||
# Lock resource acquisition around task racing / ``trio``'s
|
# Lock resource acquisition around task racing / ``trio``'s
|
||||||
# scheduler protocol.
|
# scheduler protocol.
|
||||||
# NOTE: the lock is target context manager func specific in order
|
# NOTE: the lock is target context manager func specific in order
|
||||||
# to allow re-entrant use cases where one `maybe_open_context()`
|
# to allow re-entrant use cases where one `maybe_open_context()`
|
||||||
# wrapped factor may want to call into another.
|
# wrapped factory may want to call into another.
|
||||||
lock = _Cache.locks.setdefault(fid, trio.Lock())
|
task: trio.Task = trio.lowlevel.current_task()
|
||||||
lock_registered: bool = True
|
lock: trio.StrictFIFOLock|None = _Cache.locks.get(
|
||||||
|
# fid
|
||||||
|
ctx_key
|
||||||
|
)
|
||||||
|
if not lock:
|
||||||
|
lock = _Cache.locks[
|
||||||
|
ctx_key
|
||||||
|
# fid
|
||||||
|
] = trio.StrictFIFOLock()
|
||||||
|
# lock = _Cache.locks[fid] = trio.Lock()
|
||||||
|
header: str = 'Allocated NEW lock for @acm_func,\n'
|
||||||
|
lock_registered: bool = True
|
||||||
|
else:
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
|
header: str = 'Reusing OLD lock for @acm_func,\n'
|
||||||
|
|
||||||
|
log.debug(
|
||||||
|
f'{header}'
|
||||||
|
f'Acquiring..\n'
|
||||||
|
f'task={task!r}\n'
|
||||||
|
f'fid={fid!r}\n'
|
||||||
|
f'acm_func={acm_func}\n'
|
||||||
|
)
|
||||||
await lock.acquire()
|
await lock.acquire()
|
||||||
|
log.debug(
|
||||||
|
f'Acquir lock..\n'
|
||||||
|
f'task={task!r}\n'
|
||||||
|
f'fid={fid!r}\n'
|
||||||
|
f'acm_func={acm_func}\n'
|
||||||
|
)
|
||||||
|
|
||||||
# XXX: one singleton nursery per actor and we want to
|
# XXX: one singleton nursery per actor and we want to
|
||||||
# have it not be closed until all consumers have exited (which is
|
# have it not be closed until all consumers have exited (which is
|
||||||
|
|
@ -312,6 +365,7 @@ async def maybe_open_context(
|
||||||
# checking the _Cache until complete otherwise the scheduler
|
# checking the _Cache until complete otherwise the scheduler
|
||||||
# may switch and by accident we create more then one resource.
|
# may switch and by accident we create more then one resource.
|
||||||
yielded = _Cache.values[ctx_key]
|
yielded = _Cache.values[ctx_key]
|
||||||
|
# XXX^ should key-err if not-yet-allocated
|
||||||
|
|
||||||
except KeyError as _ke:
|
except KeyError as _ke:
|
||||||
# XXX, stay mutexed up to cache-miss yield
|
# XXX, stay mutexed up to cache-miss yield
|
||||||
|
|
@ -322,19 +376,31 @@ async def maybe_open_context(
|
||||||
f'ctx_key={ctx_key}\n'
|
f'ctx_key={ctx_key}\n'
|
||||||
f'acm_func={acm_func}\n'
|
f'acm_func={acm_func}\n'
|
||||||
)
|
)
|
||||||
|
# await tractor.pause()
|
||||||
mngr = acm_func(**kwargs)
|
mngr = acm_func(**kwargs)
|
||||||
resources = _Cache.resources
|
resources = _Cache.resources
|
||||||
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
|
entry: tuple|None = resources.get(ctx_key)
|
||||||
|
if entry:
|
||||||
|
service_tn, ev = entry
|
||||||
|
# XXX, trace this.
|
||||||
|
# await tractor.pause(shield=True)
|
||||||
|
raise RuntimeError(
|
||||||
|
f'Caching resources ALREADY exist?!\n'
|
||||||
|
f'ctx_key={ctx_key!r}\n'
|
||||||
|
f'acm_func={acm_func}\n'
|
||||||
|
f'task: {task}\n'
|
||||||
|
)
|
||||||
|
|
||||||
resources[ctx_key] = (service_tn, trio.Event())
|
resources[ctx_key] = (service_tn, trio.Event())
|
||||||
yielded: Any = await service_tn.start(
|
yielded: Any = await service_tn.start(
|
||||||
_Cache.run_ctx,
|
_Cache.run_ctx,
|
||||||
mngr,
|
mngr,
|
||||||
ctx_key,
|
ctx_key,
|
||||||
)
|
)
|
||||||
_Cache.users += 1
|
_Cache.users[ctx_key] += 1
|
||||||
finally:
|
finally:
|
||||||
# XXX, since this runs from an `except` it's a checkpoint
|
# XXX, since this runs from an `except` it's a checkpoint
|
||||||
# whih can be `trio.Cancelled`-masked.
|
# which can be `trio.Cancelled`-masked.
|
||||||
#
|
#
|
||||||
# NOTE, in that case the mutex is never released by the
|
# NOTE, in that case the mutex is never released by the
|
||||||
# (first and) caching task and **we can't** simply shield
|
# (first and) caching task and **we can't** simply shield
|
||||||
|
|
@ -365,9 +431,9 @@ async def maybe_open_context(
|
||||||
maybe_taskc.__context__ = None
|
maybe_taskc.__context__ = None
|
||||||
|
|
||||||
raise taskc
|
raise taskc
|
||||||
|
|
||||||
else:
|
else:
|
||||||
_Cache.users += 1
|
# XXX, cached-entry-path
|
||||||
|
_Cache.users[ctx_key] += 1
|
||||||
log.debug(
|
log.debug(
|
||||||
f'Re-using cached resource for user {_Cache.users}\n\n'
|
f'Re-using cached resource for user {_Cache.users}\n\n'
|
||||||
f'{ctx_key!r} -> {type(yielded)}\n'
|
f'{ctx_key!r} -> {type(yielded)}\n'
|
||||||
|
|
@ -386,17 +452,29 @@ async def maybe_open_context(
|
||||||
finally:
|
finally:
|
||||||
if lock.locked():
|
if lock.locked():
|
||||||
stats: trio.LockStatistics = lock.statistics()
|
stats: trio.LockStatistics = lock.statistics()
|
||||||
|
owner: trio.Task|None = stats.owner
|
||||||
log.error(
|
log.error(
|
||||||
f'Lock left locked by last owner !?\n'
|
f'Lock never released by last owner={owner!r} !?\n'
|
||||||
f'{stats}\n'
|
f'{stats}\n'
|
||||||
|
f'\n'
|
||||||
|
f'task={task!r}\n'
|
||||||
|
f'fid={fid!r}\n'
|
||||||
|
f'acm_func={acm_func}\n'
|
||||||
|
|
||||||
)
|
)
|
||||||
|
# XXX, trace it.
|
||||||
|
# await tractor.pause(shield=True)
|
||||||
|
|
||||||
_Cache.users -= 1
|
_Cache.users[ctx_key] -= 1
|
||||||
|
|
||||||
if yielded is not None:
|
if yielded is not _UnresolvedCtx:
|
||||||
# if no more consumers, teardown the client
|
# if no more consumers, teardown the client
|
||||||
if _Cache.users <= 0:
|
if _Cache.users[ctx_key] <= 0:
|
||||||
log.debug(f'De-allocating resource for {ctx_key}')
|
log.debug(
|
||||||
|
f'De-allocating @acm-func entry\n'
|
||||||
|
f'ctx_key={ctx_key!r}\n'
|
||||||
|
f'acm_func={acm_func!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
# XXX: if we're cancelled then the entry may have never
|
# XXX: if we're cancelled then the entry may have never
|
||||||
# been entered since the nursery task was killed.
|
# been entered since the nursery task was killed.
|
||||||
|
|
@ -407,8 +485,11 @@ async def maybe_open_context(
|
||||||
no_more_users.set()
|
no_more_users.set()
|
||||||
|
|
||||||
if lock_registered:
|
if lock_registered:
|
||||||
maybe_lock = _Cache.locks.pop(fid, None)
|
maybe_lock = _Cache.locks.pop(
|
||||||
|
ctx_key,
|
||||||
|
None,
|
||||||
|
)
|
||||||
if maybe_lock is None:
|
if maybe_lock is None:
|
||||||
log.error(
|
log.error(
|
||||||
f'Resource lock for {fid} ALREADY POPPED?'
|
f'Resource lock for {ctx_key} ALREADY POPPED?'
|
||||||
)
|
)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue