Rework interface: pass func and kwargs

After more extensive testing I realized that keying on the context
manager *instance id* isn't going to work since each entering task is
going to create a unique key XD

Instead pass the manager function as `acm_func` and optionally allow
keying the resource on the passed `kwargs` (if hashable) or an explicit
`key: str`. Further, pass the key to the enterer task and avoid a
separate keying scheme for the manager versus the value it delivers.
Don't bother with checking and releasing the lock in the `finally:`
block; it should be an error if it's still locked.
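
For illustration, a rough sketch of the reworked call style
(`open_feed` and its kwargs are invented for the example and the import
path is assumed; only the `maybe_open_context()` call shape comes from
this patch):

    from contextlib import asynccontextmanager as acm
    from tractor.trionics import maybe_open_context  # assumed import path

    @acm
    async def open_feed(symbol: str):
        # stand-in for some expensive, shareable setup
        yield f'feed-for-{symbol}'

    async def consumer_task():
        async with maybe_open_context(
            acm_func=open_feed,
            kwargs={'symbol': 'btcusdt'},
        ) as (cached, feed):
            # ``cached`` is False for the task that actually entered
            # the acm, True for every task that re-uses its value
            ...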
context_caching
Tyler Goodlet 2021-12-15 13:42:47 -05:00
parent 3826bc9972
commit 52627a6326
1 changed file with 34 additions and 31 deletions


@@ -24,6 +24,7 @@ from typing import (
     AsyncContextManager,
     AsyncGenerator,
     AsyncIterator,
+    Callable,
     Hashable,
     Optional,
     Sequence,
@@ -131,69 +132,76 @@ class _Cache:
     async def run_ctx(
         cls,
         mng,
-        key,
+        ctx_key: tuple,
         task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
 
     ) -> None:
         async with mng as value:
-            _, no_more_users = cls.resources[id(mng)]
-            cls.values[key] = value
+            _, no_more_users = cls.resources[ctx_key]
+            cls.values[ctx_key] = value
             task_status.started(value)
 
             try:
                 await no_more_users.wait()
             finally:
-                value = cls.values.pop(key)
-                # discard nursery ref so it won't be re-used (an error)
-                cls.resources.pop(id(mng))
+                # discard nursery ref so it won't be re-used (an error)?
+                value = cls.values.pop(ctx_key)
+                cls.resources.pop(ctx_key)
 @acm
 async def maybe_open_context(
 
-    key: Hashable,
-    mngr: AsyncContextManager[T],
+    acm_func: Callable[..., AsyncContextManager[T]],
+
+    # XXX: used as cache key after conversion to tuple
+    # and all embedded values must also be hashable
+    kwargs: Optional[dict] = {},
+    key: Hashable = None,
 
 ) -> AsyncIterator[tuple[bool, T]]:
     '''
     Maybe open a context manager if there is not already a _Cached
-    version for the provided ``key``. Return the _Cached instance on
-    a _Cache hit.
+    version for the provided ``key`` for *this* actor. Return the
+    _Cached instance on a _Cache hit.
 
     '''
     # lock resource acquisition around task racing / ``trio``'s
     # scheduler protocol
     await _Cache.lock.acquire()
 
-    ctx_key = id(mngr)
+    ctx_key = (id(acm_func), key or tuple(kwargs.items()))
     value = None
 
     try:
-        value = _Cache.values[key]
-
-    except KeyError:
-        log.info(f'Allocating new resource for {key}')
-
         # **critical section** that should prevent other tasks from
         # checking the _Cache until complete otherwise the scheduler
-        # may switch and by accident we create more then one feed.
+        # may switch and by accident we create more then one resource.
+        value = _Cache.values[ctx_key]
+
+    except KeyError:
+        log.info(f'Allocating new resource for {ctx_key}')
+
+        mngr = acm_func(**kwargs)
 
         # TODO: avoid pulling from ``tractor`` internals and
         # instead offer a "root nursery" in piker actors?
         service_n = current_actor()._service_n
 
         # TODO: does this need to be a tractor "root nursery"?
-        assert not _Cache.resources.get(ctx_key), f'Resource exists? {ctx_key}'
-        ln, _ = _Cache.resources[ctx_key] = (service_n, trio.Event())
+        resources = _Cache.resources
+        assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
+        ln, _ = resources[ctx_key] = (service_n, trio.Event())
 
-        value = await ln.start(_Cache.run_ctx, mngr, key)
+        value = await ln.start(
+            _Cache.run_ctx,
+            mngr,
+            ctx_key,
+        )
         _Cache.users += 1
         _Cache.lock.release()
         yield False, value
 
     else:
-        log.info(f'Reusing _Cached resource for {key}')
+        log.info(f'Reusing _Cached resource for {ctx_key}')
         _Cache.users += 1
         _Cache.lock.release()
         yield True, value
@@ -204,13 +212,8 @@ async def maybe_open_context(
         if value is not None:
             # if no more consumers, teardown the client
             if _Cache.users <= 0:
-                log.info(f'De-allocating resource for {key}')
-
-                if _Cache.lock.locked():
-                    _Cache.lock.release()
+                log.info(f'De-allocating resource for {ctx_key}')
 
                 # terminate mngr nursery
-                entry = _Cache.resources.get(ctx_key)
-                if entry:
-                    _, no_more_users = entry
-                    no_more_users.set()
+                _, no_more_users = _Cache.resources[ctx_key]
+                no_more_users.set()
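
For reference, the new cache key reduces to a plain tuple so hits and
misses are just tuple equality; a standalone sketch of the keying
semantics (mirrors the `ctx_key` line above, not code from this patch):

    def _ctx_key(acm_func, kwargs={}, key=None):
        # same construction as the patch; NB: a falsy ``key`` falls
        # back to the (hashable) kwargs tuple
        return (id(acm_func), key or tuple(kwargs.items()))

    async def dummy_acm(): ...

    assert _ctx_key(dummy_acm, {'x': 1}) == _ctx_key(dummy_acm, {'x': 1})  # same func + kwargs: hit
    assert _ctx_key(dummy_acm, {'x': 1}) != _ctx_key(dummy_acm, {'x': 2})  # different kwargs: miss
    assert _ctx_key(dummy_acm, key='feed') == (id(dummy_acm), 'feed')      # explicit key wins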