From edc2211444ca55e1547b528e612cfa749e41889f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Jul 2025 16:12:06 -0400 Subject: [PATCH 1/6] Go multi-line-style tuples in `maybe_open_context()` Allows for an inline comment of the first "cache hit" bool element. --- tractor/trionics/_mngrs.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index fd243da0..0f2e7746 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -294,7 +294,10 @@ async def maybe_open_context( ) _Cache.users += 1 lock.release() - yield False, yielded + yield ( + False, # cache_hit = "no" + yielded, + ) else: _Cache.users += 1 @@ -308,7 +311,10 @@ async def maybe_open_context( # f'{ctx_key!r} -> {yielded!r}\n' ) lock.release() - yield True, yielded + yield ( + True, # cache_hit = "yes" + yielded, + ) finally: _Cache.users -= 1 From 1c425cbd226e61dbc1565ab5c147a1d08f1d6105 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Jul 2025 16:48:46 -0400 Subject: [PATCH 2/6] Tool-up `test_resource_cache.test_open_local_sub_to_stream` Since I recently discovered a very subtle race-case that can sometimes cause the suite to hang, seemingly due to the `an: ActorNursery` allocated *behind* the `.trionics.maybe_open_context()` usage; this can result in never cancelling the 'streamer' subactor despite the `main()` timeout-guard? This led me to dig in and find that the underlying issue was 2-fold, - our `BroadcastReceiver` termination-mgmt semantics in `MsgStream.subscribe()` can result in the first subscribing task always keeping the `MsgStream._broadcaster` instance allocated; it's never `.aclose()`ed, which makes it tough to determine (and thus trace) when all subscriber-tasks are actually complete and exited-from-`.subscribe()`..
- i was shield waiting `.ipc._server.Server.wait_for_no_more_peers()` in `._runtime.async_main()`'s shutdown sequence which would then compound the issue resulting in a SIGINT-shielded hang.. the worst kind XD Actual changes here are just styling, printing, and some mucking with passing the `an`-ref up to the parent task in the root-actor where i was doing a conditional `ActorNursery.cancel()` to mk sure that was actually the problem. Presuming this is fixed the `.pause()` i left unmasked should never hit. --- tests/test_resource_cache.py | 113 ++++++++++++++++++++++++++++------- 1 file changed, 91 insertions(+), 22 deletions(-) diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py index bdcdd6c9..e7958b9c 100644 --- a/tests/test_resource_cache.py +++ b/tests/test_resource_cache.py @@ -72,11 +72,13 @@ def test_resource_only_entered_once(key_on): with trio.move_on_after(0.5): async with ( tractor.open_root_actor(), - trio.open_nursery() as n, + trio.open_nursery() as tn, ): - for i in range(10): - n.start_soon(enter_cached_mngr, f'task_{i}') + tn.start_soon( + enter_cached_mngr, + f'task_{i}', + ) await trio.sleep(0.001) trio.run(main) @@ -98,23 +100,34 @@ async def streamer( @acm -async def open_stream() -> Awaitable[tractor.MsgStream]: - +async def open_stream() -> Awaitable[ + tuple[ + tractor.ActorNursery, + tractor.MsgStream, + ] +]: try: async with tractor.open_nursery() as an: portal = await an.start_actor( 'streamer', enable_modules=[__name__], ) - async with ( - portal.open_context(streamer) as (ctx, first), - ctx.open_stream() as stream, - ): - yield stream + try: + async with ( + portal.open_context(streamer) as (ctx, first), + ctx.open_stream() as stream, + ): + print('Entered open_stream() caller') + yield an, stream + print('Exited open_stream() caller') + # await tractor.pause(shield=True) - print('Cancelling streamer') - await portal.cancel_actor() - print('Cancelled streamer') + finally: + print('Cancelling streamer') + # await 
tractor.pause(shield=True) + with trio.CancelScope(shield=True): + await portal.cancel_actor() + print('Cancelled streamer') except Exception as err: print( @@ -130,8 +143,10 @@ async def maybe_open_stream(taskname: str): async with tractor.trionics.maybe_open_context( # NOTE: all secondary tasks should cache hit on the same key acm_func=open_stream, - ) as (cache_hit, stream): - + ) as ( + cache_hit, + (an, stream) + ): if cache_hit: print(f'{taskname} loaded from cache') @@ -139,10 +154,32 @@ async def maybe_open_stream(taskname: str): # if this feed is already allocated by the first # task that entereed async with stream.subscribe() as bstream: - yield bstream + yield an, bstream + print( + f'cached task exited\n' + f')>\n' + f' |_{taskname}\n' + ) else: # yield the actual stream - yield stream + try: + yield an, stream + finally: + print( + f'NON-cached task exited\n' + f')>\n' + f' |_{taskname}\n' + ) + + bstream = stream._broadcaster + if ( + not bstream._state.subs + and + not cache_hit + ): + await tractor.pause(shield=True) + # await an.cancel() + def test_open_local_sub_to_stream( @@ -159,16 +196,24 @@ def test_open_local_sub_to_stream( if debug_mode: timeout = 999 + print(f'IN debug_mode, setting large timeout={timeout!r}..') async def main(): full = list(range(1000)) + an: tractor.ActorNursery|None = None + num_tasks: int = 10 async def get_sub_and_pull(taskname: str): + nonlocal an + stream: tractor.MsgStream async with ( - maybe_open_stream(taskname) as stream, + maybe_open_stream(taskname) as ( + an, + stream, + ), ): if '0' in taskname: assert isinstance(stream, tractor.MsgStream) @@ -180,34 +225,58 @@ def test_open_local_sub_to_stream( first = await stream.receive() print(f'{taskname} started with value {first}') - seq = [] + seq: list[int] = [] async for msg in stream: seq.append(msg) assert set(seq).issubset(set(full)) + + # end of @acm block print(f'{taskname} finished') + root: tractor.Actor with trio.fail_after(timeout) as cs: # TODO: turns 
out this isn't multi-task entrant XD # We probably need an indepotent entry semantic? async with tractor.open_root_actor( debug_mode=debug_mode, - ): + # maybe_enable_greenback=True, + # + # ^TODO? doesn't seem to mk breakpoint() usage work + # bc each bg task needs to open a portal?? + # - [ ] we should consider making this part of + # our taskman defaults? + # |_see https://github.com/goodboy/tractor/pull/363 + # + ) as root: + assert root.is_registrar + async with ( trio.open_nursery() as tn, ): - for i in range(10): + for i in range(num_tasks): tn.start_soon( get_sub_and_pull, f'task_{i}', ) await trio.sleep(0.001) - print('all consumer tasks finished') + print('all consumer tasks finished.') + + # ensure actor-nursery is shutdown or we might + # hang here..? + # if root.ipc_server.has_peers(): + # await tractor.pause() + # await an.cancel() + + # end of runtime scope + print('root actor terminated.') if cs.cancelled_caught: pytest.fail( 'Should NOT time out in `open_root_actor()` ?' ) + print('exiting main.') + trio.run(main) From 70664b98dee8ad8eb87a85d277e47684c2f7dba4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Jul 2025 21:59:42 -0400 Subject: [PATCH 3/6] Well then, I guess it just needed, a checkpoint XD Here I was thinking the bcaster (usage) maybe required a rework but, NOPE it's just bc a checkpoint was needed in the parent task owning the `tn` which spawns `get_sub_and_pull()` tasks to ensure the bg allocated `an`/portal is eventually cancel-called.. Ah well, at least i started a patch for `MsgStream.subscribe()` to make it multicast revertible.. XD Anyway, I tossed in some checks & notes related to all that unnecessary effort since I do think i'll move forward implementing it: - for the `cache_hit` case always verify that the `bcast` clone is unregistered from the common state subs after `.subscribe().__aexit__()`. 
- do a light check that the implicit `MsgStream._broadcaster` is always the only bcrx instance left-leaked into that state.. that is until i get the proper de-allocation/reversion from multicast -> unicast working. - put in mega detailed note about the required parent-task checkpoint. --- tests/test_resource_cache.py | 65 +++++++++++++++++++++++++----------- 1 file changed, 45 insertions(+), 20 deletions(-) diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py index e7958b9c..d54d76a9 100644 --- a/tests/test_resource_cache.py +++ b/tests/test_resource_cache.py @@ -120,13 +120,13 @@ async def open_stream() -> Awaitable[ print('Entered open_stream() caller') yield an, stream print('Exited open_stream() caller') - # await tractor.pause(shield=True) finally: - print('Cancelling streamer') - # await tractor.pause(shield=True) - with trio.CancelScope(shield=True): - await portal.cancel_actor() + print( + 'Cancelling streamer with,\n' + '=> `Portal.cancel_actor()`' + ) + await portal.cancel_actor() print('Cancelled streamer') except Exception as err: @@ -147,6 +147,8 @@ async def maybe_open_stream(taskname: str): cache_hit, (an, stream) ): + # when the actor + portal + ctx + stream has already been + # allocated we want to just bcast to this task. 
if cache_hit: print(f'{taskname} loaded from cache') @@ -160,6 +162,11 @@ async def maybe_open_stream(taskname: str): f')>\n' f' |_{taskname}\n' ) + + # we should always unreg the "cloned" bcrc for this + # consumer-task + assert id(bstream) not in bstream._state.subs + else: # yield the actual stream try: @@ -171,15 +178,21 @@ async def maybe_open_stream(taskname: str): f' |_{taskname}\n' ) - bstream = stream._broadcaster - if ( - not bstream._state.subs - and - not cache_hit - ): - await tractor.pause(shield=True) - # await an.cancel() - + first_bstream = stream._broadcaster + bcrx_state = first_bstream._state + subs: dict[int, int] = bcrx_state.subs + if len(subs) == 1: + assert id(first_bstream) in subs + # ^^TODO! the bcrx should always de-allocate all subs, + # including the implicit first one allocated on entry + # by the first subscribing peer task, no? + # + # -[ ] adjust `MsgStream.subscribe()` to do this mgmt! + # |_ allows reverting `MsgStream.receive()` to the + # non-bcaster method. + # |_ we can decide whether to reset `._broadcaster`? + # + # await tractor.pause(shield=True) def test_open_local_sub_to_stream( @@ -261,13 +274,25 @@ def test_open_local_sub_to_stream( ) await trio.sleep(0.001) - print('all consumer tasks finished.') + print('all consumer tasks finished!') - # ensure actor-nursery is shutdown or we might - # hang here..? - # if root.ipc_server.has_peers(): - # await tractor.pause() - # await an.cancel() + # ?XXX, ensure actor-nursery is shutdown or we might + # hang here due to a minor task deadlock/race-condition? + # + # - seems that all we need is a checkpoint to ensure + # the last suspended task, which is inside + # `.maybe_open_context()`, can do the + # `Portal.cancel_actor()` call? + # + # - if that bg task isn't resumed, then this blocks + # timeout might hit before that? 
+ # + if root.ipc_server.has_peers(): + await trio.lowlevel.checkpoint() + + # alt approach, cancel the entire `an` + # await tractor.pause() + # await an.cancel() # end of runtime scope print('root actor terminated.') From f1ff79a4e6bd3b09b18328c7479c65a76ed59035 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 20 Jul 2025 13:26:25 -0400 Subject: [PATCH 4/6] Always `finally` invoke cache-miss `lock.release()`s Since the `await service_n.start()` on key-err can be cancel-masked (checkpoint interrupted before `_Cache.run_ctx` completes), we need to always `lock.release()` in order to avoid lock-owner-state corruption and/or inf-hangs in peer cache-hitting tasks. Deats, - add a `try/except/finally` around the key-err triggered cache-miss `service_n.start(_Cache.run_ctx, ..)` call, reporting on any taskc and always `finally` unlocking. - fill out some log msg content and use `.debug()` level. --- tractor/trionics/_mngrs.py | 42 ++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 0f2e7746..339112d2 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -280,20 +280,44 @@ async def maybe_open_context( yielded = _Cache.values[ctx_key] except KeyError: - log.debug(f'Allocating new {acm_func} for {ctx_key}') + log.debug( + f'Allocating new @acm-func entry\n' + f'ctx_key={ctx_key}\n' + f'acm_func={acm_func}\n' + ) mngr = acm_func(**kwargs) resources = _Cache.resources assert not resources.get(ctx_key), f'Resource exists?
{ctx_key}' resources[ctx_key] = (service_n, trio.Event()) # sync up to the mngr's yielded value - yielded = await service_n.start( - _Cache.run_ctx, - mngr, - ctx_key, - ) - _Cache.users += 1 - lock.release() + try: + yielded: Any = await service_n.start( + _Cache.run_ctx, + mngr, + ctx_key, + ) + _Cache.users += 1 + except trio.Cancelled as taskc: + log.cancel( + f'Cancelled during caching?\n' + f'\n' + f'ctx_key: {ctx_key!r}\n' + f'mngr: {mngr!r}\n' + ) + raise taskc + finally: + # XXX, since this runs from an `except` it's a checkpoint + # whih can be `trio.Cancelled`-masked. + # + # NOTE, in that case the mutex is never released by the + # (first and) caching task and **we can't** simply shield + # bc that will inf-block on the `await + # no_more_users.wait()`. + # + # SO just always unlock! + lock.release() + yield ( False, # cache_hit = "no" yielded, @@ -301,7 +325,7 @@ async def maybe_open_context( else: _Cache.users += 1 - log.runtime( + log.debug( f'Re-using cached resource for user {_Cache.users}\n\n' f'{ctx_key!r} -> {type(yielded)}\n' From 4ba3590450f382876de78942aacfe3399a202a63 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 20 Jul 2025 15:01:18 -0400 Subject: [PATCH 5/6] Add `.trionics.maybe_open_context()` locking test Call it `test_lock_not_corrupted_on_fast_cancel()` and includes a detailed doc string to explain. Implemented it "cleverly" by having the target `@acm` cancel its parent nursery after a peer, cache-hitting task, is already waiting on the task mutex release. 
--- tests/test_resource_cache.py | 105 ++++++++++++++++++++++++++++++++++- 1 file changed, 102 insertions(+), 3 deletions(-) diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py index d54d76a9..10eb3d84 100644 --- a/tests/test_resource_cache.py +++ b/tests/test_resource_cache.py @@ -1,5 +1,6 @@ ''' -Async context manager cache api testing: ``trionics.maybe_open_context():`` +Suites for our `.trionics.maybe_open_context()` multi-task +shared-cached `@acm` API. ''' from contextlib import asynccontextmanager as acm @@ -9,6 +10,15 @@ from typing import Awaitable import pytest import trio import tractor +from tractor.trionics import ( + maybe_open_context, +) +from tractor.log import ( + get_console_log, + get_logger, +) +log = get_logger(__name__) + _resource: int = 0 @@ -52,7 +62,7 @@ def test_resource_only_entered_once(key_on): # different task names per task will be used kwargs = {'task_name': name} - async with tractor.trionics.maybe_open_context( + async with maybe_open_context( maybe_increment_counter, kwargs=kwargs, key=key, @@ -140,7 +150,7 @@ async def open_stream() -> Awaitable[ @acm async def maybe_open_stream(taskname: str): - async with tractor.trionics.maybe_open_context( + async with maybe_open_context( # NOTE: all secondary tasks should cache hit on the same key acm_func=open_stream, ) as ( @@ -305,3 +315,92 @@ def test_open_local_sub_to_stream( print('exiting main.') trio.run(main) + + + +@acm +async def cancel_outer_cs( + cs: trio.CancelScope|None = None, + delay: float = 0, +): + # on first task delay this enough to block + # the 2nd task but then cancel it mid sleep + # so that the tn.start() inside the key-err handler block + # is cancelled and would previously corrupt the + # mutext state. 
+ log.info(f'task entering sleep({delay})') + await trio.sleep(delay) + if cs: + log.info('task calling cs.cancel()') + cs.cancel() + trio.lowlevel.checkpoint() + yield + await trio.sleep_forever() + + +def test_lock_not_corrupted_on_fast_cancel( + debug_mode: bool, + loglevel: str, +): + ''' + Verify that if the caching-task (the first to enter + `maybe_open_context()`) is cancelled mid-cache-miss, the embedded + mutex can never be left in a corrupted state. + + That is, the lock is always eventually released ensuring a peer + (cache-hitting) task will never, + + - be left to inf-block/hang on the `lock.acquire()`. + - try to release the lock when still owned by the caching-task + due to it having erronously exited without calling + `lock.release()`. + + + ''' + delay: float = 1. + + async def use_moc( + cs: trio.CancelScope|None, + delay: float, + ): + log.info('task entering moc') + async with maybe_open_context( + cancel_outer_cs, + kwargs={ + 'cs': cs, + 'delay': delay, + }, + ) as (cache_hit, _null): + if cache_hit: + log.info('2nd task entered') + else: + log.info('1st task entered') + + await trio.sleep_forever() + + async def main(): + with trio.fail_after(delay + 2): + async with ( + tractor.open_root_actor( + debug_mode=debug_mode, + loglevel=loglevel, + ), + trio.open_nursery() as tn, + ): + get_console_log('info') + log.info('yo starting') + cs = tn.cancel_scope + tn.start_soon( + use_moc, + cs, + delay, + name='child', + ) + with trio.CancelScope() as rent_cs: + await use_moc( + cs=rent_cs, + delay=delay, + ) + + + trio.run(main) From 4e252526b5cafc8cdba5dc8d6c3c958dae5713ef Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 26 Jul 2025 20:10:24 -0400 Subject: [PATCH 6/6] Accept `tn` to `gather_contexts()/maybe_open_context()` Such that the caller can be responsible for their own (nursery) scoping as needed and, for the latter fn's case with a `trio.Nursery.CancelStatus.encloses()` check to ensure the `tn` is a valid parent-ish. 
Some deats, - in `gather_contexts()`, mv the `try/finally` outside the nursery block to ensure we always do the `parent_exit`. - for `maybe_open_context()` we do a naive task-tree hierarchy audit to ensure the provided scope is not *too* child-ish (with what APIs `trio` gives us, see above), OW go with the old approach of using the actor's private service nursery. Also, * better report `trio.Cancelled` around the cache-miss `yield` cases and ensure we **never** unmask triggering key-errors. * report on any stale-state with the mutex in the `finally` block. --- tractor/trionics/_mngrs.py | 153 ++++++++++++++++++++++++------------- 1 file changed, 101 insertions(+), 52 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 339112d2..99f05852 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -41,6 +41,9 @@ import trio from tractor._state import current_actor from tractor.log import get_logger # from ._beg import collapse_eg +# from ._taskc import ( +# maybe_raise_from_masking_exc, +# ) if TYPE_CHECKING: @@ -106,6 +109,9 @@ async def _enter_and_wait( async def gather_contexts( mngrs: Sequence[AsyncContextManager[T]], + # caller can provide their own scope + tn: trio.Nursery|None = None, + ) -> AsyncGenerator[ tuple[ T | None, @@ -148,39 +154,45 @@ async def gather_contexts( '`.trionics.gather_contexts()` input mngrs is empty?\n' '\n' 'Did try to use inline generator syntax?\n' - 'Use a non-lazy iterator or sequence-type intead!\n' + 'Check that list({mngrs}) works!\n' + # 'or sequence-type intead!\n' + # 'Use a non-lazy iterator or sequence-type intead!\n' ) - async with ( - # collapse_eg(), - trio.open_nursery( - strict_exception_groups=False, - # ^XXX^ TODO? soo roll our own then ?? - # -> since we kinda want the "if only one `.exception` then - # just raise that" interface? 
- ) as tn, - ): - for mngr in mngrs: - tn.start_soon( - _enter_and_wait, - mngr, - unwrapped, - all_entered, - parent_exit, - seed, - ) + try: + async with ( + # + # ?TODO, does including these (eg-collapsing, + # taskc-unmasking) improve tb noise-reduction/legibility? + # + # collapse_eg(), + maybe_open_nursery( + nursery=tn, + ) as tn, + # maybe_raise_from_masking_exc(), + ): + for mngr in mngrs: + tn.start_soon( + _enter_and_wait, + mngr, + unwrapped, + all_entered, + parent_exit, + seed, + ) - # deliver control once all managers have started up - await all_entered.wait() - - try: + # deliver control to caller once all ctx-managers have + # started (yielded back to us). + await all_entered.wait() yield tuple(unwrapped.values()) - finally: - # XXX NOTE: this is ABSOLUTELY REQUIRED to avoid - # the following wacky bug: - # parent_exit.set() + finally: + # XXX NOTE: this is ABSOLUTELY REQUIRED to avoid + # the following wacky bug: + # + parent_exit.set() + # Per actor task caching helpers. # Further potential examples of interest: @@ -233,6 +245,9 @@ async def maybe_open_context( kwargs: dict = {}, key: Hashable | Callable[..., Hashable] = None, + # caller can provide their own scope + tn: trio.Nursery|None = None, + ) -> AsyncIterator[tuple[bool, T]]: ''' Maybe open an async-context-manager (acm) if there is not already @@ -265,7 +280,23 @@ async def maybe_open_context( # have it not be closed until all consumers have exited (which is # currently difficult to implement any other way besides using our # pre-allocated runtime instance..) - service_n: trio.Nursery = current_actor()._service_n + if tn: + # TODO, assert tn is eventual parent of this task! 
+ task: trio.Task = trio.lowlevel.current_task() + task_tn: trio.Nursery = task.parent_nursery + if not tn._cancel_status.encloses( + task_tn._cancel_status + ): + raise RuntimeError( + f'Mis-nesting of task under provided {tn} !?\n' + f'Current task is NOT a child(-ish)!!\n' + f'\n' + f'task: {task}\n' + f'task_tn: {task_tn}\n' + ) + service_n = tn + else: + service_n: trio.Nursery = current_actor()._service_n # TODO: is there any way to allocate # a 'stays-open-till-last-task-finshed nursery? @@ -273,39 +304,33 @@ async def maybe_open_context( # async with maybe_open_nursery(_Cache.service_n) as service_n: # _Cache.service_n = service_n + cache_miss_ke: KeyError|None = None + maybe_taskc: trio.Cancelled|None = None try: # **critical section** that should prevent other tasks from # checking the _Cache until complete otherwise the scheduler # may switch and by accident we create more then one resource. yielded = _Cache.values[ctx_key] - except KeyError: - log.debug( - f'Allocating new @acm-func entry\n' - f'ctx_key={ctx_key}\n' - f'acm_func={acm_func}\n' - ) - mngr = acm_func(**kwargs) - resources = _Cache.resources - assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' - resources[ctx_key] = (service_n, trio.Event()) - - # sync up to the mngr's yielded value + except KeyError as _ke: + # XXX, stay mutexed up to cache-miss yield try: + cache_miss_ke = _ke + log.debug( + f'Allocating new @acm-func entry\n' + f'ctx_key={ctx_key}\n' + f'acm_func={acm_func}\n' + ) + mngr = acm_func(**kwargs) + resources = _Cache.resources + assert not resources.get(ctx_key), f'Resource exists? 
{ctx_key}' + resources[ctx_key] = (service_n, trio.Event()) yielded: Any = await service_n.start( _Cache.run_ctx, mngr, ctx_key, ) _Cache.users += 1 - except trio.Cancelled as taskc: - log.cancel( - f'Cancelled during caching?\n' - f'\n' - f'ctx_key: {ctx_key!r}\n' - f'mngr: {mngr!r}\n' - ) - raise taskc finally: # XXX, since this runs from an `except` it's a checkpoint # whih can be `trio.Cancelled`-masked. @@ -318,10 +343,27 @@ async def maybe_open_context( # SO just always unlock! lock.release() - yield ( - False, # cache_hit = "no" - yielded, - ) + try: + yield ( + False, # cache_hit = "no" + yielded, + ) + except trio.Cancelled as taskc: + maybe_taskc = taskc + log.cancel( + f'Cancelled from cache-miss entry\n' + f'\n' + f'ctx_key: {ctx_key!r}\n' + f'mngr: {mngr!r}\n' + ) + # XXX, always unset ke from cancelled context + # since we never consider it a masked exc case! + # - bc this can be called directly ty `._rpc._invoke()`? + # + if maybe_taskc.__context__ is cache_miss_ke: + maybe_taskc.__context__ = None + + raise taskc else: _Cache.users += 1 @@ -341,6 +383,13 @@ async def maybe_open_context( ) finally: + if lock.locked(): + stats: trio.LockStatistics = lock.statistics() + log.error( + f'Lock left locked by last owner !?\n' + f'{stats}\n' + ) + _Cache.users -= 1 if yielded is not None: