From 7a075494f18c9bbae937dee159c52e01c7771d7b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Jul 2025 21:59:42 -0400 Subject: [PATCH] Well then, I guess it just needed, a checkpoint XD Here I was thinking the bcaster (usage) maybe required a rework but, NOPE it's just bc a checkpoint was needed in the parent task owning the `tn` which spawns `get_sub_and_pull()` tasks to ensure the bg allocated `an`/portal is eventually cancel-called.. Ah well, at least i started a patch for `MsgStream.subscribe()` to make it multicast revertible.. XD Anyway, I tossed in some checks & notes related to all that unnecessary effort since I do think i'll move forward implementing it: - for the `cache_hit` case always verify that the `bcast` clone is unregistered from the common state subs after `.subscribe().__aexit__()`. - do a light check that the implicit `MsgStream._broadcaster` is always the only bcrx instance left-leaked into that state.. that is until i get the proper de-allocation/reversion from multicast -> unicast working. - put in mega detailed note about the required parent-task checkpoint. --- tests/test_resource_cache.py | 65 +++++++++++++++++++++++++----------- 1 file changed, 45 insertions(+), 20 deletions(-) diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py index e7958b9c..d54d76a9 100644 --- a/tests/test_resource_cache.py +++ b/tests/test_resource_cache.py @@ -120,13 +120,13 @@ async def open_stream() -> Awaitable[ print('Entered open_stream() caller') yield an, stream print('Exited open_stream() caller') - # await tractor.pause(shield=True) finally: - print('Cancelling streamer') - # await tractor.pause(shield=True) - with trio.CancelScope(shield=True): - await portal.cancel_actor() + print( + 'Cancelling streamer with,\n' + '=> `Portal.cancel_actor()`' + ) + await portal.cancel_actor() print('Cancelled streamer') except Exception as err: @@ -147,6 +147,8 @@ async def maybe_open_stream(taskname: str): cache_hit, (an, stream) ): + # when the actor + portal + ctx + stream has already been + # allocated we want to just bcast to this task. if cache_hit: print(f'{taskname} loaded from cache') @@ -160,6 +162,11 @@ async def maybe_open_stream(taskname: str): f')>\n' f' |_{taskname}\n' ) + + # we should always unreg the "cloned" bcrc for this + # consumer-task + assert id(bstream) not in bstream._state.subs + else: # yield the actual stream try: @@ -171,15 +178,21 @@ async def maybe_open_stream(taskname: str): f' |_{taskname}\n' ) - bstream = stream._broadcaster - if ( - not bstream._state.subs - and - not cache_hit - ): - await tractor.pause(shield=True) - # await an.cancel() - + first_bstream = stream._broadcaster + bcrx_state = first_bstream._state + subs: dict[int, int] = bcrx_state.subs + if len(subs) == 1: + assert id(first_bstream) in subs + # ^^TODO! the bcrx should always de-allocate all subs, + # including the implicit first one allocated on entry + # by the first subscribing peer task, no? + # + # -[ ] adjust `MsgStream.subscribe()` to do this mgmt! + # |_ allows reverting `MsgStream.receive()` to the + # non-bcaster method. + # |_ we can decide whether to reset `._broadcaster`? + # + # await tractor.pause(shield=True) def test_open_local_sub_to_stream( @@ -261,13 +274,25 @@ def test_open_local_sub_to_stream( ) await trio.sleep(0.001) - print('all consumer tasks finished.') + print('all consumer tasks finished!') - # ensure actor-nursery is shutdown or we might - # hang here..? - # if root.ipc_server.has_peers(): - # await tractor.pause() - # await an.cancel() + # ?XXX, ensure actor-nursery is shutdown or we might + # hang here due to a minor task deadlock/race-condition? + # + # - seems that all we need is a checkpoint to ensure + # the last suspended task, which is inside + # `.maybe_open_context()`, can do the + # `Portal.cancel_actor()` call? + # + # - if that bg task isn't resumed, then this blocks + # timeout might hit before that? + # + if root.ipc_server.has_peers(): + await trio.lowlevel.checkpoint() + + # alt approach, cancel the entire `an` + # await tractor.pause() + # await an.cancel() # end of runtime scope print('root actor terminated.')