From dd7aca539f3ce0374001c85208a1e26a7ad8c63f Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 15 Jul 2025 16:48:46 -0400
Subject: [PATCH] Tool-up `test_resource_cache.test_open_local_sub_to_stream`

I recently discovered a very subtle race case that can sometimes cause
the suite to hang, seemingly due to the `an: ActorNursery` being
allocated *behind* the `.trionics.maybe_open_context()` usage; this can
result in the 'streamer' subactor never being cancelled despite the
`main()` timeout-guard. Digging in revealed that the underlying issue
is two-fold:

- our `BroadcastReceiver` termination-mgmt semantics in
  `MsgStream.subscribe()` can result in the first subscribing task
  always keeping the `MsgStream._broadcaster` instance allocated; it is
  never `.aclose()`ed, which makes it tough to determine (and thus
  trace) when all subscriber-tasks are actually complete and have
  exited `.subscribe()`..

- I was shield-waiting on
  `.ipc._server.Server.wait_for_no_more_peers()` in
  `._runtime.async_main()`'s shutdown sequence, which compounds the
  issue and results in a SIGINT-shielded hang.. the worst kind XD

Actual changes here are just styling, printing, and some mucking with
passing the `an`-ref up to the parent task in the root-actor, where I
was doing a conditional `ActorNursery.cancel()` to make sure that was
actually the problem. Presuming this is fixed, the `.pause()` I left
unmasked should never hit.
---
 tests/test_resource_cache.py | 113 ++++++++++++++++++++++++++++------
 1 file changed, 91 insertions(+), 22 deletions(-)

diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py
index bdcdd6c9..74b00364 100644
--- a/tests/test_resource_cache.py
+++ b/tests/test_resource_cache.py
@@ -72,11 +72,13 @@ def test_resource_only_entered_once(key_on):
         with trio.move_on_after(0.5):
             async with (
                 tractor.open_root_actor(),
-                trio.open_nursery() as n,
+                trio.open_nursery() as tn,
             ):
-
                 for i in range(10):
-                    n.start_soon(enter_cached_mngr, f'task_{i}')
+                    tn.start_soon(
+                        enter_cached_mngr,
+                        f'task_{i}',
+                    )
                     await trio.sleep(0.001)

     trio.run(main)
@@ -98,23 +100,34 @@ async def streamer(


 @acm
-async def open_stream() -> Awaitable[tractor.MsgStream]:
-
+async def open_stream() -> Awaitable[
+    tuple[
+        tractor.ActorNursery,
+        tractor.MsgStream,
+    ]
+]:
     try:
         async with tractor.open_nursery() as an:
             portal = await an.start_actor(
                 'streamer',
                 enable_modules=[__name__],
             )
-            async with (
-                portal.open_context(streamer) as (ctx, first),
-                ctx.open_stream() as stream,
-            ):
-                yield stream
+            try:
+                async with (
+                    portal.open_context(streamer) as (ctx, first),
+                    ctx.open_stream() as stream,
+                ):
+                    print('Entered open_stream() caller')
+                    yield an, stream
+                    print('Exited open_stream() caller')
+                    # await tractor.pause(shield=True)

-            print('Cancelling streamer')
-            await portal.cancel_actor()
-            print('Cancelled streamer')
+            finally:
+                print('Cancelling streamer')
+                # await tractor.pause(shield=True)
+                with trio.CancelScope(shield=True):
+                    await portal.cancel_actor()
+                    print('Cancelled streamer')

     except Exception as err:
         print(
@@ -130,8 +143,10 @@ async def maybe_open_stream(taskname: str):
     async with tractor.trionics.maybe_open_context(
         # NOTE: all secondary tasks should cache hit on the same key
         acm_func=open_stream,
-    ) as (cache_hit, stream):
-
+    ) as (
+        cache_hit,
+        (an, stream)
+    ):
         if cache_hit:
             print(f'{taskname} loaded from cache')

@@ -139,10 +154,32 @@ async def maybe_open_stream(taskname: str):
             # if this feed is already allocated by the first
             # task that entereed
             async with stream.subscribe() as bstream:
-                yield bstream
+                yield an, bstream
+                print(
+                    f'cached task exited\n'
+                    f')>\n'
+                    f' |_{taskname}\n'
+                )

         else:
             # yield the actual stream
-            yield stream
+            try:
+                yield an, stream
+            finally:
+                print(
+                    f'NON-cached task exited\n'
+                    f')>\n'
+                    f' |_{taskname}\n'
+                )
+
+            bstream = stream._broadcaster
+            if (
+                not bstream.state.subs
+                and
+                not cache_hit
+            ):
+                await tractor.pause(shield=True)
+                # await an.cancel()
+

 def test_open_local_sub_to_stream(
@@ -159,16 +196,24 @@ def test_open_local_sub_to_stream(
     if debug_mode:
         timeout = 999
+        print(f'IN debug_mode, setting large timeout={timeout!r}..')

     async def main():

         full = list(range(1000))
+        an: tractor.ActorNursery|None = None
+        num_tasks: int = 10

         async def get_sub_and_pull(taskname: str):
+            nonlocal an
+            stream: tractor.MsgStream
             async with (
-                maybe_open_stream(taskname) as stream,
+                maybe_open_stream(taskname) as (
+                    an,
+                    stream,
+                ),
             ):
                 if '0' in taskname:
                     assert isinstance(stream, tractor.MsgStream)
@@ -180,34 +225,58 @@ def test_open_local_sub_to_stream(
                 first = await stream.receive()
                 print(f'{taskname} started with value {first}')
-                seq = []
+                seq: list[int] = []
                 async for msg in stream:
                     seq.append(msg)

                 assert set(seq).issubset(set(full))
+
+            # end of @acm block
             print(f'{taskname} finished')

+        root: tractor.Actor
         with trio.fail_after(timeout) as cs:
             # TODO: turns out this isn't multi-task entrant XD
             # We probably need an indepotent entry semantic?
             async with tractor.open_root_actor(
                 debug_mode=debug_mode,
-            ):
+                # maybe_enable_greenback=True,
+                #
+                # ^TODO? doesn't seem to mk breakpoint() usage work
+                # bc each bg task needs to open a portal??
+                # - [ ] we should consider making this part of
+                #       our taskman defaults?
+                #   |_see https://github.com/goodboy/tractor/pull/363
+                #
+            ) as root:
+                assert root.is_registrar
+
                 async with (
                     trio.open_nursery() as tn,
                 ):
-                    for i in range(10):
+                    for i in range(num_tasks):
                         tn.start_soon(
                             get_sub_and_pull,
                             f'task_{i}',
                         )
                         await trio.sleep(0.001)
-                    print('all consumer tasks finished')
+                    print('all consumer tasks finished.')
+
+                # ensure actor-nursery is shutdown or we might
+                # hang here..?
+                # if root.ipc_server.has_peers():
+                #     await tractor.pause()
+                #     await an.cancel()
+
+            # end of runtime scope
+            print('root actor terminated.')

         if cs.cancelled_caught:
             pytest.fail(
                 'Should NOT time out in `open_root_actor()` ?'
             )
+        print('exiting main.')

     trio.run(main)
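
Reviewer note: the `with trio.CancelScope(shield=True):` added around
`portal.cancel_actor()` in `open_stream()` above follows the usual `trio`
pattern for running async teardown from a `finally:` block after the
enclosing scope has already been cancelled (here, by the
`trio.fail_after(timeout)`/`trio.move_on_after()` guards in the tests'
`main()`). Below is a minimal, `trio`-only sketch of that pattern for
reference; `worker()` and `spawn_and_cleanup()` are illustrative
placeholder names, not part of this suite:

    import trio


    async def worker():
        # stand-in for the 'streamer' subactor: runs until cancelled.
        await trio.sleep_forever()


    async def spawn_and_cleanup():
        async with trio.open_nursery() as tn:
            tn.start_soon(worker)
            try:
                # stand-in for the `yield an, stream` in `open_stream()`.
                await trio.sleep_forever()
            finally:
                # once the outer scope has been cancelled, any unshielded
                # `await` here raises `Cancelled` immediately, so the
                # teardown below would never complete without the shield.
                with trio.CancelScope(shield=True):
                    # stand-in for `await portal.cancel_actor()`.
                    await trio.sleep(0.1)
                    print('teardown completed despite outer cancellation')

                # make sure the nursery (and thus `worker()`) exits.
                tn.cancel_scope.cancel()


    async def main():
        # mimics the test's `trio.fail_after(timeout)` guard.
        with trio.move_on_after(0.5):
            await spawn_and_cleanup()


    if __name__ == '__main__':
        trio.run(main)

Without the shield, the cancel request to the subactor would never go out
once the timeout fires, leaving the 'streamer' running and the suite hung,
which is the failure mode this patch instruments.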