Merge pull request #324 from goodboy/debug_event_guard

Add debug complete event `None`-guard for when already reset
lifetime_stack_tests
goodboy 2022-09-15 23:20:38 -04:00 committed by GitHub
commit fea9dc7065
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 12 deletions

View File

@ -0,0 +1,4 @@
Only set `._debug.Lock.local_pdb_complete` if has been created.
This can be triggered by a very rare race condition (and thus we have no
working test yet) but it is known to exist in (a) consumer project(s).

View File

@ -126,6 +126,7 @@ class Lock:
try: try:
# sometimes the ``trio`` might already be terminated in # sometimes the ``trio`` might already be terminated in
# which case this call will raise. # which case this call will raise.
if cls.local_pdb_complete is not None:
cls.local_pdb_complete.set() cls.local_pdb_complete.set()
finally: finally:
# restore original sigint handler # restore original sigint handler

View File

@ -81,7 +81,7 @@ async def gather_contexts(
This function is somewhat similar to common usage of This function is somewhat similar to common usage of
``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in ``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
combo with ``asyncio.gather()`` except the managers are concurrently combo with ``asyncio.gather()`` except the managers are concurrently
entered and exited cancellation just works. entered and exited, and cancellation just works.
''' '''
unwrapped: dict[int, Optional[T]] = {}.fromkeys(id(mngr) for mngr in mngrs) unwrapped: dict[int, Optional[T]] = {}.fromkeys(id(mngr) for mngr in mngrs)
@ -119,7 +119,7 @@ class _Cache:
a kept-alive-while-in-use async resource. a kept-alive-while-in-use async resource.
''' '''
lock = trio.Lock() locks: dict[Hashable, trio.Lock] = {}
users: int = 0 users: int = 0
values: dict[Any, Any] = {} values: dict[Any, Any] = {}
resources: dict[ resources: dict[
@ -165,13 +165,18 @@ async def maybe_open_context(
_Cached instance on a _Cache hit. _Cached instance on a _Cache hit.
''' '''
# lock resource acquisition around task racing / ``trio``'s fid = id(acm_func)
# scheduler protocol ctx_key = (fid, key or tuple(kwargs.items()))
await _Cache.lock.acquire()
ctx_key = (id(acm_func), key or tuple(kwargs.items()))
value = None value = None
# Lock resource acquisition around task racing / ``trio``'s
# scheduler protocol.
# NOTE: the lock is target context manager func specific in order
# to allow re-entrant use cases where one `maybe_open_context()`
# wrapped factor may want to call into another.
lock = _Cache.locks.setdefault(fid, trio.Lock())
await lock.acquire()
try: try:
# **critical section** that should prevent other tasks from # **critical section** that should prevent other tasks from
# checking the _Cache until complete otherwise the scheduler # checking the _Cache until complete otherwise the scheduler
@ -189,21 +194,21 @@ async def maybe_open_context(
# TODO: does this need to be a tractor "root nursery"? # TODO: does this need to be a tractor "root nursery"?
resources = _Cache.resources resources = _Cache.resources
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
ln, _ = resources[ctx_key] = (service_n, trio.Event()) resources[ctx_key] = (service_n, trio.Event())
value = await ln.start( value = await service_n.start(
_Cache.run_ctx, _Cache.run_ctx,
mngr, mngr,
ctx_key, ctx_key,
) )
_Cache.users += 1 _Cache.users += 1
_Cache.lock.release() lock.release()
yield False, value yield False, value
else: else:
log.info(f'Reusing _Cached resource for {ctx_key}') log.info(f'Reusing _Cached resource for {ctx_key}')
_Cache.users += 1 _Cache.users += 1
_Cache.lock.release() lock.release()
yield True, value yield True, value
finally: finally:
@ -221,3 +226,5 @@ async def maybe_open_context(
if entry: if entry:
_, no_more_users = entry _, no_more_users = entry
no_more_users.set() no_more_users.set()
_Cache.locks.pop(fid)