forked from goodboy/tractor
1
0
Fork 0

Parametrize rw test with variable frame sizes

Demonstrates fixed size frame-oriented reads by the child where the
parent only transmits a "read" stream msg on "frame fill events" such
that the child incrementally reads the shm list data (much like in
a real-time-buffered streaming system).
shm_apis
Tyler Goodlet 2022-10-18 11:01:30 -04:00
parent b52ff270c5
commit a9fc4c1b91
1 changed files with 48 additions and 16 deletions

View File

@ -24,7 +24,10 @@ async def child_attach_shml_alot(
# now try to attach a boatload of times in a loop.. # now try to attach a boatload of times in a loop..
for _ in range(1000): for _ in range(1000):
shml = attach_shm_list(key=shm_key) shml = attach_shm_list(
key=shm_key,
readonly=False,
)
assert shml.shm.name == shm_key assert shml.shm.name == shm_key
await trio.sleep(0.001) await trio.sleep(0.001)
@ -46,8 +49,8 @@ def test_child_attaches_alot():
async with ( async with (
portal.open_context( portal.open_context(
child_attach_shml_alot, # taken from pytest parameterization child_attach_shml_alot,
shm_key=key, shm_key=shml.key,
) as (ctx, start_val), ) as (ctx, start_val),
): ):
assert start_val == key assert start_val == key
@ -63,50 +66,70 @@ async def child_read_shm_list(
ctx: tractor.Context, ctx: tractor.Context,
shm_key: str, shm_key: str,
use_str: bool, use_str: bool,
frame_size: int,
) -> None: ) -> None:
# attach in child
shml = attach_shm_list(key=shm_key) shml = attach_shm_list(key=shm_key)
await ctx.started(shml.key) await ctx.started(shml.key)
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async for i in stream: async for i in stream:
print(f'reading shm list index: {i}') print(f'(child): reading shm list index: {i}')
if use_str: if use_str:
expect = str(float(i)) expect = str(float(i))
else: else:
expect = float(i) expect = float(i)
assert expect == shml[i] if frame_size == 1:
val = shml[i]
assert expect == val
print(f'(child): reading value: {val}')
else:
frame = shml[i - frame_size:i]
print(f'(child): reading frame: {frame}')
@pytest.mark.parametrize( @pytest.mark.parametrize(
'use_str', [False, True], 'use_str', [False, True],
) )
@pytest.mark.parametrize(
'frame_size',
[1, 2**6, 2**10],
ids=lambda i: f'frame_size={i}',
)
def test_parent_writer_child_reader( def test_parent_writer_child_reader(
use_str: bool, use_str: bool,
frame_size: int,
): ):
async def main(): async def main():
async with tractor.open_nursery() as an: async with tractor.open_nursery(
debug_mode=True,
# allocate writeable list in parent ) as an:
key = 'shm_list'
shml = open_shm_list(
key=key,
readonly=False,
)
portal = await an.start_actor( portal = await an.start_actor(
'shm_reader', 'shm_reader',
enable_modules=[__name__], enable_modules=[__name__],
debug_mode=True,
)
# allocate writeable list in parent
key = 'shm_list'
seq_size = int(2 * 2 ** 10)
shml = open_shm_list(
key=key,
size=seq_size,
readonly=False,
) )
async with ( async with (
portal.open_context( portal.open_context(
child_read_shm_list, # taken from pytest parameterization child_read_shm_list,
shm_key=key, shm_key=key,
use_str=use_str, use_str=use_str,
frame_size=frame_size,
) as (ctx, sent), ) as (ctx, sent),
ctx.open_stream() as stream, ctx.open_stream() as stream,
@ -114,14 +137,23 @@ def test_parent_writer_child_reader(
assert sent == key assert sent == key
for i in range(2 ** 10): for i in range(seq_size):
val = float(i) val = float(i)
if use_str: if use_str:
val = str(val) val = str(val)
print(f'writing {val}') print(f'(parent): writing {val}')
shml[i] = val shml[i] = val
# only on frame fills do we
# signal to the child that a frame's
# worth is ready.
if (i % frame_size) == 0:
print(f'(parent): signalling frame full on {val}')
await stream.send(i)
else:
print(f'(parent): signalling final frame on {val}')
await stream.send(i) await stream.send(i)
await portal.cancel_actor() await portal.cancel_actor()