Well then, I guess it just needed, a checkpoint XD
Here I was thinking the bcaster (usage) maybe required a rework but, NOPE it's just bc a checkpoint was needed in the parent task owning the `tn` which spawns `get_sub_and_pull()` tasks to ensure the bg allocated `an`/portal is eventually cancel-called.. Ah well, at least i started a patch for `MsgStream.subscribe()` to make it multicast revertible.. XD Anyway, I tossed in some checks & notes related to all that unnecessary effort since I do think i'll move forward implementing it: - for the `cache_hit` case always verify that the `bcast` clone is unregistered from the common state subs after `.subscribe().__aexit__()`. - do a light check that the implicit `MsgStream._broadcaster` is always the only bcrx instance left-leaked into that state.. that is until i get the proper de-allocation/reversion from multicast -> unicast working. - put in mega detailed note about the required parent-task checkpoint.
							parent
							
								
									c3aa29e7fa
								
							
						
					
					
						commit
						7a075494f1
					
				| 
						 | 
				
			
			@ -120,13 +120,13 @@ async def open_stream() -> Awaitable[
 | 
			
		|||
                    print('Entered open_stream() caller')
 | 
			
		||||
                    yield an, stream
 | 
			
		||||
                    print('Exited open_stream() caller')
 | 
			
		||||
                    # await tractor.pause(shield=True)
 | 
			
		||||
 | 
			
		||||
            finally:
 | 
			
		||||
                print('Cancelling streamer')
 | 
			
		||||
                # await tractor.pause(shield=True)
 | 
			
		||||
                with trio.CancelScope(shield=True):
 | 
			
		||||
                    await portal.cancel_actor()
 | 
			
		||||
                print(
 | 
			
		||||
                    'Cancelling streamer with,\n'
 | 
			
		||||
                    '=> `Portal.cancel_actor()`'
 | 
			
		||||
                )
 | 
			
		||||
                await portal.cancel_actor()
 | 
			
		||||
                print('Cancelled streamer')
 | 
			
		||||
 | 
			
		||||
    except Exception as err:
 | 
			
		||||
| 
						 | 
				
			
			@ -147,6 +147,8 @@ async def maybe_open_stream(taskname: str):
 | 
			
		|||
        cache_hit,
 | 
			
		||||
        (an, stream)
 | 
			
		||||
    ):
 | 
			
		||||
        # when the actor + portal + ctx + stream has already been
 | 
			
		||||
        # allocated we want to just bcast to this task.
 | 
			
		||||
        if cache_hit:
 | 
			
		||||
            print(f'{taskname} loaded from cache')
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -160,6 +162,11 @@ async def maybe_open_stream(taskname: str):
 | 
			
		|||
                    f')>\n'
 | 
			
		||||
                    f' |_{taskname}\n'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            # we should always unreg the "cloned" bcrc for this
 | 
			
		||||
            # consumer-task
 | 
			
		||||
            assert id(bstream) not in bstream._state.subs
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            # yield the actual stream
 | 
			
		||||
            try:
 | 
			
		||||
| 
						 | 
				
			
			@ -171,15 +178,21 @@ async def maybe_open_stream(taskname: str):
 | 
			
		|||
                    f' |_{taskname}\n'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        bstream = stream._broadcaster
 | 
			
		||||
        if (
 | 
			
		||||
            not bstream._state.subs
 | 
			
		||||
            and
 | 
			
		||||
            not cache_hit
 | 
			
		||||
        ):
 | 
			
		||||
            await tractor.pause(shield=True)
 | 
			
		||||
            # await an.cancel()
 | 
			
		||||
 | 
			
		||||
        first_bstream = stream._broadcaster
 | 
			
		||||
        bcrx_state = first_bstream._state
 | 
			
		||||
        subs: dict[int, int] = bcrx_state.subs
 | 
			
		||||
        if len(subs) == 1:
 | 
			
		||||
            assert id(first_bstream) in subs
 | 
			
		||||
            # ^^TODO! the bcrx should always de-allocate all subs,
 | 
			
		||||
            # including the implicit first one allocated on entry
 | 
			
		||||
            # by the first subscribing peer task, no?
 | 
			
		||||
            #
 | 
			
		||||
            # -[ ] adjust `MsgStream.subscribe()` to do this mgmt!
 | 
			
		||||
            #  |_ allows reverting `MsgStream.receive()` to the
 | 
			
		||||
            #    non-bcaster method.
 | 
			
		||||
            #  |_ we can decide whether to reset `._broadcaster`?
 | 
			
		||||
            #
 | 
			
		||||
            # await tractor.pause(shield=True)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_open_local_sub_to_stream(
 | 
			
		||||
| 
						 | 
				
			
			@ -261,13 +274,25 @@ def test_open_local_sub_to_stream(
 | 
			
		|||
                        )
 | 
			
		||||
                        await trio.sleep(0.001)
 | 
			
		||||
 | 
			
		||||
                print('all consumer tasks finished.')
 | 
			
		||||
                print('all consumer tasks finished!')
 | 
			
		||||
 | 
			
		||||
                # ensure actor-nursery is shutdown or we might
 | 
			
		||||
                # hang here..?
 | 
			
		||||
                # if root.ipc_server.has_peers():
 | 
			
		||||
                #     await tractor.pause()
 | 
			
		||||
                #     await an.cancel()
 | 
			
		||||
                # ?XXX, ensure actor-nursery is shutdown or we might
 | 
			
		||||
                # hang here due to a minor task deadlock/race-condition?
 | 
			
		||||
                #
 | 
			
		||||
                # - seems that all we need is a checkpoint to ensure
 | 
			
		||||
                #   the last suspended task, which is inside
 | 
			
		||||
                #   `.maybe_open_context()`, can do the
 | 
			
		||||
                #   `Portal.cancel_actor()` call?
 | 
			
		||||
                #
 | 
			
		||||
                # - if that bg task isn't resumed, then this blocks
 | 
			
		||||
                #   timeout might hit before that?
 | 
			
		||||
                #
 | 
			
		||||
                if root.ipc_server.has_peers():
 | 
			
		||||
                    await trio.lowlevel.checkpoint()
 | 
			
		||||
 | 
			
		||||
                    # alt approach, cancel the entire `an`
 | 
			
		||||
                    # await tractor.pause()
 | 
			
		||||
                    # await an.cancel()
 | 
			
		||||
 | 
			
		||||
            # end of runtime scope
 | 
			
		||||
            print('root actor terminated.')
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue