Add `.trionics.maybe_open_context()` locking test
Call it `test_lock_not_corrupted_on_fast_cancel()` and includes a detailed doc string to explain. Implemented it "cleverly" by having the target `@acm` cancel its parent nursery after a peer, cache-hitting task, is already waiting on the task mutex release.to_asyncio_eoc_signal
parent
33ac3ca99f
commit
11c4e65757
|
@ -1,5 +1,6 @@
|
||||||
'''
|
'''
|
||||||
Async context manager cache api testing: ``trionics.maybe_open_context():``
|
Suites for our `.trionics.maybe_open_context()` multi-task
|
||||||
|
shared-cached `@acm` API.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
|
@ -9,6 +10,15 @@ from typing import Awaitable
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
from tractor.trionics import (
|
||||||
|
maybe_open_context,
|
||||||
|
)
|
||||||
|
from tractor.log import (
|
||||||
|
get_console_log,
|
||||||
|
get_logger,
|
||||||
|
)
|
||||||
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
_resource: int = 0
|
_resource: int = 0
|
||||||
|
@ -52,7 +62,7 @@ def test_resource_only_entered_once(key_on):
|
||||||
# different task names per task will be used
|
# different task names per task will be used
|
||||||
kwargs = {'task_name': name}
|
kwargs = {'task_name': name}
|
||||||
|
|
||||||
async with tractor.trionics.maybe_open_context(
|
async with maybe_open_context(
|
||||||
maybe_increment_counter,
|
maybe_increment_counter,
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
key=key,
|
key=key,
|
||||||
|
@ -140,7 +150,7 @@ async def open_stream() -> Awaitable[
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def maybe_open_stream(taskname: str):
|
async def maybe_open_stream(taskname: str):
|
||||||
async with tractor.trionics.maybe_open_context(
|
async with maybe_open_context(
|
||||||
# NOTE: all secondary tasks should cache hit on the same key
|
# NOTE: all secondary tasks should cache hit on the same key
|
||||||
acm_func=open_stream,
|
acm_func=open_stream,
|
||||||
) as (
|
) as (
|
||||||
|
@ -305,3 +315,92 @@ def test_open_local_sub_to_stream(
|
||||||
print('exiting main.')
|
print('exiting main.')
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def cancel_outer_cs(
|
||||||
|
cs: trio.CancelScope|None = None,
|
||||||
|
delay: float = 0,
|
||||||
|
):
|
||||||
|
# on first task delay this enough to block
|
||||||
|
# the 2nd task but then cancel it mid sleep
|
||||||
|
# so that the tn.start() inside the key-err handler block
|
||||||
|
# is cancelled and would previously corrupt the
|
||||||
|
# mutext state.
|
||||||
|
log.info(f'task entering sleep({delay})')
|
||||||
|
await trio.sleep(delay)
|
||||||
|
if cs:
|
||||||
|
log.info('task calling cs.cancel()')
|
||||||
|
cs.cancel()
|
||||||
|
trio.lowlevel.checkpoint()
|
||||||
|
yield
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
def test_lock_not_corrupted_on_fast_cancel(
|
||||||
|
debug_mode: bool,
|
||||||
|
loglevel: str,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify that if the caching-task (the first to enter
|
||||||
|
`maybe_open_context()`) is cancelled mid-cache-miss, the embedded
|
||||||
|
mutex can never be left in a corrupted state.
|
||||||
|
|
||||||
|
That is, the lock is always eventually released ensuring a peer
|
||||||
|
(cache-hitting) task will never,
|
||||||
|
|
||||||
|
- be left to inf-block/hang on the `lock.acquire()`.
|
||||||
|
- try to release the lock when still owned by the caching-task
|
||||||
|
due to it having erronously exited without calling
|
||||||
|
`lock.release()`.
|
||||||
|
|
||||||
|
|
||||||
|
'''
|
||||||
|
delay: float = 1.
|
||||||
|
|
||||||
|
async def use_moc(
|
||||||
|
cs: trio.CancelScope|None,
|
||||||
|
delay: float,
|
||||||
|
):
|
||||||
|
log.info('task entering moc')
|
||||||
|
async with maybe_open_context(
|
||||||
|
cancel_outer_cs,
|
||||||
|
kwargs={
|
||||||
|
'cs': cs,
|
||||||
|
'delay': delay,
|
||||||
|
},
|
||||||
|
) as (cache_hit, _null):
|
||||||
|
if cache_hit:
|
||||||
|
log.info('2nd task entered')
|
||||||
|
else:
|
||||||
|
log.info('1st task entered')
|
||||||
|
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
with trio.fail_after(delay + 2):
|
||||||
|
async with (
|
||||||
|
tractor.open_root_actor(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
loglevel=loglevel,
|
||||||
|
),
|
||||||
|
trio.open_nursery() as tn,
|
||||||
|
):
|
||||||
|
get_console_log('info')
|
||||||
|
log.info('yo starting')
|
||||||
|
cs = tn.cancel_scope
|
||||||
|
tn.start_soon(
|
||||||
|
use_moc,
|
||||||
|
cs,
|
||||||
|
delay,
|
||||||
|
name='child',
|
||||||
|
)
|
||||||
|
with trio.CancelScope() as rent_cs:
|
||||||
|
await use_moc(
|
||||||
|
cs=rent_cs,
|
||||||
|
delay=delay,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
trio.run(main)
|
||||||
|
|
Loading…
Reference in New Issue