Parametrize rw test with variable frame sizes
Demonstrates fixed size frame-oriented reads by the child where the parent only transmits a "read" stream msg on "frame fill events" such that the child incrementally reads the shm list data (much like in a real-time-buffered streaming system).
							parent
							
								
									87a9bd842a
								
							
						
					
					
						commit
						a6813e00d0
					
				|  | @ -24,7 +24,10 @@ async def child_attach_shml_alot( | |||
| 
 | ||||
|     # now try to attach a boatload of times in a loop.. | ||||
|     for _ in range(1000): | ||||
|         shml = attach_shm_list(key=shm_key) | ||||
|         shml = attach_shm_list( | ||||
|             key=shm_key, | ||||
|             readonly=False, | ||||
|         ) | ||||
|         assert shml.shm.name == shm_key | ||||
|         await trio.sleep(0.001) | ||||
| 
 | ||||
|  | @ -46,8 +49,8 @@ def test_child_attaches_alot(): | |||
| 
 | ||||
|             async with ( | ||||
|                 portal.open_context( | ||||
|                     child_attach_shml_alot,  # taken from pytest parameterization | ||||
|                     shm_key=key, | ||||
|                     child_attach_shml_alot, | ||||
|                     shm_key=shml.key, | ||||
|                 ) as (ctx, start_val), | ||||
|             ): | ||||
|                 assert start_val == key | ||||
|  | @ -63,50 +66,70 @@ async def child_read_shm_list( | |||
|     ctx: tractor.Context, | ||||
|     shm_key: str, | ||||
|     use_str: bool, | ||||
|     frame_size: int, | ||||
| ) -> None: | ||||
| 
 | ||||
|     # attach in child | ||||
|     shml = attach_shm_list(key=shm_key) | ||||
|     await ctx.started(shml.key) | ||||
| 
 | ||||
|     async with ctx.open_stream() as stream: | ||||
|         async for i in stream: | ||||
|             print(f'reading shm list index: {i}') | ||||
|             print(f'(child): reading shm list index: {i}') | ||||
| 
 | ||||
|             if use_str: | ||||
|                 expect = str(float(i)) | ||||
|             else: | ||||
|                 expect = float(i) | ||||
| 
 | ||||
|             assert expect == shml[i] | ||||
|             if frame_size == 1: | ||||
|                 val = shml[i] | ||||
|                 assert expect == val | ||||
|                 print(f'(child): reading value: {val}') | ||||
|             else: | ||||
|                 frame = shml[i - frame_size:i] | ||||
|                 print(f'(child): reading frame: {frame}') | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'use_str', [False, True], | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'frame_size', | ||||
|     [1, 2**6, 2**10], | ||||
|     ids=lambda i: f'frame_size={i}', | ||||
| ) | ||||
| def test_parent_writer_child_reader( | ||||
|     use_str: bool, | ||||
|     frame_size: int, | ||||
| ): | ||||
| 
 | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery() as an: | ||||
| 
 | ||||
|             # allocate writeable list in parent | ||||
|             key = 'shm_list' | ||||
|             shml = open_shm_list( | ||||
|                 key=key, | ||||
|                 readonly=False, | ||||
|             ) | ||||
|         async with tractor.open_nursery( | ||||
|             debug_mode=True, | ||||
|         ) as an: | ||||
| 
 | ||||
|             portal = await an.start_actor( | ||||
|                 'shm_reader', | ||||
|                 enable_modules=[__name__], | ||||
|                 debug_mode=True, | ||||
|             ) | ||||
| 
 | ||||
|             # allocate writeable list in parent | ||||
|             key = 'shm_list' | ||||
|             seq_size = int(2 * 2 ** 10) | ||||
|             shml = open_shm_list( | ||||
|                 key=key, | ||||
|                 size=seq_size, | ||||
|                 readonly=False, | ||||
|             ) | ||||
| 
 | ||||
|             async with ( | ||||
|                 portal.open_context( | ||||
|                     child_read_shm_list,  # taken from pytest parameterization | ||||
|                     child_read_shm_list, | ||||
|                     shm_key=key, | ||||
|                     use_str=use_str, | ||||
|                     frame_size=frame_size, | ||||
|                 ) as (ctx, sent), | ||||
| 
 | ||||
|                 ctx.open_stream() as stream, | ||||
|  | @ -114,14 +137,23 @@ def test_parent_writer_child_reader( | |||
| 
 | ||||
|                 assert sent == key | ||||
| 
 | ||||
|                 for i in range(2 ** 10): | ||||
|                 for i in range(seq_size): | ||||
| 
 | ||||
|                     val = float(i) | ||||
|                     if use_str: | ||||
|                         val = str(val) | ||||
| 
 | ||||
|                     print(f'writing {val}') | ||||
|                     print(f'(parent): writing {val}') | ||||
|                     shml[i] = val | ||||
| 
 | ||||
|                     # only on frame fills do we | ||||
|                     # signal to the child that a frame's | ||||
|                     # worth is ready. | ||||
|                     if (i % frame_size) == 0: | ||||
|                         print(f'(parent): signalling frame full on {val}') | ||||
|                         await stream.send(i) | ||||
|                 else: | ||||
|                     print(f'(parent): signalling final frame on {val}') | ||||
|                     await stream.send(i) | ||||
| 
 | ||||
|             await portal.cancel_actor() | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue