Well then, I guess it just needed, a checkpoint XD
Here I was thinking the bcaster (usage) maybe required a rework but, NOPE it's just bc a checkpoint was needed in the parent task owning the `tn` which spawns `get_sub_and_pull()` tasks to ensure the bg allocated `an`/portal is eventually cancel-called.. Ah well, at least i started a patch for `MsgStream.subscribe()` to make it multicast revertible.. XD Anyway, I tossed in some checks & notes related to all that unnecessary effort since I do think i'll move forward implementing it: - for the `cache_hit` case always verify that the `bcast` clone is unregistered from the common state subs after `.subscribe().__aexit__()`. - do a light check that the implicit `MsgStream._broadcaster` is always the only bcrx instance left-leaked into that state.. that is until i get the proper de-allocation/reversion from multicast -> unicast working. - put in mega detailed note about the required parent-task checkpoint.to_asyncio_eoc_signal
							parent
							
								
									51944a0b99
								
							
						
					
					
						commit
						d2c3e32bf1
					
				|  | @ -120,13 +120,13 @@ async def open_stream() -> Awaitable[ | |||
|                     print('Entered open_stream() caller') | ||||
|                     yield an, stream | ||||
|                     print('Exited open_stream() caller') | ||||
|                     # await tractor.pause(shield=True) | ||||
| 
 | ||||
|             finally: | ||||
|                 print('Cancelling streamer') | ||||
|                 # await tractor.pause(shield=True) | ||||
|                 with trio.CancelScope(shield=True): | ||||
|                     await portal.cancel_actor() | ||||
|                 print( | ||||
|                     'Cancelling streamer with,\n' | ||||
|                     '=> `Portal.cancel_actor()`' | ||||
|                 ) | ||||
|                 await portal.cancel_actor() | ||||
|                 print('Cancelled streamer') | ||||
| 
 | ||||
|     except Exception as err: | ||||
|  | @ -147,6 +147,8 @@ async def maybe_open_stream(taskname: str): | |||
|         cache_hit, | ||||
|         (an, stream) | ||||
|     ): | ||||
|         # when the actor + portal + ctx + stream has already been | ||||
|         # allocated we want to just bcast to this task. | ||||
|         if cache_hit: | ||||
|             print(f'{taskname} loaded from cache') | ||||
| 
 | ||||
|  | @ -160,6 +162,11 @@ async def maybe_open_stream(taskname: str): | |||
|                     f')>\n' | ||||
|                     f' |_{taskname}\n' | ||||
|                 ) | ||||
| 
 | ||||
|             # we should always unreg the "cloned" bcrc for this | ||||
|             # consumer-task | ||||
|             assert id(bstream) not in bstream._state.subs | ||||
| 
 | ||||
|         else: | ||||
|             # yield the actual stream | ||||
|             try: | ||||
|  | @ -171,15 +178,21 @@ async def maybe_open_stream(taskname: str): | |||
|                     f' |_{taskname}\n' | ||||
|                 ) | ||||
| 
 | ||||
|         bstream = stream._broadcaster | ||||
|         if ( | ||||
|             not bstream._state.subs | ||||
|             and | ||||
|             not cache_hit | ||||
|         ): | ||||
|             await tractor.pause(shield=True) | ||||
|             # await an.cancel() | ||||
| 
 | ||||
|         first_bstream = stream._broadcaster | ||||
|         bcrx_state = first_bstream._state | ||||
|         subs: dict[int, int] = bcrx_state.subs | ||||
|         if len(subs) == 1: | ||||
|             assert id(first_bstream) in subs | ||||
|             # ^^TODO! the bcrx should always de-allocate all subs, | ||||
|             # including the implicit first one allocated on entry | ||||
|             # by the first subscribing peer task, no? | ||||
|             # | ||||
|             # -[ ] adjust `MsgStream.subscribe()` to do this mgmt! | ||||
|             #  |_ allows reverting `MsgStream.receive()` to the | ||||
|             #    non-bcaster method. | ||||
|             #  |_ we can decide whether to reset `._broadcaster`? | ||||
|             # | ||||
|             # await tractor.pause(shield=True) | ||||
| 
 | ||||
| 
 | ||||
| def test_open_local_sub_to_stream( | ||||
|  | @ -261,13 +274,25 @@ def test_open_local_sub_to_stream( | |||
|                         ) | ||||
|                         await trio.sleep(0.001) | ||||
| 
 | ||||
|                 print('all consumer tasks finished.') | ||||
|                 print('all consumer tasks finished!') | ||||
| 
 | ||||
|                 # ensure actor-nursery is shutdown or we might | ||||
|                 # hang here..? | ||||
|                 # if root.ipc_server.has_peers(): | ||||
|                 #     await tractor.pause() | ||||
|                 #     await an.cancel() | ||||
|                 # ?XXX, ensure actor-nursery is shutdown or we might | ||||
|                 # hang here due to a minor task deadlock/race-condition? | ||||
|                 # | ||||
|                 # - seems that all we need is a checkpoint to ensure | ||||
|                 #   the last suspended task, which is inside | ||||
|                 #   `.maybe_open_context()`, can do the | ||||
|                 #   `Portal.cancel_actor()` call? | ||||
|                 # | ||||
|                 # - if that bg task isn't resumed, then this blocks | ||||
|                 #   timeout might hit before that? | ||||
|                 # | ||||
|                 if root.ipc_server.has_peers(): | ||||
|                     await trio.lowlevel.checkpoint() | ||||
| 
 | ||||
|                     # alt approach, cancel the entire `an` | ||||
|                     # await tractor.pause() | ||||
|                     # await an.cancel() | ||||
| 
 | ||||
|             # end of runtime scope | ||||
|             print('root actor terminated.') | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue