Merge pull request #393 from goodboy/trionics_tweaks

Trionics tweaks: some `._mngrs` refinements and fix a `test_resource_cache` hang
main
Bd 2025-08-18 21:20:33 -04:00 committed by GitHub
commit dd041b0a01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 342 additions and 70 deletions

View File

@ -1,5 +1,6 @@
'''
Async context manager cache api testing: ``trionics.maybe_open_context():``
Suites for our `.trionics.maybe_open_context()` multi-task
shared-cached `@acm` API.
'''
from contextlib import asynccontextmanager as acm
@ -9,6 +10,15 @@ from typing import Awaitable
import pytest
import trio
import tractor
from tractor.trionics import (
maybe_open_context,
)
from tractor.log import (
get_console_log,
get_logger,
)
log = get_logger(__name__)
_resource: int = 0
@ -52,7 +62,7 @@ def test_resource_only_entered_once(key_on):
# different task names per task will be used
kwargs = {'task_name': name}
async with tractor.trionics.maybe_open_context(
async with maybe_open_context(
maybe_increment_counter,
kwargs=kwargs,
key=key,
@ -72,11 +82,13 @@ def test_resource_only_entered_once(key_on):
with trio.move_on_after(0.5):
async with (
tractor.open_root_actor(),
trio.open_nursery() as n,
trio.open_nursery() as tn,
):
for i in range(10):
n.start_soon(enter_cached_mngr, f'task_{i}')
tn.start_soon(
enter_cached_mngr,
f'task_{i}',
)
await trio.sleep(0.001)
trio.run(main)
@ -98,21 +110,32 @@ async def streamer(
@acm
async def open_stream() -> Awaitable[tractor.MsgStream]:
async def open_stream() -> Awaitable[
tuple[
tractor.ActorNursery,
tractor.MsgStream,
]
]:
try:
async with tractor.open_nursery() as an:
portal = await an.start_actor(
'streamer',
enable_modules=[__name__],
)
try:
async with (
portal.open_context(streamer) as (ctx, first),
ctx.open_stream() as stream,
):
yield stream
print('Entered open_stream() caller')
yield an, stream
print('Exited open_stream() caller')
print('Cancelling streamer')
finally:
print(
'Cancelling streamer with,\n'
'=> `Portal.cancel_actor()`'
)
await portal.cancel_actor()
print('Cancelled streamer')
@ -127,11 +150,15 @@ async def open_stream() -> Awaitable[tractor.MsgStream]:
@acm
async def maybe_open_stream(taskname: str):
async with tractor.trionics.maybe_open_context(
async with maybe_open_context(
# NOTE: all secondary tasks should cache hit on the same key
acm_func=open_stream,
) as (cache_hit, stream):
) as (
cache_hit,
(an, stream)
):
# when the actor + portal + ctx + stream has already been
# allocated we want to just bcast to this task.
if cache_hit:
print(f'{taskname} loaded from cache')
@ -139,10 +166,43 @@ async def maybe_open_stream(taskname: str):
# if this feed is already allocated by the first
# task that entereed
async with stream.subscribe() as bstream:
yield bstream
yield an, bstream
print(
f'cached task exited\n'
f')>\n'
f' |_{taskname}\n'
)
# we should always unreg the "cloned" bcrc for this
# consumer-task
assert id(bstream) not in bstream._state.subs
else:
# yield the actual stream
yield stream
try:
yield an, stream
finally:
print(
f'NON-cached task exited\n'
f')>\n'
f' |_{taskname}\n'
)
first_bstream = stream._broadcaster
bcrx_state = first_bstream._state
subs: dict[int, int] = bcrx_state.subs
if len(subs) == 1:
assert id(first_bstream) in subs
# ^^TODO! the bcrx should always de-allocate all subs,
# including the implicit first one allocated on entry
# by the first subscribing peer task, no?
#
# -[ ] adjust `MsgStream.subscribe()` to do this mgmt!
# |_ allows reverting `MsgStream.receive()` to the
# non-bcaster method.
# |_ we can decide whether to reset `._broadcaster`?
#
# await tractor.pause(shield=True)
def test_open_local_sub_to_stream(
@ -159,16 +219,24 @@ def test_open_local_sub_to_stream(
if debug_mode:
timeout = 999
print(f'IN debug_mode, setting large timeout={timeout!r}..')
async def main():
full = list(range(1000))
an: tractor.ActorNursery|None = None
num_tasks: int = 10
async def get_sub_and_pull(taskname: str):
nonlocal an
stream: tractor.MsgStream
async with (
maybe_open_stream(taskname) as stream,
maybe_open_stream(taskname) as (
an,
stream,
),
):
if '0' in taskname:
assert isinstance(stream, tractor.MsgStream)
@ -180,34 +248,159 @@ def test_open_local_sub_to_stream(
first = await stream.receive()
print(f'{taskname} started with value {first}')
seq = []
seq: list[int] = []
async for msg in stream:
seq.append(msg)
assert set(seq).issubset(set(full))
# end of @acm block
print(f'{taskname} finished')
root: tractor.Actor
with trio.fail_after(timeout) as cs:
# TODO: turns out this isn't multi-task entrant XD
# We probably need an indepotent entry semantic?
async with tractor.open_root_actor(
debug_mode=debug_mode,
):
# maybe_enable_greenback=True,
#
# ^TODO? doesn't seem to mk breakpoint() usage work
# bc each bg task needs to open a portal??
# - [ ] we should consider making this part of
# our taskman defaults?
# |_see https://github.com/goodboy/tractor/pull/363
#
) as root:
assert root.is_registrar
async with (
trio.open_nursery() as tn,
):
for i in range(10):
for i in range(num_tasks):
tn.start_soon(
get_sub_and_pull,
f'task_{i}',
)
await trio.sleep(0.001)
print('all consumer tasks finished')
print('all consumer tasks finished!')
# ?XXX, ensure actor-nursery is shutdown or we might
# hang here due to a minor task deadlock/race-condition?
#
# - seems that all we need is a checkpoint to ensure
# the last suspended task, which is inside
# `.maybe_open_context()`, can do the
# `Portal.cancel_actor()` call?
#
# - if that bg task isn't resumed, then this blocks
# timeout might hit before that?
#
if root.ipc_server.has_peers():
await trio.lowlevel.checkpoint()
# alt approach, cancel the entire `an`
# await tractor.pause()
# await an.cancel()
# end of runtime scope
print('root actor terminated.')
if cs.cancelled_caught:
pytest.fail(
'Should NOT time out in `open_root_actor()` ?'
)
print('exiting main.')
trio.run(main)
@acm
async def cancel_outer_cs(
cs: trio.CancelScope|None = None,
delay: float = 0,
):
# on first task delay this enough to block
# the 2nd task but then cancel it mid sleep
# so that the tn.start() inside the key-err handler block
# is cancelled and would previously corrupt the
# mutext state.
log.info(f'task entering sleep({delay})')
await trio.sleep(delay)
if cs:
log.info('task calling cs.cancel()')
cs.cancel()
trio.lowlevel.checkpoint()
yield
await trio.sleep_forever()
def test_lock_not_corrupted_on_fast_cancel(
debug_mode: bool,
loglevel: str,
):
'''
Verify that if the caching-task (the first to enter
`maybe_open_context()`) is cancelled mid-cache-miss, the embedded
mutex can never be left in a corrupted state.
That is, the lock is always eventually released ensuring a peer
(cache-hitting) task will never,
- be left to inf-block/hang on the `lock.acquire()`.
- try to release the lock when still owned by the caching-task
due to it having erronously exited without calling
`lock.release()`.
'''
delay: float = 1.
async def use_moc(
cs: trio.CancelScope|None,
delay: float,
):
log.info('task entering moc')
async with maybe_open_context(
cancel_outer_cs,
kwargs={
'cs': cs,
'delay': delay,
},
) as (cache_hit, _null):
if cache_hit:
log.info('2nd task entered')
else:
log.info('1st task entered')
await trio.sleep_forever()
async def main():
with trio.fail_after(delay + 2):
async with (
tractor.open_root_actor(
debug_mode=debug_mode,
loglevel=loglevel,
),
trio.open_nursery() as tn,
):
get_console_log('info')
log.info('yo starting')
cs = tn.cancel_scope
tn.start_soon(
use_moc,
cs,
delay,
name='child',
)
with trio.CancelScope() as rent_cs:
await use_moc(
cs=rent_cs,
delay=delay,
)
trio.run(main)

View File

@ -41,6 +41,9 @@ import trio
from tractor._state import current_actor
from tractor.log import get_logger
# from ._beg import collapse_eg
# from ._taskc import (
# maybe_raise_from_masking_exc,
# )
if TYPE_CHECKING:
@ -106,6 +109,9 @@ async def _enter_and_wait(
async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]],
# caller can provide their own scope
tn: trio.Nursery|None = None,
) -> AsyncGenerator[
tuple[
T | None,
@ -148,17 +154,22 @@ async def gather_contexts(
'`.trionics.gather_contexts()` input mngrs is empty?\n'
'\n'
'Did try to use inline generator syntax?\n'
'Use a non-lazy iterator or sequence-type intead!\n'
'Check that list({mngrs}) works!\n'
# 'or sequence-type intead!\n'
# 'Use a non-lazy iterator or sequence-type intead!\n'
)
try:
async with (
#
# ?TODO, does including these (eg-collapsing,
# taskc-unmasking) improve tb noise-reduction/legibility?
#
# collapse_eg(),
trio.open_nursery(
strict_exception_groups=False,
# ^XXX^ TODO? soo roll our own then ??
# -> since we kinda want the "if only one `.exception` then
# just raise that" interface?
maybe_open_nursery(
nursery=tn,
) as tn,
# maybe_raise_from_masking_exc(),
):
for mngr in mngrs:
tn.start_soon(
@ -170,11 +181,12 @@ async def gather_contexts(
seed,
)
# deliver control once all managers have started up
# deliver control to caller once all ctx-managers have
# started (yielded back to us).
await all_entered.wait()
try:
yield tuple(unwrapped.values())
parent_exit.set()
finally:
# XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug:
@ -233,6 +245,9 @@ async def maybe_open_context(
kwargs: dict = {},
key: Hashable | Callable[..., Hashable] = None,
# caller can provide their own scope
tn: trio.Nursery|None = None,
) -> AsyncIterator[tuple[bool, T]]:
'''
Maybe open an async-context-manager (acm) if there is not already
@ -265,6 +280,22 @@ async def maybe_open_context(
# have it not be closed until all consumers have exited (which is
# currently difficult to implement any other way besides using our
# pre-allocated runtime instance..)
if tn:
# TODO, assert tn is eventual parent of this task!
task: trio.Task = trio.lowlevel.current_task()
task_tn: trio.Nursery = task.parent_nursery
if not tn._cancel_status.encloses(
task_tn._cancel_status
):
raise RuntimeError(
f'Mis-nesting of task under provided {tn} !?\n'
f'Current task is NOT a child(-ish)!!\n'
f'\n'
f'task: {task}\n'
f'task_tn: {task_tn}\n'
)
service_n = tn
else:
service_n: trio.Nursery = current_actor()._service_n
# TODO: is there any way to allocate
@ -273,32 +304,70 @@ async def maybe_open_context(
# async with maybe_open_nursery(_Cache.service_n) as service_n:
# _Cache.service_n = service_n
cache_miss_ke: KeyError|None = None
maybe_taskc: trio.Cancelled|None = None
try:
# **critical section** that should prevent other tasks from
# checking the _Cache until complete otherwise the scheduler
# may switch and by accident we create more then one resource.
yielded = _Cache.values[ctx_key]
except KeyError:
log.debug(f'Allocating new {acm_func} for {ctx_key}')
except KeyError as _ke:
# XXX, stay mutexed up to cache-miss yield
try:
cache_miss_ke = _ke
log.debug(
f'Allocating new @acm-func entry\n'
f'ctx_key={ctx_key}\n'
f'acm_func={acm_func}\n'
)
mngr = acm_func(**kwargs)
resources = _Cache.resources
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
resources[ctx_key] = (service_n, trio.Event())
# sync up to the mngr's yielded value
yielded = await service_n.start(
yielded: Any = await service_n.start(
_Cache.run_ctx,
mngr,
ctx_key,
)
_Cache.users += 1
finally:
# XXX, since this runs from an `except` it's a checkpoint
# whih can be `trio.Cancelled`-masked.
#
# NOTE, in that case the mutex is never released by the
# (first and) caching task and **we can't** simply shield
# bc that will inf-block on the `await
# no_more_users.wait()`.
#
# SO just always unlock!
lock.release()
yield False, yielded
try:
yield (
False, # cache_hit = "no"
yielded,
)
except trio.Cancelled as taskc:
maybe_taskc = taskc
log.cancel(
f'Cancelled from cache-miss entry\n'
f'\n'
f'ctx_key: {ctx_key!r}\n'
f'mngr: {mngr!r}\n'
)
# XXX, always unset ke from cancelled context
# since we never consider it a masked exc case!
# - bc this can be called directly ty `._rpc._invoke()`?
#
if maybe_taskc.__context__ is cache_miss_ke:
maybe_taskc.__context__ = None
raise taskc
else:
_Cache.users += 1
log.runtime(
log.debug(
f'Re-using cached resource for user {_Cache.users}\n\n'
f'{ctx_key!r} -> {type(yielded)}\n'
@ -308,9 +377,19 @@ async def maybe_open_context(
# f'{ctx_key!r} -> {yielded!r}\n'
)
lock.release()
yield True, yielded
yield (
True, # cache_hit = "yes"
yielded,
)
finally:
if lock.locked():
stats: trio.LockStatistics = lock.statistics()
log.error(
f'Lock left locked by last owner !?\n'
f'{stats}\n'
)
_Cache.users -= 1
if yielded is not None: