Tool-up `test_resource_cache.test_open_local_sub_to_stream`
Since I recently discovered a very subtle race-case that can sometimes cause the suite to hang, seemingly due to the `an: ActorNursery` allocated *behind* the `.trionics.maybe_open_context()` usage; this can result in never cancelling the 'streamer' subactor despite the `main()` timeout-guard? This led me to dig in and find that the underlying issue was 2-fold, - our `BroadcastReceiver` termination-mgmt semantics in `MsgStream.subscribe()` can result in the first subscribing task to always keep the `MsgStream._broadcaster` instance allocated; it's never `.aclose()`ed, which makes it tough to determine (and thus trace) when all subscriber-tasks are actually complete and exited-from-`.subscribe()`.. - i was shield waiting `.ipc._server.Server.wait_for_no_more_peers()` in `._runtime.async_main()`'s shutdown sequence which would then compound the issue resulting in a SIGINT-shielded hang.. the worst kind XD Actual changes here are just styling, printing, and some mucking with passing the `an`-ref up to the parent task in the root-actor where i was doing a conditional `ActorNursery.cancel()` to mk sure that was actually the problem. Presuming this is fixed the `.pause()` i left unmasked should never hit.to_asyncio_eoc_signal
							parent
							
								
									735dc9056a
								
							
						
					
					
						commit
						dd7aca539f
					
				|  | @ -72,11 +72,13 @@ def test_resource_only_entered_once(key_on): | |||
|         with trio.move_on_after(0.5): | ||||
|             async with ( | ||||
|                 tractor.open_root_actor(), | ||||
|                 trio.open_nursery() as n, | ||||
|                 trio.open_nursery() as tn, | ||||
|             ): | ||||
| 
 | ||||
|                 for i in range(10): | ||||
|                     n.start_soon(enter_cached_mngr, f'task_{i}') | ||||
|                     tn.start_soon( | ||||
|                         enter_cached_mngr, | ||||
|                         f'task_{i}', | ||||
|                     ) | ||||
|                     await trio.sleep(0.001) | ||||
| 
 | ||||
|     trio.run(main) | ||||
|  | @ -98,23 +100,34 @@ async def streamer( | |||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_stream() -> Awaitable[tractor.MsgStream]: | ||||
| 
 | ||||
| async def open_stream() -> Awaitable[ | ||||
|     tuple[ | ||||
|         tractor.ActorNursery, | ||||
|         tractor.MsgStream, | ||||
|     ] | ||||
| ]: | ||||
|     try: | ||||
|         async with tractor.open_nursery() as an: | ||||
|             portal = await an.start_actor( | ||||
|                 'streamer', | ||||
|                 enable_modules=[__name__], | ||||
|             ) | ||||
|             async with ( | ||||
|                 portal.open_context(streamer) as (ctx, first), | ||||
|                 ctx.open_stream() as stream, | ||||
|             ): | ||||
|                 yield stream | ||||
|             try: | ||||
|                 async with ( | ||||
|                     portal.open_context(streamer) as (ctx, first), | ||||
|                     ctx.open_stream() as stream, | ||||
|                 ): | ||||
|                     print('Entered open_stream() caller') | ||||
|                     yield an, stream | ||||
|                     print('Exited open_stream() caller') | ||||
|                     # await tractor.pause(shield=True) | ||||
| 
 | ||||
|             print('Cancelling streamer') | ||||
|             await portal.cancel_actor() | ||||
|             print('Cancelled streamer') | ||||
|             finally: | ||||
|                 print('Cancelling streamer') | ||||
|                 # await tractor.pause(shield=True) | ||||
|                 with trio.CancelScope(shield=True): | ||||
|                     await portal.cancel_actor() | ||||
|                 print('Cancelled streamer') | ||||
| 
 | ||||
|     except Exception as err: | ||||
|         print( | ||||
|  | @ -130,8 +143,10 @@ async def maybe_open_stream(taskname: str): | |||
|     async with tractor.trionics.maybe_open_context( | ||||
|         # NOTE: all secondary tasks should cache hit on the same key | ||||
|         acm_func=open_stream, | ||||
|     ) as (cache_hit, stream): | ||||
| 
 | ||||
|     ) as ( | ||||
|         cache_hit, | ||||
|         (an, stream) | ||||
|     ): | ||||
|         if cache_hit: | ||||
|             print(f'{taskname} loaded from cache') | ||||
| 
 | ||||
|  | @ -139,10 +154,32 @@ async def maybe_open_stream(taskname: str): | |||
|             # if this feed is already allocated by the first | ||||
|             # task that entereed | ||||
|             async with stream.subscribe() as bstream: | ||||
|                 yield bstream | ||||
|                 yield an, bstream | ||||
|                 print( | ||||
|                     f'cached task exited\n' | ||||
|                     f')>\n' | ||||
|                     f' |_{taskname}\n' | ||||
|                 ) | ||||
|         else: | ||||
|             # yield the actual stream | ||||
|             yield stream | ||||
|             try: | ||||
|                 yield an, stream | ||||
|             finally: | ||||
|                 print( | ||||
|                     f'NON-cached task exited\n' | ||||
|                     f')>\n' | ||||
|                     f' |_{taskname}\n' | ||||
|                 ) | ||||
| 
 | ||||
|         bstream = stream._broadcaster | ||||
|         if ( | ||||
|             not bstream.state.subs | ||||
|             and | ||||
|             not cache_hit | ||||
|         ): | ||||
|             await tractor.pause(shield=True) | ||||
|             # await an.cancel() | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| def test_open_local_sub_to_stream( | ||||
|  | @ -159,16 +196,24 @@ def test_open_local_sub_to_stream( | |||
| 
 | ||||
|     if debug_mode: | ||||
|         timeout = 999 | ||||
|         print(f'IN debug_mode, setting large timeout={timeout!r}..') | ||||
| 
 | ||||
|     async def main(): | ||||
| 
 | ||||
|         full = list(range(1000)) | ||||
|         an: tractor.ActorNursery|None = None | ||||
|         num_tasks: int = 10 | ||||
| 
 | ||||
|         async def get_sub_and_pull(taskname: str): | ||||
| 
 | ||||
|             nonlocal an | ||||
| 
 | ||||
|             stream: tractor.MsgStream | ||||
|             async with ( | ||||
|                 maybe_open_stream(taskname) as stream, | ||||
|                 maybe_open_stream(taskname) as ( | ||||
|                     an, | ||||
|                     stream, | ||||
|                 ), | ||||
|             ): | ||||
|                 if '0' in taskname: | ||||
|                     assert isinstance(stream, tractor.MsgStream) | ||||
|  | @ -180,34 +225,58 @@ def test_open_local_sub_to_stream( | |||
| 
 | ||||
|                 first = await stream.receive() | ||||
|                 print(f'{taskname} started with value {first}') | ||||
|                 seq = [] | ||||
|                 seq: list[int] = [] | ||||
|                 async for msg in stream: | ||||
|                     seq.append(msg) | ||||
| 
 | ||||
|                 assert set(seq).issubset(set(full)) | ||||
| 
 | ||||
|             # end of @acm block | ||||
|             print(f'{taskname} finished') | ||||
| 
 | ||||
|         root: tractor.Actor | ||||
|         with trio.fail_after(timeout) as cs: | ||||
|             # TODO: turns out this isn't multi-task entrant XD | ||||
|             # We probably need an indepotent entry semantic? | ||||
|             async with tractor.open_root_actor( | ||||
|                 debug_mode=debug_mode, | ||||
|             ): | ||||
|                 # maybe_enable_greenback=True, | ||||
|                 # | ||||
|                 # ^TODO? doesn't seem to mk breakpoint() usage work | ||||
|                 # bc each bg task needs to open a portal?? | ||||
|                 # - [ ] we should consider making this part of | ||||
|                 #      our taskman defaults? | ||||
|                 #   |_see https://github.com/goodboy/tractor/pull/363 | ||||
|                 # | ||||
|             ) as root: | ||||
|                 assert root.is_registrar | ||||
| 
 | ||||
|                 async with ( | ||||
|                     trio.open_nursery() as tn, | ||||
|                 ): | ||||
|                     for i in range(10): | ||||
|                     for i in range(num_tasks): | ||||
|                         tn.start_soon( | ||||
|                             get_sub_and_pull, | ||||
|                             f'task_{i}', | ||||
|                         ) | ||||
|                         await trio.sleep(0.001) | ||||
| 
 | ||||
|                 print('all consumer tasks finished') | ||||
|                 print('all consumer tasks finished.') | ||||
| 
 | ||||
|                 # ensure actor-nursery is shutdown or we might | ||||
|                 # hang here..? | ||||
|                 # if root.ipc_server.has_peers(): | ||||
|                 #     await tractor.pause() | ||||
|                 #     await an.cancel() | ||||
| 
 | ||||
|             # end of runtime scope | ||||
|             print('root actor terminated.') | ||||
| 
 | ||||
|         if cs.cancelled_caught: | ||||
|             pytest.fail( | ||||
|                 'Should NOT time out in `open_root_actor()` ?' | ||||
|             ) | ||||
| 
 | ||||
|         print('exiting main.') | ||||
| 
 | ||||
|     trio.run(main) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue