Parametrize rw test with variable frame sizes
Demonstrates fixed size frame-oriented reads by the child where the parent only transmits a "read" stream msg on "frame fill events" such that the child incrementally reads the shm list data (much like in a real-time-buffered streaming system).macos_in_ci
							parent
							
								
									2ac19b2e96
								
							
						
					
					
						commit
						3bdd04ef4a
					
				| 
						 | 
				
			
			@ -24,7 +24,10 @@ async def child_attach_shml_alot(
 | 
			
		|||
 | 
			
		||||
    # now try to attach a boatload of times in a loop..
 | 
			
		||||
    for _ in range(1000):
 | 
			
		||||
        shml = attach_shm_list(key=shm_key)
 | 
			
		||||
        shml = attach_shm_list(
 | 
			
		||||
            key=shm_key,
 | 
			
		||||
            readonly=False,
 | 
			
		||||
        )
 | 
			
		||||
        assert shml.shm.name == shm_key
 | 
			
		||||
        await trio.sleep(0.001)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -46,8 +49,8 @@ def test_child_attaches_alot():
 | 
			
		|||
 | 
			
		||||
            async with (
 | 
			
		||||
                portal.open_context(
 | 
			
		||||
                    child_attach_shml_alot,  # taken from pytest parameterization
 | 
			
		||||
                    shm_key=key,
 | 
			
		||||
                    child_attach_shml_alot,
 | 
			
		||||
                    shm_key=shml.key,
 | 
			
		||||
                ) as (ctx, start_val),
 | 
			
		||||
            ):
 | 
			
		||||
                assert start_val == key
 | 
			
		||||
| 
						 | 
				
			
			@ -63,50 +66,70 @@ async def child_read_shm_list(
 | 
			
		|||
    ctx: tractor.Context,
 | 
			
		||||
    shm_key: str,
 | 
			
		||||
    use_str: bool,
 | 
			
		||||
    frame_size: int,
 | 
			
		||||
) -> None:
 | 
			
		||||
 | 
			
		||||
    # attach in child
 | 
			
		||||
    shml = attach_shm_list(key=shm_key)
 | 
			
		||||
    await ctx.started(shml.key)
 | 
			
		||||
 | 
			
		||||
    async with ctx.open_stream() as stream:
 | 
			
		||||
        async for i in stream:
 | 
			
		||||
            print(f'reading shm list index: {i}')
 | 
			
		||||
            print(f'(child): reading shm list index: {i}')
 | 
			
		||||
 | 
			
		||||
            if use_str:
 | 
			
		||||
                expect = str(float(i))
 | 
			
		||||
            else:
 | 
			
		||||
                expect = float(i)
 | 
			
		||||
 | 
			
		||||
            assert expect == shml[i]
 | 
			
		||||
            if frame_size == 1:
 | 
			
		||||
                val = shml[i]
 | 
			
		||||
                assert expect == val
 | 
			
		||||
                print(f'(child): reading value: {val}')
 | 
			
		||||
            else:
 | 
			
		||||
                frame = shml[i - frame_size:i]
 | 
			
		||||
                print(f'(child): reading frame: {frame}')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    'use_str', [False, True],
 | 
			
		||||
)
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    'frame_size',
 | 
			
		||||
    [1, 2**6, 2**10],
 | 
			
		||||
    ids=lambda i: f'frame_size={i}',
 | 
			
		||||
)
 | 
			
		||||
def test_parent_writer_child_reader(
 | 
			
		||||
    use_str: bool,
 | 
			
		||||
    frame_size: int,
 | 
			
		||||
):
 | 
			
		||||
 | 
			
		||||
    async def main():
 | 
			
		||||
        async with tractor.open_nursery() as an:
 | 
			
		||||
 | 
			
		||||
            # allocate writeable list in parent
 | 
			
		||||
            key = 'shm_list'
 | 
			
		||||
            shml = open_shm_list(
 | 
			
		||||
                key=key,
 | 
			
		||||
                readonly=False,
 | 
			
		||||
            )
 | 
			
		||||
        async with tractor.open_nursery(
 | 
			
		||||
            debug_mode=True,
 | 
			
		||||
        ) as an:
 | 
			
		||||
 | 
			
		||||
            portal = await an.start_actor(
 | 
			
		||||
                'shm_reader',
 | 
			
		||||
                enable_modules=[__name__],
 | 
			
		||||
                debug_mode=True,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # allocate writeable list in parent
 | 
			
		||||
            key = 'shm_list'
 | 
			
		||||
            seq_size = int(2 * 2 ** 10)
 | 
			
		||||
            shml = open_shm_list(
 | 
			
		||||
                key=key,
 | 
			
		||||
                size=seq_size,
 | 
			
		||||
                readonly=False,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            async with (
 | 
			
		||||
                portal.open_context(
 | 
			
		||||
                    child_read_shm_list,  # taken from pytest parameterization
 | 
			
		||||
                    child_read_shm_list,
 | 
			
		||||
                    shm_key=key,
 | 
			
		||||
                    use_str=use_str,
 | 
			
		||||
                    frame_size=frame_size,
 | 
			
		||||
                ) as (ctx, sent),
 | 
			
		||||
 | 
			
		||||
                ctx.open_stream() as stream,
 | 
			
		||||
| 
						 | 
				
			
			@ -114,14 +137,23 @@ def test_parent_writer_child_reader(
 | 
			
		|||
 | 
			
		||||
                assert sent == key
 | 
			
		||||
 | 
			
		||||
                for i in range(2 ** 10):
 | 
			
		||||
                for i in range(seq_size):
 | 
			
		||||
 | 
			
		||||
                    val = float(i)
 | 
			
		||||
                    if use_str:
 | 
			
		||||
                        val = str(val)
 | 
			
		||||
 | 
			
		||||
                    print(f'writing {val}')
 | 
			
		||||
                    print(f'(parent): writing {val}')
 | 
			
		||||
                    shml[i] = val
 | 
			
		||||
 | 
			
		||||
                    # only on frame fills do we
 | 
			
		||||
                    # signal to the child that a frame's
 | 
			
		||||
                    # worth is ready.
 | 
			
		||||
                    if (i % frame_size) == 0:
 | 
			
		||||
                        print(f'(parent): signalling frame full on {val}')
 | 
			
		||||
                        await stream.send(i)
 | 
			
		||||
                else:
 | 
			
		||||
                    print(f'(parent): signalling final frame on {val}')
 | 
			
		||||
                    await stream.send(i)
 | 
			
		||||
 | 
			
		||||
            await portal.cancel_actor()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue