Parametrize rw test with variable frame sizes
Demonstrates fixed size frame-oriented reads by the child where the parent only transmits a "read" stream msg on "frame fill events" such that the child incrementally reads the shm list data (much like in a real-time-buffered streaming system).
							parent
							
								
									e964a33ba8
								
							
						
					
					
						commit
						a57b335517
					
				|  | @ -24,7 +24,10 @@ async def child_attach_shml_alot( | ||||||
| 
 | 
 | ||||||
|     # now try to attach a boatload of times in a loop.. |     # now try to attach a boatload of times in a loop.. | ||||||
|     for _ in range(1000): |     for _ in range(1000): | ||||||
|         shml = attach_shm_list(key=shm_key) |         shml = attach_shm_list( | ||||||
|  |             key=shm_key, | ||||||
|  |             readonly=False, | ||||||
|  |         ) | ||||||
|         assert shml.shm.name == shm_key |         assert shml.shm.name == shm_key | ||||||
|         await trio.sleep(0.001) |         await trio.sleep(0.001) | ||||||
| 
 | 
 | ||||||
|  | @ -46,8 +49,8 @@ def test_child_attaches_alot(): | ||||||
| 
 | 
 | ||||||
|             async with ( |             async with ( | ||||||
|                 portal.open_context( |                 portal.open_context( | ||||||
|                     child_attach_shml_alot,  # taken from pytest parameterization |                     child_attach_shml_alot, | ||||||
|                     shm_key=key, |                     shm_key=shml.key, | ||||||
|                 ) as (ctx, start_val), |                 ) as (ctx, start_val), | ||||||
|             ): |             ): | ||||||
|                 assert start_val == key |                 assert start_val == key | ||||||
|  | @ -63,50 +66,70 @@ async def child_read_shm_list( | ||||||
|     ctx: tractor.Context, |     ctx: tractor.Context, | ||||||
|     shm_key: str, |     shm_key: str, | ||||||
|     use_str: bool, |     use_str: bool, | ||||||
|  |     frame_size: int, | ||||||
| ) -> None: | ) -> None: | ||||||
| 
 | 
 | ||||||
|  |     # attach in child | ||||||
|     shml = attach_shm_list(key=shm_key) |     shml = attach_shm_list(key=shm_key) | ||||||
|     await ctx.started(shml.key) |     await ctx.started(shml.key) | ||||||
| 
 | 
 | ||||||
|     async with ctx.open_stream() as stream: |     async with ctx.open_stream() as stream: | ||||||
|         async for i in stream: |         async for i in stream: | ||||||
|             print(f'reading shm list index: {i}') |             print(f'(child): reading shm list index: {i}') | ||||||
| 
 | 
 | ||||||
|             if use_str: |             if use_str: | ||||||
|                 expect = str(float(i)) |                 expect = str(float(i)) | ||||||
|             else: |             else: | ||||||
|                 expect = float(i) |                 expect = float(i) | ||||||
| 
 | 
 | ||||||
|             assert expect == shml[i] |             if frame_size == 1: | ||||||
|  |                 val = shml[i] | ||||||
|  |                 assert expect == val | ||||||
|  |                 print(f'(child): reading value: {val}') | ||||||
|  |             else: | ||||||
|  |                 frame = shml[i - frame_size:i] | ||||||
|  |                 print(f'(child): reading frame: {frame}') | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||||
|     'use_str', [False, True], |     'use_str', [False, True], | ||||||
| ) | ) | ||||||
|  | @pytest.mark.parametrize( | ||||||
|  |     'frame_size', | ||||||
|  |     [1, 2**6, 2**10], | ||||||
|  |     ids=lambda i: f'frame_size={i}', | ||||||
|  | ) | ||||||
| def test_parent_writer_child_reader( | def test_parent_writer_child_reader( | ||||||
|     use_str: bool, |     use_str: bool, | ||||||
|  |     frame_size: int, | ||||||
| ): | ): | ||||||
| 
 | 
 | ||||||
|     async def main(): |     async def main(): | ||||||
|         async with tractor.open_nursery() as an: |         async with tractor.open_nursery( | ||||||
| 
 |             debug_mode=True, | ||||||
|             # allocate writeable list in parent |         ) as an: | ||||||
|             key = 'shm_list' |  | ||||||
|             shml = open_shm_list( |  | ||||||
|                 key=key, |  | ||||||
|                 readonly=False, |  | ||||||
|             ) |  | ||||||
| 
 | 
 | ||||||
|             portal = await an.start_actor( |             portal = await an.start_actor( | ||||||
|                 'shm_reader', |                 'shm_reader', | ||||||
|                 enable_modules=[__name__], |                 enable_modules=[__name__], | ||||||
|  |                 debug_mode=True, | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             # allocate writeable list in parent | ||||||
|  |             key = 'shm_list' | ||||||
|  |             seq_size = int(2 * 2 ** 10) | ||||||
|  |             shml = open_shm_list( | ||||||
|  |                 key=key, | ||||||
|  |                 size=seq_size, | ||||||
|  |                 readonly=False, | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             async with ( |             async with ( | ||||||
|                 portal.open_context( |                 portal.open_context( | ||||||
|                     child_read_shm_list,  # taken from pytest parameterization |                     child_read_shm_list, | ||||||
|                     shm_key=key, |                     shm_key=key, | ||||||
|                     use_str=use_str, |                     use_str=use_str, | ||||||
|  |                     frame_size=frame_size, | ||||||
|                 ) as (ctx, sent), |                 ) as (ctx, sent), | ||||||
| 
 | 
 | ||||||
|                 ctx.open_stream() as stream, |                 ctx.open_stream() as stream, | ||||||
|  | @ -114,14 +137,23 @@ def test_parent_writer_child_reader( | ||||||
| 
 | 
 | ||||||
|                 assert sent == key |                 assert sent == key | ||||||
| 
 | 
 | ||||||
|                 for i in range(2 ** 10): |                 for i in range(seq_size): | ||||||
| 
 | 
 | ||||||
|                     val = float(i) |                     val = float(i) | ||||||
|                     if use_str: |                     if use_str: | ||||||
|                         val = str(val) |                         val = str(val) | ||||||
| 
 | 
 | ||||||
|                     print(f'writing {val}') |                     print(f'(parent): writing {val}') | ||||||
|                     shml[i] = val |                     shml[i] = val | ||||||
|  | 
 | ||||||
|  |                     # only on frame fills do we | ||||||
|  |                     # signal to the child that a frame's | ||||||
|  |                     # worth is ready. | ||||||
|  |                     if (i % frame_size) == 0: | ||||||
|  |                         print(f'(parent): signalling frame full on {val}') | ||||||
|  |                         await stream.send(i) | ||||||
|  |                 else: | ||||||
|  |                     print(f'(parent): signalling final frame on {val}') | ||||||
|                     await stream.send(i) |                     await stream.send(i) | ||||||
| 
 | 
 | ||||||
|             await portal.cancel_actor() |             await portal.cancel_actor() | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue