Catch trio cancellation on RingBuffReceiver bg eof listener task, add batched mode to RingBuffBytesSender
parent
1bbf1f7ab5
commit
6e113d2150
|
@ -301,6 +301,9 @@ class RingBuffReceiver(trio.abc.ReceiveStream):
|
||||||
except EFDReadCancelled:
|
except EFDReadCancelled:
|
||||||
...
|
...
|
||||||
|
|
||||||
|
except trio.Cancelled:
|
||||||
|
...
|
||||||
|
|
||||||
async def receive_some(self, max_bytes: int | None = None) -> bytes:
|
async def receive_some(self, max_bytes: int | None = None) -> bytes:
|
||||||
'''
|
'''
|
||||||
Receive up to `max_bytes`, if no `max_bytes` is provided
|
Receive up to `max_bytes`, if no `max_bytes` is provided
|
||||||
|
@ -480,16 +483,41 @@ class RingBuffBytesSender(trio.abc.SendChannel[bytes]):
|
||||||
which decodes into a uint32 indicating the actual size of the
|
which decodes into a uint32 indicating the actual size of the
|
||||||
next payload.
|
next payload.
|
||||||
|
|
||||||
|
Optional batch mode:
|
||||||
|
|
||||||
|
If `batch_size` > 1 messages wont get sent immediately but will be
|
||||||
|
stored until `batch_size` messages are pending, then it will send
|
||||||
|
them all at once.
|
||||||
|
|
||||||
|
`batch_size` can be changed dynamically but always call, `flush()`
|
||||||
|
right before.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
sender: RingBuffSender
|
sender: RingBuffSender,
|
||||||
|
batch_size: int = 1
|
||||||
):
|
):
|
||||||
self._sender = sender
|
self._sender = sender
|
||||||
|
self.batch_size = batch_size
|
||||||
|
self._batch_msg_len = 0
|
||||||
|
self._batch: bytes = b''
|
||||||
|
|
||||||
|
async def flush(self) -> None:
|
||||||
|
await self._sender.send_all(self._batch)
|
||||||
|
self._batch = b''
|
||||||
|
self._batch_msg_len = 0
|
||||||
|
|
||||||
async def send(self, value: bytes) -> None:
|
async def send(self, value: bytes) -> None:
|
||||||
size: bytes = struct.pack("<I", len(value))
|
msg: bytes = struct.pack("<I", len(value)) + value
|
||||||
return await self._sender.send_all(size + value)
|
if self.batch_size == 1:
|
||||||
|
await self._sender.send_all(msg)
|
||||||
|
|
||||||
|
self._batch += msg
|
||||||
|
self._batch_msg_len += 1
|
||||||
|
if self._batch_msg_len == self.batch_size:
|
||||||
|
await self.flush()
|
||||||
|
|
||||||
|
|
||||||
async def aclose(self) -> None:
|
async def aclose(self) -> None:
|
||||||
await self._sender.aclose()
|
await self._sender.aclose()
|
||||||
|
@ -559,7 +587,8 @@ async def attach_to_ringbuf_rchannel(
|
||||||
@acm
|
@acm
|
||||||
async def attach_to_ringbuf_schannel(
|
async def attach_to_ringbuf_schannel(
|
||||||
token: RBToken,
|
token: RBToken,
|
||||||
cleanup: bool = True
|
cleanup: bool = True,
|
||||||
|
batch_size: int = 1,
|
||||||
) -> AsyncContextManager[RingBuffBytesSender]:
|
) -> AsyncContextManager[RingBuffBytesSender]:
|
||||||
'''
|
'''
|
||||||
Attach a RingBuffBytesSender from a previously opened
|
Attach a RingBuffBytesSender from a previously opened
|
||||||
|
@ -568,7 +597,7 @@ async def attach_to_ringbuf_schannel(
|
||||||
async with attach_to_ringbuf_sender(
|
async with attach_to_ringbuf_sender(
|
||||||
token, cleanup=cleanup
|
token, cleanup=cleanup
|
||||||
) as sender:
|
) as sender:
|
||||||
yield RingBuffBytesSender(sender)
|
yield RingBuffBytesSender(sender, batch_size=batch_size)
|
||||||
|
|
||||||
|
|
||||||
class RingBuffChannel(trio.abc.Channel[bytes]):
|
class RingBuffChannel(trio.abc.Channel[bytes]):
|
||||||
|
|
Loading…
Reference in New Issue