From 5ad540c417b15b9ce8a7d9ce1731ed84722d4585 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 23 Aug 2022 06:50:56 -0400 Subject: [PATCH 1/3] Add debug complete event `None`-guard for when already reset --- tractor/_debug.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tractor/_debug.py b/tractor/_debug.py index 1cb29ac..9f8a704 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -126,7 +126,8 @@ class Lock: try: # sometimes the ``trio`` might already be terminated in # which case this call will raise. - cls.local_pdb_complete.set() + if cls.local_pdb_complete is not None: + cls.local_pdb_complete.set() finally: # restore original sigint handler cls.unshield_sigint() From bafd10a26060032567ccd6ba740263c9ce40f96e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Aug 2022 12:03:13 -0400 Subject: [PATCH 2/3] Make `maybe_open_context()` re-entrant safe, use per factory locks --- tractor/trionics/_mngrs.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index d9e392a..ab4b735 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -81,7 +81,7 @@ async def gather_contexts( This function is somewhat similar to common usage of ``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in combo with ``asyncio.gather()`` except the managers are concurrently - entered and exited cancellation just works. + entered and exited, and cancellation just works. ''' unwrapped: dict[int, Optional[T]] = {}.fromkeys(id(mngr) for mngr in mngrs) @@ -119,7 +119,7 @@ class _Cache: a kept-alive-while-in-use async resource. ''' - lock = trio.Lock() + locks: dict[Hashable, trio.Lock] = {} users: int = 0 values: dict[Any, Any] = {} resources: dict[ @@ -165,13 +165,18 @@ async def maybe_open_context( _Cached instance on a _Cache hit. ''' - # lock resource acquisition around task racing / ``trio``'s - # scheduler protocol - await _Cache.lock.acquire() - - ctx_key = (id(acm_func), key or tuple(kwargs.items())) + fid = id(acm_func) + ctx_key = (fid, key or tuple(kwargs.items())) value = None + # Lock resource acquisition around task racing / ``trio``'s + # scheduler protocol. + # NOTE: the lock is target context manager func specific in order + # to allow re-entrant use cases where one `maybe_open_context()` + # wrapped factor may want to call into another. + lock = _Cache.locks.setdefault(fid, trio.Lock()) + await lock.acquire() + try: # **critical section** that should prevent other tasks from # checking the _Cache until complete otherwise the scheduler @@ -189,21 +194,21 @@ async def maybe_open_context( # TODO: does this need to be a tractor "root nursery"? resources = _Cache.resources assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' - ln, _ = resources[ctx_key] = (service_n, trio.Event()) + resources[ctx_key] = (service_n, trio.Event()) - value = await ln.start( + value = await service_n.start( _Cache.run_ctx, mngr, ctx_key, ) _Cache.users += 1 - _Cache.lock.release() + lock.release() yield False, value else: log.info(f'Reusing _Cached resource for {ctx_key}') _Cache.users += 1 - _Cache.lock.release() + lock.release() yield True, value finally: @@ -221,3 +226,5 @@ async def maybe_open_context( if entry: _, no_more_users = entry no_more_users.set() + + _Cache.locks.pop(fid) From f07c3aa4a15cc2a9817c54c5c21d3e9f4b95f938 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 15 Sep 2022 19:39:34 -0400 Subject: [PATCH 3/3] Add nooz --- nooz/324.bugfix.rst | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 nooz/324.bugfix.rst diff --git a/nooz/324.bugfix.rst b/nooz/324.bugfix.rst new file mode 100644 index 0000000..2f2229e --- /dev/null +++ b/nooz/324.bugfix.rst @@ -0,0 +1,4 @@ +Only set `._debug.Lock.local_pdb_complete` if has been created. + +This can be triggered by a very rare race condition (and thus we have no +working test yet) but it is known to exist in (a) consumer project(s).