From 2901049b5b3229b37f6a0de40a06be9d4509350e Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Tue, 18 Mar 2025 13:19:40 -0300 Subject: [PATCH] Improve test_ringbuf test, drop MsgTransport ring buf impl for now in favour of a trio.abc.Channel[bytes] impl, add docstrings --- tests/test_ringbuf.py | 87 ++++++++---- tractor/_testing/samples.py | 46 +++++- tractor/ipc/__init__.py | 6 +- tractor/ipc/_ringbuf.py | 269 ++++++++++++++---------------------- 4 files changed, 210 insertions(+), 198 deletions(-) diff --git a/tests/test_ringbuf.py b/tests/test_ringbuf.py index 3aa32cf8..f987d4c8 100644 --- a/tests/test_ringbuf.py +++ b/tests/test_ringbuf.py @@ -1,4 +1,5 @@ import time +import hashlib import trio import pytest @@ -7,8 +8,8 @@ from tractor.ipc import ( open_ringbuf, attach_to_ringbuf_receiver, attach_to_ringbuf_sender, - attach_to_ringbuf_pair, attach_to_ringbuf_stream, + attach_to_ringbuf_channel, RBToken, ) from tractor._testing.samples import ( @@ -22,12 +23,26 @@ async def child_read_shm( ctx: tractor.Context, msg_amount: int, token: RBToken, -) -> None: - recvd_bytes = 0 +) -> str: + ''' + Sub-actor used in `test_ringbuf`. + + Attach to a ringbuf and receive all messages until end of stream. + Keep track of how many bytes received and also calculate + sha256 of the whole byte stream. + + Calculate and print performance stats, finally return calculated + hash. + + ''' await ctx.started() + print('reader started') + recvd_bytes = 0 + recvd_hash = hashlib.sha256() start_ts = time.time() async with attach_to_ringbuf_receiver(token) as receiver: async for msg in receiver: + recvd_hash.update(msg) recvd_bytes += len(msg) end_ts = time.time() @@ -37,7 +52,9 @@ async def child_read_shm( print(f'\n\telapsed ms: {elapsed_ms}') print(f'\tmsg/sec: {int(msg_amount / elapsed):,}') print(f'\tbytes/sec: {int(recvd_bytes / elapsed):,}') - print(f'\treceived bytes: {recvd_bytes}') + print(f'\treceived bytes: {recvd_bytes:,}') + + return recvd_hash.hexdigest() @tractor.context @@ -48,12 +65,26 @@ async def child_write_shm( rand_max: int, token: RBToken, ) -> None: - msgs, total_bytes = generate_sample_messages( + ''' + Sub-actor used in `test_ringbuf` + + Generate `msg_amount` payloads with + `random.randint(rand_min, rand_max)` random bytes at the end, + Calculate sha256 hash and send it to parent on `ctx.started`. + + Attach to ringbuf and send all generated messages. + + ''' + msgs, _total_bytes = generate_sample_messages( msg_amount, rand_min=rand_min, rand_max=rand_max, ) - await ctx.started(total_bytes) + print('writer hashing payload...') + sent_hash = hashlib.sha256(b''.join(msgs)).hexdigest() + print('writer done hashing.') + await ctx.started(sent_hash) + print('writer started') async with attach_to_ringbuf_sender(token, cleanup=False) as sender: for msg in msgs: await sender.send_all(msg) @@ -87,11 +118,12 @@ def test_ringbuf( ): ''' - Open a new ring buf on root actor - - Create a sender subactor and generate {msg_amount} messages - optionally with a random amount of bytes at the end of each, - return total_bytes on `ctx.started`, then send all messages - - Create a receiver subactor and receive until total_bytes are - read, print simple perf stats. + - Open `child_write_shm` ctx in sub-actor which will generate a + random payload and send its hash on `ctx.started`, finally sending + the payload through the stream. + - Open `child_read_shm` ctx in sub-actor which will receive the + payload, calculate perf stats and return the hash. + - Compare both hashes ''' async def main(): @@ -119,14 +151,16 @@ def test_ringbuf( msg_amount=msg_amount, rand_min=rand_min, rand_max=rand_max, - ) as (sctx, total_bytes), + ) as (_sctx, sent_hash), recv_p.open_context( child_read_shm, token=token, msg_amount=msg_amount - ) as (sctx, _sent), + ) as (rctx, _sent), ): - await recv_p.result() + recvd_hash = await rctx.result() + + assert sent_hash == recvd_hash await send_p.cancel_actor() await recv_p.cancel_actor() @@ -274,7 +308,7 @@ def test_stapled_ringbuf(): pair_1_done = trio.Event() async def pair_0(token_in: RBToken, token_out: RBToken): - async with attach_to_ringbuf_pair( + async with attach_to_ringbuf_stream( token_in, token_out, cleanup_in=False, @@ -293,7 +327,7 @@ def test_stapled_ringbuf(): async def pair_1(token_in: RBToken, token_out: RBToken): - async with attach_to_ringbuf_pair( + async with attach_to_ringbuf_stream( token_in, token_out, cleanup_in=False, @@ -327,7 +361,7 @@ def test_stapled_ringbuf(): @tractor.context -async def child_transport_sender( +async def child_channel_sender( ctx: tractor.Context, msg_amount_min: int, msg_amount_max: int, @@ -340,19 +374,17 @@ async def child_transport_sender( rand_min=256, rand_max=1024, ) - async with attach_to_ringbuf_stream( + async with attach_to_ringbuf_channel( token_in, token_out - ) as transport: + ) as chan: await ctx.started(msgs) for msg in msgs: - await transport.send(msg) - - await transport.recv() + await chan.send(msg) -def test_ringbuf_transport(): +def test_ringbuf_channel(): msg_amount_min = 100 msg_amount_max = 1000 @@ -362,7 +394,7 @@ def test_ringbuf_transport(): 'test_ringbuf_transport' ) as (token_0, token_1): async with ( - attach_to_ringbuf_stream(token_0, token_1) as transport, + attach_to_ringbuf_channel(token_0, token_1) as chan, tractor.open_nursery() as an ): recv_p = await an.start_actor( @@ -374,7 +406,7 @@ def test_ringbuf_transport(): ) async with ( recv_p.open_context( - child_transport_sender, + child_channel_sender, msg_amount_min=msg_amount_min, msg_amount_max=msg_amount_max, token_in=token_1, @@ -382,10 +414,9 @@ def test_ringbuf_transport(): ) as (ctx, msgs), ): recv_msgs = [] - while len(recv_msgs) < len(msgs): - recv_msgs.append(await transport.recv()) + async for msg in chan: + recv_msgs.append(msg) - await transport.send(b'end') await recv_p.cancel_actor() assert recv_msgs == msgs diff --git a/tractor/_testing/samples.py b/tractor/_testing/samples.py index 1454ee3d..4249bae9 100644 --- a/tractor/_testing/samples.py +++ b/tractor/_testing/samples.py @@ -3,6 +3,18 @@ import random def generate_single_byte_msgs(amount: int) -> bytes: + ''' + Generate a byte instance of len `amount` with: + + ``` + byte_at_index(i) = (i % 10).encode() + ``` + + this results in constantly repeating sequences of: + + b'0123456789' + + ''' return b''.join(str(i % 10).encode() for i in range(amount)) @@ -10,15 +22,39 @@ def generate_sample_messages( amount: int, rand_min: int = 0, rand_max: int = 0, - silent: bool = False + silent: bool = False, ) -> tuple[list[bytes], int]: + ''' + Generate bytes msgs for tests. + Messages will have the following format: + + ``` + b'[{i:08}]' + os.urandom(random.randint(rand_min, rand_max)) + ``` + + so for message index 25: + + b'[00000025]' + random_bytes + + ''' msgs = [] size = 0 + log_interval = None if not silent: print(f'\ngenerating {amount} messages...') + # calculate an apropiate log interval based on + # max message size + max_msg_size = 10 + rand_max + + if max_msg_size <= 32 * 1024: + log_interval = 10_000 + + else: + log_interval = 1000 + for i in range(amount): msg = f'[{i:08}]'.encode('utf-8') @@ -30,7 +66,13 @@ def generate_sample_messages( msgs.append(msg) - if not silent and i and i % 10_000 == 0: + if ( + not silent + and + i > 0 + and + i % log_interval == 0 + ): print(f'{i} generated') if not silent: diff --git a/tractor/ipc/__init__.py b/tractor/ipc/__init__.py index 329dca1e..689fc44b 100644 --- a/tractor/ipc/__init__.py +++ b/tractor/ipc/__init__.py @@ -51,7 +51,9 @@ if platform.system() == 'Linux': open_ringbuf_pair as open_ringbuf_pair, attach_to_ringbuf_receiver as attach_to_ringbuf_receiver, attach_to_ringbuf_sender as attach_to_ringbuf_sender, - attach_to_ringbuf_pair as attach_to_ringbuf_pair, attach_to_ringbuf_stream as attach_to_ringbuf_stream, - MsgpackRBStream as MsgpackRBStream + RingBuffBytesSender as RingBuffBytesSender, + RingBuffBytesReceiver as RingBuffBytesReceiver, + RingBuffChannel as RingBuffChannel, + attach_to_ringbuf_channel as attach_to_ringbuf_channel ) diff --git a/tractor/ipc/_ringbuf.py b/tractor/ipc/_ringbuf.py index 7529c942..038d9e73 100644 --- a/tractor/ipc/_ringbuf.py +++ b/tractor/ipc/_ringbuf.py @@ -19,17 +19,10 @@ IPC Reliable RingBuffer implementation ''' from __future__ import annotations import struct -from collections.abc import ( - AsyncGenerator, - AsyncIterator -) from contextlib import ( contextmanager as cm, asynccontextmanager as acm ) -from typing import ( - Any -) from multiprocessing.shared_memory import SharedMemory import trio @@ -48,10 +41,8 @@ from ._linux import ( from ._mp_bs import disable_mantracker from tractor.log import get_logger from tractor._exceptions import ( - TransportClosed, InternalError ) -from tractor.ipc import MsgTransport log = get_logger(__name__) @@ -147,6 +138,7 @@ def open_ringbuf( Buffer = bytes | bytearray | memoryview + ''' IPC Reliable Ring Buffer @@ -406,7 +398,7 @@ async def attach_to_ringbuf_receiver( cleanup: bool = True ): ''' - Instantiate a RingBuffReceiver from a previously opened + Attach a RingBuffReceiver from a previously opened RBToken. Launches `receiver._eof_monitor_task` in a `trio.Nursery`. @@ -421,13 +413,14 @@ async def attach_to_ringbuf_receiver( n.start_soon(receiver._eof_monitor_task) yield receiver + @acm async def attach_to_ringbuf_sender( token: RBToken, cleanup: bool = True ): ''' - Instantiate a RingBuffSender from a previously opened + Attach a RingBuffSender from a previously opened RBToken. ''' @@ -463,14 +456,14 @@ def open_ringbuf_pair( @acm -async def attach_to_ringbuf_pair( +async def attach_to_ringbuf_stream( token_in: RBToken, token_out: RBToken, cleanup_in: bool = True, cleanup_out: bool = True ): ''' - Instantiate a trio.StapledStream from a previously opened + Attach a trio.StapledStream from a previously opened ringbuf pair. ''' @@ -487,180 +480,124 @@ async def attach_to_ringbuf_pair( yield trio.StapledStream(sender, receiver) -class MsgpackRBStream(MsgTransport): +class RingBuffBytesSender(trio.abc.SendChannel[bytes]): + ''' + In order to guarantee full messages are received, all bytes + sent by `RingBuffBytesSender` are preceded with a 4 byte header + which decodes into a uint32 indicating the actual size of the + next payload. + + ''' def __init__( self, - stream: trio.StapledStream + sender: RingBuffSender ): - self.stream = stream - - # create read loop intance - self._aiter_pkts = self._iter_packets() + self._sender = sender self._send_lock = trio.StrictFIFOLock() - self.drained: list[dict] = [] - - self.recv_stream = BufferedReceiveStream( - transport_stream=stream - ) - - async def _iter_packets(self) -> AsyncGenerator[dict, None]: - ''' - Yield `bytes`-blob decoded packets from the underlying TCP - stream using the current task's `MsgCodec`. - - This is a streaming routine implemented as an async generator - func (which was the original design, but could be changed?) - and is allocated by a `.__call__()` inside `.__init__()` where - it is assigned to the `._aiter_pkts` attr. - - ''' - - while True: - try: - header: bytes = await self.recv_stream.receive_exactly(4) - except ( - ValueError, - ConnectionResetError, - - # not sure entirely why we need this but without it we - # seem to be getting racy failures here on - # arbiter/registry name subs.. - trio.BrokenResourceError, - - ) as trans_err: - - loglevel = 'transport' - match trans_err: - # case ( - # ConnectionResetError() - # ): - # loglevel = 'transport' - - # peer actor (graceful??) TCP EOF but `tricycle` - # seems to raise a 0-bytes-read? - case ValueError() if ( - 'unclean EOF' in trans_err.args[0] - ): - pass - - # peer actor (task) prolly shutdown quickly due - # to cancellation - case trio.BrokenResourceError() if ( - 'Connection reset by peer' in trans_err.args[0] - ): - pass - - # unless the disconnect condition falls under "a - # normal operation breakage" we usualy console warn - # about it. - case _: - loglevel: str = 'warning' - - - raise TransportClosed( - message=( - f'IPC transport already closed by peer\n' - f'x)> {type(trans_err)}\n' - f' |_{self}\n' - ), - loglevel=loglevel, - ) from trans_err - - # XXX definitely can happen if transport is closed - # manually by another `trio.lowlevel.Task` in the - # same actor; we use this in some simulated fault - # testing for ex, but generally should never happen - # under normal operation! - # - # NOTE: as such we always re-raise this error from the - # RPC msg loop! - except trio.ClosedResourceError as closure_err: - raise TransportClosed( - message=( - f'IPC transport already manually closed locally?\n' - f'x)> {type(closure_err)} \n' - f' |_{self}\n' - ), - loglevel='error', - raise_on_report=( - closure_err.args[0] == 'another task closed this fd' - or - closure_err.args[0] in ['another task closed this fd'] - ), - ) from closure_err - - # graceful EOF disconnect - if header == b'': - raise TransportClosed( - message=( - f'IPC transport already gracefully closed\n' - f')>\n' - f'|_{self}\n' - ), - loglevel='transport', - # cause=??? # handy or no? - ) - - size: int - size, = struct.unpack(" None: - ''' - Send a msgpack encoded py-object-blob-as-msg. - - ''' + async def send(self, value: bytes) -> None: async with self._send_lock: - size: bytes = struct.pack(" Any: - return await self._aiter_pkts.asend(None) + async def aclose(self) -> None: + async with self._send_lock: + await self._sender.aclose() - async def drain(self) -> AsyncIterator[dict]: + +class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]): + ''' + See `RingBuffBytesSender` docstring. + + A `tricycle.BufferedReceiveStream` is used for the + `receive_exactly` API. + ''' + def __init__( + self, + receiver: RingBuffReceiver + ): + self._receiver = receiver + + async def _receive_exactly(self, num_bytes: int) -> bytes: ''' - Drain the stream's remaining messages sent from - the far end until the connection is closed by - the peer. + Fetch bytes from receiver until we read exactly `num_bytes` + or end of stream is signaled. ''' - try: - async for msg in self._iter_packets(): - self.drained.append(msg) - except TransportClosed: - for msg in self.drained: - yield msg + payload = b'' + while len(payload) < num_bytes: + remaining = num_bytes - len(payload) - def __aiter__(self): - return self._aiter_pkts + new_bytes = await self._receiver.receive_some( + max_bytes=remaining + ) + + if new_bytes == b'': + raise trio.EndOfChannel + + payload += new_bytes + + return payload + + async def receive(self) -> bytes: + header: bytes = await self._receive_exactly(4) + size: int + size, = struct.unpack(" None: + await self._receiver.aclose() + + +class RingBuffChannel(trio.abc.Channel[bytes]): + ''' + Combine `RingBuffBytesSender` and `RingBuffBytesReceiver` + in order to expose the bidirectional `trio.abc.Channel` API. + + ''' + def __init__( + self, + sender: RingBuffBytesSender, + receiver: RingBuffBytesReceiver + ): + self._sender = sender + self._receiver = receiver + + async def send(self, value: bytes): + await self._sender.send(value) + + async def receive(self) -> bytes: + return await self._receiver.receive() + + async def aclose(self): + await self._receiver.aclose() + await self._sender.aclose() @acm -async def attach_to_ringbuf_stream( +async def attach_to_ringbuf_channel( token_in: RBToken, token_out: RBToken, cleanup_in: bool = True, cleanup_out: bool = True ): ''' - Wrap a ringbuf trio.StapledStream in a MsgpackRBStream + Attach to an already opened ringbuf pair and return + a `RingBuffChannel`. ''' - async with attach_to_ringbuf_pair( - token_in, - token_out, - cleanup_in=cleanup_in, - cleanup_out=cleanup_out - ) as stream: - yield MsgpackRBStream(stream) + async with ( + attach_to_ringbuf_receiver( + token_in, + cleanup=cleanup_in + ) as receiver, + attach_to_ringbuf_sender( + token_out, + cleanup=cleanup_out + ) as sender, + ): + yield RingBuffChannel( + RingBuffBytesSender(sender), + RingBuffBytesReceiver(receiver) + )