From bafd10a26060032567ccd6ba740263c9ce40f96e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Aug 2022 12:03:13 -0400 Subject: [PATCH] Make `maybe_open_context()` re-entrant safe, use per factory locks --- tractor/trionics/_mngrs.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index d9e392a..ab4b735 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -81,7 +81,7 @@ async def gather_contexts( This function is somewhat similar to common usage of ``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in combo with ``asyncio.gather()`` except the managers are concurrently - entered and exited cancellation just works. + entered and exited, and cancellation just works. ''' unwrapped: dict[int, Optional[T]] = {}.fromkeys(id(mngr) for mngr in mngrs) @@ -119,7 +119,7 @@ class _Cache: a kept-alive-while-in-use async resource. ''' - lock = trio.Lock() + locks: dict[Hashable, trio.Lock] = {} users: int = 0 values: dict[Any, Any] = {} resources: dict[ @@ -165,13 +165,18 @@ async def maybe_open_context( _Cached instance on a _Cache hit. ''' - # lock resource acquisition around task racing / ``trio``'s - # scheduler protocol - await _Cache.lock.acquire() - - ctx_key = (id(acm_func), key or tuple(kwargs.items())) + fid = id(acm_func) + ctx_key = (fid, key or tuple(kwargs.items())) value = None + # Lock resource acquisition around task racing / ``trio``'s + # scheduler protocol. + # NOTE: the lock is target context manager func specific in order + # to allow re-entrant use cases where one `maybe_open_context()` + # wrapped factor may want to call into another. + lock = _Cache.locks.setdefault(fid, trio.Lock()) + await lock.acquire() + try: # **critical section** that should prevent other tasks from # checking the _Cache until complete otherwise the scheduler @@ -189,21 +194,21 @@ async def maybe_open_context( # TODO: does this need to be a tractor "root nursery"? resources = _Cache.resources assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' - ln, _ = resources[ctx_key] = (service_n, trio.Event()) + resources[ctx_key] = (service_n, trio.Event()) - value = await ln.start( + value = await service_n.start( _Cache.run_ctx, mngr, ctx_key, ) _Cache.users += 1 - _Cache.lock.release() + lock.release() yield False, value else: log.info(f'Reusing _Cached resource for {ctx_key}') _Cache.users += 1 - _Cache.lock.release() + lock.release() yield True, value finally: @@ -221,3 +226,5 @@ async def maybe_open_context( if entry: _, no_more_users = entry no_more_users.set() + + _Cache.locks.pop(fid)