diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 339112d2..99f05852 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -41,6 +41,9 @@ import trio from tractor._state import current_actor from tractor.log import get_logger # from ._beg import collapse_eg +# from ._taskc import ( +# maybe_raise_from_masking_exc, +# ) if TYPE_CHECKING: @@ -106,6 +109,9 @@ async def _enter_and_wait( async def gather_contexts( mngrs: Sequence[AsyncContextManager[T]], + # caller can provide their own scope + tn: trio.Nursery|None = None, + ) -> AsyncGenerator[ tuple[ T | None, @@ -148,39 +154,45 @@ async def gather_contexts( '`.trionics.gather_contexts()` input mngrs is empty?\n' '\n' 'Did try to use inline generator syntax?\n' - 'Use a non-lazy iterator or sequence-type intead!\n' + 'Check that list({mngrs}) works!\n' + # 'or sequence-type intead!\n' + # 'Use a non-lazy iterator or sequence-type intead!\n' ) - async with ( - # collapse_eg(), - trio.open_nursery( - strict_exception_groups=False, - # ^XXX^ TODO? soo roll our own then ?? - # -> since we kinda want the "if only one `.exception` then - # just raise that" interface? - ) as tn, - ): - for mngr in mngrs: - tn.start_soon( - _enter_and_wait, - mngr, - unwrapped, - all_entered, - parent_exit, - seed, - ) + try: + async with ( + # + # ?TODO, does including these (eg-collapsing, + # taskc-unmasking) improve tb noise-reduction/legibility? + # + # collapse_eg(), + maybe_open_nursery( + nursery=tn, + ) as tn, + # maybe_raise_from_masking_exc(), + ): + for mngr in mngrs: + tn.start_soon( + _enter_and_wait, + mngr, + unwrapped, + all_entered, + parent_exit, + seed, + ) - # deliver control once all managers have started up - await all_entered.wait() - - try: + # deliver control to caller once all ctx-managers have + # started (yielded back to us). + await all_entered.wait() yield tuple(unwrapped.values()) - finally: - # XXX NOTE: this is ABSOLUTELY REQUIRED to avoid - # the following wacky bug: - # parent_exit.set() + finally: + # XXX NOTE: this is ABSOLUTELY REQUIRED to avoid + # the following wacky bug: + # + parent_exit.set() + # Per actor task caching helpers. # Further potential examples of interest: @@ -233,6 +245,9 @@ async def maybe_open_context( kwargs: dict = {}, key: Hashable | Callable[..., Hashable] = None, + # caller can provide their own scope + tn: trio.Nursery|None = None, + ) -> AsyncIterator[tuple[bool, T]]: ''' Maybe open an async-context-manager (acm) if there is not already @@ -265,7 +280,23 @@ async def maybe_open_context( # have it not be closed until all consumers have exited (which is # currently difficult to implement any other way besides using our # pre-allocated runtime instance..) - service_n: trio.Nursery = current_actor()._service_n + if tn: + # TODO, assert tn is eventual parent of this task! + task: trio.Task = trio.lowlevel.current_task() + task_tn: trio.Nursery = task.parent_nursery + if not tn._cancel_status.encloses( + task_tn._cancel_status + ): + raise RuntimeError( + f'Mis-nesting of task under provided {tn} !?\n' + f'Current task is NOT a child(-ish)!!\n' + f'\n' + f'task: {task}\n' + f'task_tn: {task_tn}\n' + ) + service_n = tn + else: + service_n: trio.Nursery = current_actor()._service_n # TODO: is there any way to allocate # a 'stays-open-till-last-task-finshed nursery? @@ -273,39 +304,33 @@ async def maybe_open_context( # async with maybe_open_nursery(_Cache.service_n) as service_n: # _Cache.service_n = service_n + cache_miss_ke: KeyError|None = None + maybe_taskc: trio.Cancelled|None = None try: # **critical section** that should prevent other tasks from # checking the _Cache until complete otherwise the scheduler # may switch and by accident we create more then one resource. yielded = _Cache.values[ctx_key] - except KeyError: - log.debug( - f'Allocating new @acm-func entry\n' - f'ctx_key={ctx_key}\n' - f'acm_func={acm_func}\n' - ) - mngr = acm_func(**kwargs) - resources = _Cache.resources - assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' - resources[ctx_key] = (service_n, trio.Event()) - - # sync up to the mngr's yielded value + except KeyError as _ke: + # XXX, stay mutexed up to cache-miss yield try: + cache_miss_ke = _ke + log.debug( + f'Allocating new @acm-func entry\n' + f'ctx_key={ctx_key}\n' + f'acm_func={acm_func}\n' + ) + mngr = acm_func(**kwargs) + resources = _Cache.resources + assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' + resources[ctx_key] = (service_n, trio.Event()) yielded: Any = await service_n.start( _Cache.run_ctx, mngr, ctx_key, ) _Cache.users += 1 - except trio.Cancelled as taskc: - log.cancel( - f'Cancelled during caching?\n' - f'\n' - f'ctx_key: {ctx_key!r}\n' - f'mngr: {mngr!r}\n' - ) - raise taskc finally: # XXX, since this runs from an `except` it's a checkpoint # whih can be `trio.Cancelled`-masked. @@ -318,10 +343,27 @@ async def maybe_open_context( # SO just always unlock! lock.release() - yield ( - False, # cache_hit = "no" - yielded, - ) + try: + yield ( + False, # cache_hit = "no" + yielded, + ) + except trio.Cancelled as taskc: + maybe_taskc = taskc + log.cancel( + f'Cancelled from cache-miss entry\n' + f'\n' + f'ctx_key: {ctx_key!r}\n' + f'mngr: {mngr!r}\n' + ) + # XXX, always unset ke from cancelled context + # since we never consider it a masked exc case! + # - bc this can be called directly ty `._rpc._invoke()`? + # + if maybe_taskc.__context__ is cache_miss_ke: + maybe_taskc.__context__ = None + + raise taskc else: _Cache.users += 1 @@ -341,6 +383,13 @@ async def maybe_open_context( ) finally: + if lock.locked(): + stats: trio.LockStatistics = lock.statistics() + log.error( + f'Lock left locked by last owner !?\n' + f'{stats}\n' + ) + _Cache.users -= 1 if yielded is not None: