Parametrize `test_resource_cache.test_open_local_sub_to_stream`

Namely with multiple pre-sleep `delay`-parametrizations before either,

- parent-scope cancel-calling (as originally) or,
- depending on the  new `cancel_by_cs: bool` suite parameter, optionally
  just immediately exiting from (the newly named)
  `maybe_cancel_outer_cs()` a checkpoint.

In the latter case we ensure we **don't** inf sleep to avoid leaking
those tasks into the `Actor._service_tn` (though we should really have
a better soln for this)..

Deats,
- make `cs` args optional and adjust internal logic to match.
- add some notes around various edge cases and issues with using the
  actor-service-tn as the scope by default.
maybe_open_ctx_locking
Gud Boi 2026-04-06 13:38:35 -04:00
parent bbf01d5161
commit ebe9d5e4b5
1 changed files with 34 additions and 12 deletions

View File

@ -318,7 +318,7 @@ def test_open_local_sub_to_stream(
@acm @acm
async def cancel_outer_cs( async def maybe_cancel_outer_cs(
cs: trio.CancelScope|None = None, cs: trio.CancelScope|None = None,
delay: float = 0, delay: float = 0,
): ):
@ -332,12 +332,31 @@ async def cancel_outer_cs(
if cs: if cs:
log.info('task calling cs.cancel()') log.info('task calling cs.cancel()')
cs.cancel() cs.cancel()
trio.lowlevel.checkpoint()
yield yield
if cs:
await trio.sleep_forever() await trio.sleep_forever()
# XXX, if not cancelled we'll leak this inf-blocking
# subtask to the actor's service tn..
else:
await trio.lowlevel.checkpoint()
@pytest.mark.parametrize(
'delay',
[0.05, 0.5, 1],
ids="pre_sleep_delay={}".format,
)
@pytest.mark.parametrize(
'cancel_by_cs',
[True, False],
ids="cancel_by_cs={}".format,
)
def test_lock_not_corrupted_on_fast_cancel( def test_lock_not_corrupted_on_fast_cancel(
delay: float,
cancel_by_cs: bool,
debug_mode: bool, debug_mode: bool,
loglevel: str, loglevel: str,
): ):
@ -354,17 +373,14 @@ def test_lock_not_corrupted_on_fast_cancel(
due to it having erronously exited without calling due to it having erronously exited without calling
`lock.release()`. `lock.release()`.
''' '''
delay: float = 1.
async def use_moc( async def use_moc(
cs: trio.CancelScope|None,
delay: float, delay: float,
cs: trio.CancelScope|None = None,
): ):
log.info('task entering moc') log.info('task entering moc')
async with maybe_open_context( async with maybe_open_context(
cancel_outer_cs, maybe_cancel_outer_cs,
kwargs={ kwargs={
'cs': cs, 'cs': cs,
'delay': delay, 'delay': delay,
@ -375,8 +391,14 @@ def test_lock_not_corrupted_on_fast_cancel(
else: else:
log.info('1st task entered') log.info('1st task entered')
if cs:
await trio.sleep_forever() await trio.sleep_forever()
else:
await trio.sleep(delay)
# ^END, exit shared ctx.
async def main(): async def main():
with trio.fail_after(delay + 2): with trio.fail_after(delay + 2):
async with ( async with (
@ -384,6 +406,7 @@ def test_lock_not_corrupted_on_fast_cancel(
debug_mode=debug_mode, debug_mode=debug_mode,
loglevel=loglevel, loglevel=loglevel,
), ),
# ?TODO, pass this as the parent tn?
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
get_console_log('info') get_console_log('info')
@ -391,15 +414,14 @@ def test_lock_not_corrupted_on_fast_cancel(
cs = tn.cancel_scope cs = tn.cancel_scope
tn.start_soon( tn.start_soon(
use_moc, use_moc,
cs,
delay, delay,
cs if cancel_by_cs else None,
name='child', name='child',
) )
with trio.CancelScope() as rent_cs: with trio.CancelScope() as rent_cs:
await use_moc( await use_moc(
cs=rent_cs,
delay=delay, delay=delay,
cs=rent_cs if cancel_by_cs else None,
) )
trio.run(main) trio.run(main)