Catch trio cancellation on RingBuffReceiver bg eof listener task, add batched mode to RingBuffBytesSender
							parent
							
								
									ea010ab46a
								
							
						
					
					
						commit
						010874bed5
					
				|  | @ -301,6 +301,9 @@ class RingBuffReceiver(trio.abc.ReceiveStream): | |||
|         except EFDReadCancelled: | ||||
|             ... | ||||
| 
 | ||||
|         except trio.Cancelled: | ||||
|             ... | ||||
| 
 | ||||
|     async def receive_some(self, max_bytes: int | None = None) -> bytes: | ||||
|         ''' | ||||
|         Receive up to `max_bytes`, if no `max_bytes` is provided | ||||
|  | @ -480,16 +483,41 @@ class RingBuffBytesSender(trio.abc.SendChannel[bytes]): | |||
|     which decodes into a uint32 indicating the actual size of the | ||||
|     next payload. | ||||
| 
 | ||||
|     Optional batch mode: | ||||
| 
 | ||||
|     If `batch_size` > 1 messages wont get sent immediately but will be | ||||
|     stored until `batch_size` messages are pending, then it will send | ||||
|     them all at once. | ||||
| 
 | ||||
|     `batch_size` can be changed dynamically but always call, `flush()` | ||||
|     right before. | ||||
| 
 | ||||
|     ''' | ||||
|     def __init__( | ||||
|         self, | ||||
|         sender: RingBuffSender | ||||
|         sender: RingBuffSender, | ||||
|         batch_size: int = 1 | ||||
|     ): | ||||
|         self._sender = sender | ||||
|         self.batch_size = batch_size | ||||
|         self._batch_msg_len = 0 | ||||
|         self._batch: bytes = b'' | ||||
| 
 | ||||
|     async def flush(self) -> None: | ||||
|         await self._sender.send_all(self._batch) | ||||
|         self._batch = b'' | ||||
|         self._batch_msg_len = 0 | ||||
| 
 | ||||
|     async def send(self, value: bytes) -> None: | ||||
|         size: bytes = struct.pack("<I", len(value)) | ||||
|         return await self._sender.send_all(size + value) | ||||
|         msg: bytes = struct.pack("<I", len(value)) + value | ||||
|         if self.batch_size == 1: | ||||
|             await self._sender.send_all(msg) | ||||
| 
 | ||||
|         self._batch += msg | ||||
|         self._batch_msg_len += 1 | ||||
|         if self._batch_msg_len == self.batch_size: | ||||
|             await self.flush() | ||||
| 
 | ||||
| 
 | ||||
|     async def aclose(self) -> None: | ||||
|         await self._sender.aclose() | ||||
|  | @ -559,7 +587,8 @@ async def attach_to_ringbuf_rchannel( | |||
| @acm | ||||
| async def attach_to_ringbuf_schannel( | ||||
|     token: RBToken, | ||||
|     cleanup: bool = True | ||||
|     cleanup: bool = True, | ||||
|     batch_size: int = 1, | ||||
| ) -> AsyncContextManager[RingBuffBytesSender]: | ||||
|     ''' | ||||
|     Attach a RingBuffBytesSender from a previously opened | ||||
|  | @ -568,7 +597,7 @@ async def attach_to_ringbuf_schannel( | |||
|     async with attach_to_ringbuf_sender( | ||||
|         token, cleanup=cleanup | ||||
|     ) as sender: | ||||
|         yield RingBuffBytesSender(sender) | ||||
|         yield RingBuffBytesSender(sender, batch_size=batch_size) | ||||
| 
 | ||||
| 
 | ||||
| class RingBuffChannel(trio.abc.Channel[bytes]): | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue