Accept `tn` to `gather_contexts()/maybe_open_context()`

Such that the caller can be responsible for their own (nursery) scoping
as needed and, for the latter fn's case with
a `trio.Nursery.CancelStatus.encloses()` check to ensure the `tn` is
a valid parent-ish.

Some deats,
- in `gather_contexts()`, mv the `try/finally` outside the nursery block
  to ensure we always do the `parent_exit`.
- for `maybe_open_context()` we do a naive task-tree hierarchy audit to
  ensure the provided scope is not *too* child-ish (with what APIs `trio`
  gives us, see above), OW go with the old approach of using the actor's
  private service nursery.
  Also,
  * better report `trio.Cancelled` around the cache-miss `yield`
    cases and ensure we **never** unmask triggering key-errors.
  * report on any stale-state with the mutex in the `finally` block.
to_asyncio_eoc_signal
Tyler Goodlet 2025-07-26 20:10:24 -04:00
parent fc77e6eca5
commit 7459a4127c
1 changed files with 101 additions and 52 deletions

View File

@ -41,6 +41,9 @@ import trio
from tractor._state import current_actor from tractor._state import current_actor
from tractor.log import get_logger from tractor.log import get_logger
# from ._beg import collapse_eg # from ._beg import collapse_eg
# from ._taskc import (
# maybe_raise_from_masking_exc,
# )
if TYPE_CHECKING: if TYPE_CHECKING:
@ -106,6 +109,9 @@ async def _enter_and_wait(
async def gather_contexts( async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]], mngrs: Sequence[AsyncContextManager[T]],
# caller can provide their own scope
tn: trio.Nursery|None = None,
) -> AsyncGenerator[ ) -> AsyncGenerator[
tuple[ tuple[
T | None, T | None,
@ -148,39 +154,45 @@ async def gather_contexts(
'`.trionics.gather_contexts()` input mngrs is empty?\n' '`.trionics.gather_contexts()` input mngrs is empty?\n'
'\n' '\n'
'Did try to use inline generator syntax?\n' 'Did try to use inline generator syntax?\n'
'Use a non-lazy iterator or sequence-type intead!\n' 'Check that list({mngrs}) works!\n'
# 'or sequence-type intead!\n'
# 'Use a non-lazy iterator or sequence-type intead!\n'
) )
async with ( try:
# collapse_eg(), async with (
trio.open_nursery( #
strict_exception_groups=False, # ?TODO, does including these (eg-collapsing,
# ^XXX^ TODO? soo roll our own then ?? # taskc-unmasking) improve tb noise-reduction/legibility?
# -> since we kinda want the "if only one `.exception` then #
# just raise that" interface? # collapse_eg(),
) as tn, maybe_open_nursery(
): nursery=tn,
for mngr in mngrs: ) as tn,
tn.start_soon( # maybe_raise_from_masking_exc(),
_enter_and_wait, ):
mngr, for mngr in mngrs:
unwrapped, tn.start_soon(
all_entered, _enter_and_wait,
parent_exit, mngr,
seed, unwrapped,
) all_entered,
parent_exit,
seed,
)
# deliver control once all managers have started up # deliver control to caller once all ctx-managers have
await all_entered.wait() # started (yielded back to us).
await all_entered.wait()
try:
yield tuple(unwrapped.values()) yield tuple(unwrapped.values())
finally:
# XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug:
# <tractorbugurlhere>
parent_exit.set() parent_exit.set()
finally:
# XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug:
# <tractorbugurlhere>
parent_exit.set()
# Per actor task caching helpers. # Per actor task caching helpers.
# Further potential examples of interest: # Further potential examples of interest:
@ -233,6 +245,9 @@ async def maybe_open_context(
kwargs: dict = {}, kwargs: dict = {},
key: Hashable | Callable[..., Hashable] = None, key: Hashable | Callable[..., Hashable] = None,
# caller can provide their own scope
tn: trio.Nursery|None = None,
) -> AsyncIterator[tuple[bool, T]]: ) -> AsyncIterator[tuple[bool, T]]:
''' '''
Maybe open an async-context-manager (acm) if there is not already Maybe open an async-context-manager (acm) if there is not already
@ -265,7 +280,23 @@ async def maybe_open_context(
# have it not be closed until all consumers have exited (which is # have it not be closed until all consumers have exited (which is
# currently difficult to implement any other way besides using our # currently difficult to implement any other way besides using our
# pre-allocated runtime instance..) # pre-allocated runtime instance..)
service_n: trio.Nursery = current_actor()._service_n if tn:
# TODO, assert tn is eventual parent of this task!
task: trio.Task = trio.lowlevel.current_task()
task_tn: trio.Nursery = task.parent_nursery
if not tn._cancel_status.encloses(
task_tn._cancel_status
):
raise RuntimeError(
f'Mis-nesting of task under provided {tn} !?\n'
f'Current task is NOT a child(-ish)!!\n'
f'\n'
f'task: {task}\n'
f'task_tn: {task_tn}\n'
)
service_n = tn
else:
service_n: trio.Nursery = current_actor()._service_n
# TODO: is there any way to allocate # TODO: is there any way to allocate
# a 'stays-open-till-last-task-finshed nursery? # a 'stays-open-till-last-task-finshed nursery?
@ -273,39 +304,33 @@ async def maybe_open_context(
# async with maybe_open_nursery(_Cache.service_n) as service_n: # async with maybe_open_nursery(_Cache.service_n) as service_n:
# _Cache.service_n = service_n # _Cache.service_n = service_n
cache_miss_ke: KeyError|None = None
maybe_taskc: trio.Cancelled|None = None
try: try:
# **critical section** that should prevent other tasks from # **critical section** that should prevent other tasks from
# checking the _Cache until complete otherwise the scheduler # checking the _Cache until complete otherwise the scheduler
# may switch and by accident we create more then one resource. # may switch and by accident we create more then one resource.
yielded = _Cache.values[ctx_key] yielded = _Cache.values[ctx_key]
except KeyError: except KeyError as _ke:
log.debug( # XXX, stay mutexed up to cache-miss yield
f'Allocating new @acm-func entry\n'
f'ctx_key={ctx_key}\n'
f'acm_func={acm_func}\n'
)
mngr = acm_func(**kwargs)
resources = _Cache.resources
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
resources[ctx_key] = (service_n, trio.Event())
# sync up to the mngr's yielded value
try: try:
cache_miss_ke = _ke
log.debug(
f'Allocating new @acm-func entry\n'
f'ctx_key={ctx_key}\n'
f'acm_func={acm_func}\n'
)
mngr = acm_func(**kwargs)
resources = _Cache.resources
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
resources[ctx_key] = (service_n, trio.Event())
yielded: Any = await service_n.start( yielded: Any = await service_n.start(
_Cache.run_ctx, _Cache.run_ctx,
mngr, mngr,
ctx_key, ctx_key,
) )
_Cache.users += 1 _Cache.users += 1
except trio.Cancelled as taskc:
log.cancel(
f'Cancelled during caching?\n'
f'\n'
f'ctx_key: {ctx_key!r}\n'
f'mngr: {mngr!r}\n'
)
raise taskc
finally: finally:
# XXX, since this runs from an `except` it's a checkpoint # XXX, since this runs from an `except` it's a checkpoint
# whih can be `trio.Cancelled`-masked. # whih can be `trio.Cancelled`-masked.
@ -318,10 +343,27 @@ async def maybe_open_context(
# SO just always unlock! # SO just always unlock!
lock.release() lock.release()
yield ( try:
False, # cache_hit = "no" yield (
yielded, False, # cache_hit = "no"
) yielded,
)
except trio.Cancelled as taskc:
maybe_taskc = taskc
log.cancel(
f'Cancelled from cache-miss entry\n'
f'\n'
f'ctx_key: {ctx_key!r}\n'
f'mngr: {mngr!r}\n'
)
# XXX, always unset ke from cancelled context
# since we never consider it a masked exc case!
# - bc this can be called directly ty `._rpc._invoke()`?
#
if maybe_taskc.__context__ is cache_miss_ke:
maybe_taskc.__context__ = None
raise taskc
else: else:
_Cache.users += 1 _Cache.users += 1
@ -341,6 +383,13 @@ async def maybe_open_context(
) )
finally: finally:
if lock.locked():
stats: trio.LockStatistics = lock.statistics()
log.error(
f'Lock left locked by last owner !?\n'
f'{stats}\n'
)
_Cache.users -= 1 _Cache.users -= 1
if yielded is not None: if yielded is not None: