diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 9524ffe1..34e62a7c 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -19,6 +19,7 @@ Async context manager primitives with hard ``trio``-aware semantics ''' from __future__ import annotations +from collections import defaultdict from contextlib import ( asynccontextmanager as acm, ) @@ -39,6 +40,7 @@ from typing import ( import trio from tractor.runtime._state import current_actor from tractor.log import get_logger +import tractor # from ._beg import collapse_eg # from ._taskc import ( # maybe_raise_from_masking_exc, @@ -135,7 +137,7 @@ async def gather_contexts( ''' seed: int = id(mngrs) - unwrapped: dict[int, T | None] = {}.fromkeys( + unwrapped: dict[int, T|None] = {}.fromkeys( (id(mngr) for mngr in mngrs), seed, ) @@ -205,7 +207,10 @@ class _Cache: ''' service_tn: trio.Nursery|None = None locks: dict[Hashable, trio.Lock] = {} - users: int = 0 + users: defaultdict[ + tuple|Hashable, + int, + ] = defaultdict(int) values: dict[Any, Any] = {} resources: dict[ Hashable, @@ -233,18 +238,32 @@ class _Cache: value = cls.values.pop(ctx_key) finally: # discard nursery ref so it won't be re-used (an error)? - cls.resources.pop(ctx_key) + _rsrcs = cls.resources.pop(ctx_key) + log.error( + f'Popping ctx resources\n' + f'{_rsrcs}\n' + ) + + +class _UnresolvedCtx: + ''' + Placeholder for the mabye-value delivered from some `acm_func`, + once (first) entered by a `maybe_open_context()` task. + + Enables internal teardown logic conditioned on whether the + context was actually entered successfully vs. cancelled prior. + + ''' @acm async def maybe_open_context( - acm_func: Callable[..., AsyncContextManager[T]], # XXX: used as cache key after conversion to tuple # and all embedded values must also be hashable kwargs: dict = {}, - key: Hashable | Callable[..., Hashable] = None, + key: Hashable|Callable[..., Hashable] = None, # caller can provide their own scope tn: trio.Nursery|None = None, @@ -257,25 +276,59 @@ async def maybe_open_context( Return the `_Cached` instance on a _Cache hit. ''' - fid = id(acm_func) - + fid: int = id(acm_func) if inspect.isfunction(key): - ctx_key = (fid, key(**kwargs)) + ctx_key = ( + fid, + key(**kwargs) + ) else: - ctx_key = (fid, key or tuple(kwargs.items())) + ctx_key = ( + fid, + key or tuple(kwargs.items()) + ) # yielded output - yielded: Any = None + # sentinel = object() + yielded: Any = _UnresolvedCtx lock_registered: bool = False # Lock resource acquisition around task racing / ``trio``'s # scheduler protocol. # NOTE: the lock is target context manager func specific in order # to allow re-entrant use cases where one `maybe_open_context()` - # wrapped factor may want to call into another. - lock = _Cache.locks.setdefault(fid, trio.Lock()) - lock_registered: bool = True + # wrapped factory may want to call into another. + task: trio.Task = trio.lowlevel.current_task() + lock: trio.StrictFIFOLock|None = _Cache.locks.get( + # fid + ctx_key + ) + if not lock: + lock = _Cache.locks[ + ctx_key + # fid + ] = trio.StrictFIFOLock() + # lock = _Cache.locks[fid] = trio.Lock() + header: str = 'Allocated NEW lock for @acm_func,\n' + lock_registered: bool = True + else: + await trio.lowlevel.checkpoint() + header: str = 'Reusing OLD lock for @acm_func,\n' + + log.debug( + f'{header}' + f'Acquiring..\n' + f'task={task!r}\n' + f'fid={fid!r}\n' + f'acm_func={acm_func}\n' + ) await lock.acquire() + log.debug( + f'Acquir lock..\n' + f'task={task!r}\n' + f'fid={fid!r}\n' + f'acm_func={acm_func}\n' + ) # XXX: one singleton nursery per actor and we want to # have it not be closed until all consumers have exited (which is @@ -312,6 +365,7 @@ async def maybe_open_context( # checking the _Cache until complete otherwise the scheduler # may switch and by accident we create more then one resource. yielded = _Cache.values[ctx_key] + # XXX^ should key-err if not-yet-allocated except KeyError as _ke: # XXX, stay mutexed up to cache-miss yield @@ -322,19 +376,31 @@ async def maybe_open_context( f'ctx_key={ctx_key}\n' f'acm_func={acm_func}\n' ) + # await tractor.pause() mngr = acm_func(**kwargs) resources = _Cache.resources - assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' + entry: tuple|None = resources.get(ctx_key) + if entry: + service_tn, ev = entry + # XXX, trace this. + # await tractor.pause(shield=True) + raise RuntimeError( + f'Caching resources ALREADY exist?!\n' + f'ctx_key={ctx_key!r}\n' + f'acm_func={acm_func}\n' + f'task: {task}\n' + ) + resources[ctx_key] = (service_tn, trio.Event()) yielded: Any = await service_tn.start( _Cache.run_ctx, mngr, ctx_key, ) - _Cache.users += 1 + _Cache.users[ctx_key] += 1 finally: # XXX, since this runs from an `except` it's a checkpoint - # whih can be `trio.Cancelled`-masked. + # which can be `trio.Cancelled`-masked. # # NOTE, in that case the mutex is never released by the # (first and) caching task and **we can't** simply shield @@ -365,9 +431,9 @@ async def maybe_open_context( maybe_taskc.__context__ = None raise taskc - else: - _Cache.users += 1 + # XXX, cached-entry-path + _Cache.users[ctx_key] += 1 log.debug( f'Re-using cached resource for user {_Cache.users}\n\n' f'{ctx_key!r} -> {type(yielded)}\n' @@ -386,17 +452,29 @@ async def maybe_open_context( finally: if lock.locked(): stats: trio.LockStatistics = lock.statistics() + owner: trio.Task|None = stats.owner log.error( - f'Lock left locked by last owner !?\n' + f'Lock never released by last owner={owner!r} !?\n' f'{stats}\n' + f'\n' + f'task={task!r}\n' + f'fid={fid!r}\n' + f'acm_func={acm_func}\n' + ) + # XXX, trace it. + # await tractor.pause(shield=True) - _Cache.users -= 1 + _Cache.users[ctx_key] -= 1 - if yielded is not None: + if yielded is not _UnresolvedCtx: # if no more consumers, teardown the client - if _Cache.users <= 0: - log.debug(f'De-allocating resource for {ctx_key}') + if _Cache.users[ctx_key] <= 0: + log.debug( + f'De-allocating @acm-func entry\n' + f'ctx_key={ctx_key!r}\n' + f'acm_func={acm_func!r}\n' + ) # XXX: if we're cancelled we the entry may have never # been entered since the nursery task was killed. @@ -407,8 +485,11 @@ async def maybe_open_context( no_more_users.set() if lock_registered: - maybe_lock = _Cache.locks.pop(fid, None) + maybe_lock = _Cache.locks.pop( + ctx_key, + None, + ) if maybe_lock is None: log.error( - f'Resource lock for {fid} ALREADY POPPED?' + f'Resource lock for {ctx_key} ALREADY POPPED?' )