diff --git a/tests/test_shm.py b/tests/test_shm.py index 850ccb3..c183040 100644 --- a/tests/test_shm.py +++ b/tests/test_shm.py @@ -24,7 +24,10 @@ async def child_attach_shml_alot( # now try to attach a boatload of times in a loop.. for _ in range(1000): - shml = attach_shm_list(key=shm_key) + shml = attach_shm_list( + key=shm_key, + readonly=False, + ) assert shml.shm.name == shm_key await trio.sleep(0.001) @@ -46,8 +49,8 @@ def test_child_attaches_alot(): async with ( portal.open_context( - child_attach_shml_alot, # taken from pytest parameterization - shm_key=key, + child_attach_shml_alot, + shm_key=shml.key, ) as (ctx, start_val), ): assert start_val == key @@ -63,50 +66,70 @@ async def child_read_shm_list( ctx: tractor.Context, shm_key: str, use_str: bool, + frame_size: int, ) -> None: + # attach in child shml = attach_shm_list(key=shm_key) await ctx.started(shml.key) async with ctx.open_stream() as stream: async for i in stream: - print(f'reading shm list index: {i}') + print(f'(child): reading shm list index: {i}') if use_str: expect = str(float(i)) else: expect = float(i) - assert expect == shml[i] + if frame_size == 1: + val = shml[i] + assert expect == val + print(f'(child): reading value: {val}') + else: + frame = shml[i - frame_size:i] + print(f'(child): reading frame: {frame}') @pytest.mark.parametrize( 'use_str', [False, True], ) +@pytest.mark.parametrize( + 'frame_size', + [1, 2**6, 2**10], + ids=lambda i: f'frame_size={i}', +) def test_parent_writer_child_reader( use_str: bool, + frame_size: int, ): async def main(): - async with tractor.open_nursery() as an: - - # allocate writeable list in parent - key = 'shm_list' - shml = open_shm_list( - key=key, - readonly=False, - ) + async with tractor.open_nursery( + debug_mode=True, + ) as an: portal = await an.start_actor( 'shm_reader', enable_modules=[__name__], + debug_mode=True, + ) + + # allocate writeable list in parent + key = 'shm_list' + seq_size = int(2 * 2 ** 10) + shml = open_shm_list( + key=key, + size=seq_size, + readonly=False, ) async with ( portal.open_context( - child_read_shm_list, # taken from pytest parameterization + child_read_shm_list, shm_key=key, use_str=use_str, + frame_size=frame_size, ) as (ctx, sent), ctx.open_stream() as stream, @@ -114,14 +137,23 @@ def test_parent_writer_child_reader( assert sent == key - for i in range(2 ** 10): + for i in range(seq_size): val = float(i) if use_str: val = str(val) - print(f'writing {val}') + print(f'(parent): writing {val}') shml[i] = val + + # only on frame fills do we + # signal to the child that a frame's + # worth is ready. + if (i % frame_size) == 0: + print(f'(parent): signalling frame full on {val}') + await stream.send(i) + else: + print(f'(parent): signalling final frame on {val}') await stream.send(i) await portal.cancel_actor()