Always `finally` invoke cache-miss `lock.release()`s
Since the `await service_n.start()` on key-err can be cancel-masked (checkpoint interrupted before `_Cache.run_ctx` completes), we need to always `lock.release()` in to avoid lock-owner-state corruption and/or inf-hangs in peer cache-hitting tasks. Deats, - add a `try/except/finally` around the key-err triggered cache-miss `service_n.start(_Cache.run_ctx, ..)` call, reporting on any taskc and always `finally` unlocking. - fill out some log msg content and use `.debug()` level.to_asyncio_eoc_signal
parent
9ada628a57
commit
33ac3ca99f
|
@ -280,20 +280,44 @@ async def maybe_open_context(
|
||||||
yielded = _Cache.values[ctx_key]
|
yielded = _Cache.values[ctx_key]
|
||||||
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.debug(f'Allocating new {acm_func} for {ctx_key}')
|
log.debug(
|
||||||
|
f'Allocating new @acm-func entry\n'
|
||||||
|
f'ctx_key={ctx_key}\n'
|
||||||
|
f'acm_func={acm_func}\n'
|
||||||
|
)
|
||||||
mngr = acm_func(**kwargs)
|
mngr = acm_func(**kwargs)
|
||||||
resources = _Cache.resources
|
resources = _Cache.resources
|
||||||
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
|
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
|
||||||
resources[ctx_key] = (service_n, trio.Event())
|
resources[ctx_key] = (service_n, trio.Event())
|
||||||
|
|
||||||
# sync up to the mngr's yielded value
|
# sync up to the mngr's yielded value
|
||||||
yielded = await service_n.start(
|
try:
|
||||||
_Cache.run_ctx,
|
yielded: Any = await service_n.start(
|
||||||
mngr,
|
_Cache.run_ctx,
|
||||||
ctx_key,
|
mngr,
|
||||||
)
|
ctx_key,
|
||||||
_Cache.users += 1
|
)
|
||||||
lock.release()
|
_Cache.users += 1
|
||||||
|
except trio.Cancelled as taskc:
|
||||||
|
log.cancel(
|
||||||
|
f'Cancelled during caching?\n'
|
||||||
|
f'\n'
|
||||||
|
f'ctx_key: {ctx_key!r}\n'
|
||||||
|
f'mngr: {mngr!r}\n'
|
||||||
|
)
|
||||||
|
raise taskc
|
||||||
|
finally:
|
||||||
|
# XXX, since this runs from an `except` it's a checkpoint
|
||||||
|
# whih can be `trio.Cancelled`-masked.
|
||||||
|
#
|
||||||
|
# NOTE, in that case the mutex is never released by the
|
||||||
|
# (first and) caching task and **we can't** simply shield
|
||||||
|
# bc that will inf-block on the `await
|
||||||
|
# no_more_users.wait()`.
|
||||||
|
#
|
||||||
|
# SO just always unlock!
|
||||||
|
lock.release()
|
||||||
|
|
||||||
yield (
|
yield (
|
||||||
False, # cache_hit = "no"
|
False, # cache_hit = "no"
|
||||||
yielded,
|
yielded,
|
||||||
|
@ -301,7 +325,7 @@ async def maybe_open_context(
|
||||||
|
|
||||||
else:
|
else:
|
||||||
_Cache.users += 1
|
_Cache.users += 1
|
||||||
log.runtime(
|
log.debug(
|
||||||
f'Re-using cached resource for user {_Cache.users}\n\n'
|
f'Re-using cached resource for user {_Cache.users}\n\n'
|
||||||
f'{ctx_key!r} -> {type(yielded)}\n'
|
f'{ctx_key!r} -> {type(yielded)}\n'
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue