diff --git a/tractor/ipc/_ringbuf.py b/tractor/ipc/_ringbuf.py index 09c955ac..7d96eeda 100644 --- a/tractor/ipc/_ringbuf.py +++ b/tractor/ipc/_ringbuf.py @@ -301,6 +301,9 @@ class RingBuffReceiver(trio.abc.ReceiveStream): except EFDReadCancelled: ... + except trio.Cancelled: + ... + async def receive_some(self, max_bytes: int | None = None) -> bytes: ''' Receive up to `max_bytes`, if no `max_bytes` is provided @@ -480,16 +483,41 @@ class RingBuffBytesSender(trio.abc.SendChannel[bytes]): which decodes into a uint32 indicating the actual size of the next payload. + Optional batch mode: + + If `batch_size` > 1 messages wont get sent immediately but will be + stored until `batch_size` messages are pending, then it will send + them all at once. + + `batch_size` can be changed dynamically but always call, `flush()` + right before. + ''' def __init__( self, - sender: RingBuffSender + sender: RingBuffSender, + batch_size: int = 1 ): self._sender = sender + self.batch_size = batch_size + self._batch_msg_len = 0 + self._batch: bytes = b'' + + async def flush(self) -> None: + await self._sender.send_all(self._batch) + self._batch = b'' + self._batch_msg_len = 0 async def send(self, value: bytes) -> None: - size: bytes = struct.pack(" None: await self._sender.aclose() @@ -559,7 +587,8 @@ async def attach_to_ringbuf_rchannel( @acm async def attach_to_ringbuf_schannel( token: RBToken, - cleanup: bool = True + cleanup: bool = True, + batch_size: int = 1, ) -> AsyncContextManager[RingBuffBytesSender]: ''' Attach a RingBuffBytesSender from a previously opened @@ -568,7 +597,7 @@ async def attach_to_ringbuf_schannel( async with attach_to_ringbuf_sender( token, cleanup=cleanup ) as sender: - yield RingBuffBytesSender(sender) + yield RingBuffBytesSender(sender, batch_size=batch_size) class RingBuffChannel(trio.abc.Channel[bytes]):