Add direct read method on EventFD

Type hint all ctx managers in _ringbuf.py
Remove unnecesary send lock on ring chan sender
Handle EOF on ring chan receiver
Rename ringbuf tests to make it less redundant
Guillermo Rodriguez 2025-03-18 22:48:12 -03:00
parent 4523102869
commit 2d15b6bfb1
No known key found for this signature in database
GPG Key ID: 002CC5F1E6BDA53E
3 changed files with 45 additions and 46 deletions

View File

@ -179,7 +179,7 @@ async def child_blocked_receiver(
await receiver.receive_some()
def test_ring_reader_cancel():
def test_reader_cancel():
'''
Test that a receiver blocked on eventfd(2) read responds to
cancellation.
@ -222,7 +222,7 @@ async def child_blocked_sender(
await sender.send_all(b'this will wrap')
def test_ring_sender_cancel():
def test_sender_cancel():
'''
Test that a sender blocked on eventfd(2) read responds to
cancellation.
@ -255,7 +255,7 @@ def test_ring_sender_cancel():
trio.run(main)
def test_ringbuf_max_bytes():
def test_receiver_max_bytes():
'''
Test that RingBuffReceiver.receive_some's max_bytes optional
argument works correctly, send a msg of size 100, then
@ -384,7 +384,7 @@ async def child_channel_sender(
await chan.send(msg)
def test_ringbuf_channel():
def test_channel():
msg_amount_min = 100
msg_amount_max = 1000

View File

@ -157,6 +157,14 @@ class EventFD:
self._cscope = None
def read_direct(self) -> int:
'''
Direct call to `read_eventfd(self.fd)`, unless `eventfd` was
opened with `EFD_NONBLOCK` its gonna block the thread.
'''
return read_eventfd(self._fd)
def open(self):
self._fobj = os.fdopen(self._fd, self._omode)

View File

@ -19,6 +19,10 @@ IPC Reliable RingBuffer implementation
'''
from __future__ import annotations
import struct
from typing import (
ContextManager,
AsyncContextManager
)
from contextlib import (
contextmanager as cm,
asynccontextmanager as acm
@ -26,7 +30,6 @@ from contextlib import (
from multiprocessing.shared_memory import SharedMemory
import trio
from tricycle import BufferedReceiveStream
from msgspec import (
Struct,
to_builtins
@ -34,7 +37,6 @@ from msgspec import (
from ._linux import (
open_eventfd,
close_eventfd,
EFDReadCancelled,
EventFD
)
@ -94,7 +96,7 @@ class RBToken(Struct, frozen=True):
def open_ringbuf(
shm_name: str,
buf_size: int = _DEFAULT_RB_SIZE,
) -> RBToken:
) -> ContextManager[RBToken]:
'''
Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to
be used with `attach_to_ringbuf_sender` and `attach_to_ringbuf_receiver`
@ -106,31 +108,19 @@ def open_ringbuf(
create=True
)
try:
token = RBToken(
shm_name=shm_name,
write_eventfd=open_eventfd(),
wrap_eventfd=open_eventfd(),
eof_eventfd=open_eventfd(),
buf_size=buf_size
)
yield token
try:
close_eventfd(token.write_eventfd)
except OSError:
...
try:
close_eventfd(token.wrap_eventfd)
except OSError:
...
try:
close_eventfd(token.eof_eventfd)
except OSError:
...
with (
EventFD(open_eventfd(), 'r') as write_event,
EventFD(open_eventfd(), 'r') as wrap_event,
EventFD(open_eventfd(), 'r') as eof_event,
):
token = RBToken(
shm_name=shm_name,
write_eventfd=write_event.fd,
wrap_eventfd=wrap_event.fd,
eof_eventfd=eof_event.fd,
buf_size=buf_size
)
yield token
finally:
shm.unlink()
@ -232,6 +222,7 @@ class RingBuffSender(trio.abc.SendStream):
self._eof_event.write(
self._ptr if self._ptr > 0 else self.size
)
if self._cleanup:
self._write_event.close()
self._wrap_event.close()
@ -239,7 +230,8 @@ class RingBuffSender(trio.abc.SendStream):
self._shm.close()
async def aclose(self):
self.close()
async with self._send_lock:
self.close()
async def __aenter__(self):
self.open()
@ -396,7 +388,7 @@ class RingBuffReceiver(trio.abc.ReceiveStream):
async def attach_to_ringbuf_receiver(
token: RBToken,
cleanup: bool = True
):
) -> AsyncContextManager[RingBuffReceiver]:
'''
Attach a RingBuffReceiver from a previously opened
RBToken.
@ -418,7 +410,7 @@ async def attach_to_ringbuf_receiver(
async def attach_to_ringbuf_sender(
token: RBToken,
cleanup: bool = True
):
) -> AsyncContextManager[RingBuffSender]:
'''
Attach a RingBuffSender from a previously opened
RBToken.
@ -435,7 +427,7 @@ async def attach_to_ringbuf_sender(
def open_ringbuf_pair(
name: str,
buf_size: int = _DEFAULT_RB_SIZE
):
) -> ContextManager[tuple(RBToken, RBToken)]:
'''
Handle resources for a ringbuf pair to be used for
bidirectional messaging.
@ -461,7 +453,7 @@ async def attach_to_ringbuf_stream(
token_out: RBToken,
cleanup_in: bool = True,
cleanup_out: bool = True
):
) -> AsyncContextManager[trio.StapledStream]:
'''
Attach a trio.StapledStream from a previously opened
ringbuf pair.
@ -494,16 +486,13 @@ class RingBuffBytesSender(trio.abc.SendChannel[bytes]):
sender: RingBuffSender
):
self._sender = sender
self._send_lock = trio.StrictFIFOLock()
async def send(self, value: bytes) -> None:
async with self._send_lock:
size: bytes = struct.pack("<I", len(value))
return await self._sender.send_all(size + value)
size: bytes = struct.pack("<I", len(value))
return await self._sender.send_all(size + value)
async def aclose(self) -> None:
async with self._send_lock:
await self._sender.aclose()
await self._sender.aclose()
class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]):
@ -544,6 +533,8 @@ class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]):
header: bytes = await self._receive_exactly(4)
size: int
size, = struct.unpack("<I", header)
if size == 0:
raise trio.EndOfChannel
return await self._receive_exactly(size)
async def aclose(self) -> None:
@ -554,7 +545,7 @@ class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]):
async def attach_to_ringbuf_rchannel(
token: RBToken,
cleanup: bool = True
):
) -> AsyncContextManager[RingBuffBytesReceiver]:
'''
Attach a RingBuffBytesReceiver from a previously opened
RBToken.
@ -569,7 +560,7 @@ async def attach_to_ringbuf_rchannel(
async def attach_to_ringbuf_schannel(
token: RBToken,
cleanup: bool = True
):
) -> AsyncContextManager[RingBuffBytesSender]:
'''
Attach a RingBuffBytesSender from a previously opened
RBToken.
@ -611,7 +602,7 @@ async def attach_to_ringbuf_channel(
token_out: RBToken,
cleanup_in: bool = True,
cleanup_out: bool = True
):
) -> AsyncContextManager[RingBuffChannel]:
'''
Attach to an already opened ringbuf pair and return
a `RingBuffChannel`.