diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py
index e7958b9c..d54d76a9 100644
--- a/tests/test_resource_cache.py
+++ b/tests/test_resource_cache.py
@@ -120,13 +120,13 @@ async def open_stream() -> Awaitable[
                     print('Entered open_stream() caller')
                     yield an, stream
                     print('Exited open_stream() caller')
-                    # await tractor.pause(shield=True)
 
             finally:
-                print('Cancelling streamer')
-                # await tractor.pause(shield=True)
-                with trio.CancelScope(shield=True):
-                    await portal.cancel_actor()
+                print(
+                    'Cancelling streamer with,\n'
+                    '=> `Portal.cancel_actor()`'
+                )
+                await portal.cancel_actor()
                 print('Cancelled streamer')
 
     except Exception as err:
@@ -147,6 +147,8 @@ async def maybe_open_stream(taskname: str):
         cache_hit,
         (an, stream)
     ):
+        # when the actor + portal + ctx + stream has already been
+        # allocated we want to just bcast to this task.
         if cache_hit:
             print(f'{taskname} loaded from cache')
 
@@ -160,6 +162,11 @@ async def maybe_open_stream(taskname: str):
                     f')>\n'
                     f' |_{taskname}\n'
                 )
+
+            # we should always unreg the "cloned" bcrx for this
+            # consumer-task
+            assert id(bstream) not in bstream._state.subs
+
         else:
             # yield the actual stream
             try:
@@ -171,15 +178,21 @@
                     f' |_{taskname}\n'
                 )
 
-    bstream = stream._broadcaster
-    if (
-        not bstream._state.subs
-        and
-        not cache_hit
-    ):
-        await tractor.pause(shield=True)
-        # await an.cancel()
-
+    first_bstream = stream._broadcaster
+    bcrx_state = first_bstream._state
+    subs: dict[int, int] = bcrx_state.subs
+    if len(subs) == 1:
+        assert id(first_bstream) in subs
+    # ^^TODO! the bcrx should always de-allocate all subs,
+    # including the implicit first one allocated on entry
+    # by the first subscribing peer task, no?
+    #
+    # -[ ] adjust `MsgStream.subscribe()` to do this mgmt!
+    #    |_ allows reverting `MsgStream.receive()` to the
+    #      non-bcaster method.
+    #    |_ we can decide whether to reset `._broadcaster`?
+    #
+    # await tractor.pause(shield=True)
 
 
 def test_open_local_sub_to_stream(
@@ -261,13 +274,25 @@ def test_open_local_sub_to_stream(
             )
             await trio.sleep(0.001)
 
-        print('all consumer tasks finished.')
+        print('all consumer tasks finished!')
 
-        # ensure actor-nursery is shutdown or we might
-        # hang here..?
-        # if root.ipc_server.has_peers():
-        #     await tractor.pause()
-        #     await an.cancel()
+        # ?XXX, ensure actor-nursery is shutdown or we might
+        # hang here due to a minor task deadlock/race-condition?
+        #
+        # - seems that all we need is a checkpoint to ensure
+        #   the last suspended task, which is inside
+        #   `.maybe_open_context()`, can do the
+        #   `Portal.cancel_actor()` call?
+        #
+        # - if that bg task isn't resumed, then this block's
+        #   timeout might hit before that?
+        #
+        if root.ipc_server.has_peers():
+            await trio.lowlevel.checkpoint()
+
+        # alt approach, cancel the entire `an`
+        # await tractor.pause()
+        # await an.cancel()
 
     # end of runtime scope
     print('root actor terminated.')
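
A toy model of the subs-accounting invariant the `^^TODO!` hunk above asks
for, included only as a sketch: `ToyBroadcastState` and `ToyReceiver` are
hypothetical stand-ins, not `tractor`'s real broadcast types. The idea is
that every `.subscribe()` clone unregisters itself on exit and the implicit
first receiver is dropped along with the last clone, so `._state.subs`
fully empties on teardown:

    from contextlib import contextmanager
    from dataclasses import dataclass, field


    @dataclass
    class ToyBroadcastState:
        # maps `id(receiver) -> recv-seq index`, mirroring the
        # `dict[int, int]` layout asserted on in the test above
        subs: dict[int, int] = field(default_factory=dict)


    class ToyReceiver:
        def __init__(self, state: ToyBroadcastState):
            self._state = state
            # implicit registration on allocation, like the "first"
            # bcrx created for the initial subscribing peer task
            state.subs[id(self)] = 0

        @contextmanager
        def subscribe(self):
            clone = ToyReceiver(self._state)
            try:
                yield clone
            finally:
                # always unreg the "cloned" bcrx for this consumer task
                self._state.subs.pop(id(clone))
                # when only the implicit first sub remains, drop it
                # too so teardown leaves no dangling registrations
                if set(self._state.subs) == {id(self)}:
                    self._state.subs.pop(id(self))


    state = ToyBroadcastState()
    first = ToyReceiver(state)
    with first.subscribe() as clone:
        assert id(clone) in state.subs
    assert id(clone) not in state.subs  # the cache-hit assert above
    assert not state.subs  # the fully de-allocated end state the TODO wants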
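
And a minimal sketch, using plain `trio` only (`bg_teardown` and the event
names are hypothetical), of the scheduling wrinkle the
`trio.lowlevel.checkpoint()` in the last hunk works around: a task woken
via an event is merely marked runnable and cannot actually execute its
teardown until the current task yields to the scheduler.

    import trio


    async def bg_teardown(
        woken: trio.Event,
        done: trio.Event,
    ):
        # stand-in for the suspended `maybe_open_context()` task that
        # still needs to run its `Portal.cancel_actor()` teardown
        await woken.wait()
        print('bg task: running teardown')
        done.set()


    async def main():
        woken = trio.Event()
        done = trio.Event()
        async with trio.open_nursery() as tn:
            tn.start_soon(bg_teardown, woken, done)
            await trio.sleep(0.1)  # let the bg task park on the event

            woken.set()
            # the bg task is now runnable but can NOT have run yet
            # since we haven't yielded to the scheduler..
            assert not done.is_set()

            # an explicit checkpoint (as in the hunk above) hands the
            # scheduler a chance to resume it before any potentially
            # blocking shutdown steps.
            await trio.lowlevel.checkpoint()

        # nursery exit waits on children, so teardown definitely ran
        assert done.is_set()


    trio.run(main)

In the test above, the same single yield is what lets the suspended
`maybe_open_context()` task issue its `Portal.cancel_actor()` before the
potentially blocking runtime-scope exit.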