From 1bbf1f7ab574514586cbd25ae902eeb708f574c5 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Tue, 18 Mar 2025 22:48:12 -0300 Subject: [PATCH] Add direct read method on EventFD Type hint all ctx managers in _ringbuf.py Remove unnecesary send lock on ring chan sender Handle EOF on ring chan receiver Rename ringbuf tests to make it less redundant --- tests/test_ringbuf.py | 8 ++--- tractor/ipc/_linux.py | 8 +++++ tractor/ipc/_ringbuf.py | 75 ++++++++++++++++++----------------------- 3 files changed, 45 insertions(+), 46 deletions(-) diff --git a/tests/test_ringbuf.py b/tests/test_ringbuf.py index f987d4c8..8858215e 100644 --- a/tests/test_ringbuf.py +++ b/tests/test_ringbuf.py @@ -179,7 +179,7 @@ async def child_blocked_receiver( await receiver.receive_some() -def test_ring_reader_cancel(): +def test_reader_cancel(): ''' Test that a receiver blocked on eventfd(2) read responds to cancellation. @@ -222,7 +222,7 @@ async def child_blocked_sender( await sender.send_all(b'this will wrap') -def test_ring_sender_cancel(): +def test_sender_cancel(): ''' Test that a sender blocked on eventfd(2) read responds to cancellation. @@ -255,7 +255,7 @@ def test_ring_sender_cancel(): trio.run(main) -def test_ringbuf_max_bytes(): +def test_receiver_max_bytes(): ''' Test that RingBuffReceiver.receive_some's max_bytes optional argument works correctly, send a msg of size 100, then @@ -384,7 +384,7 @@ async def child_channel_sender( await chan.send(msg) -def test_ringbuf_channel(): +def test_channel(): msg_amount_min = 100 msg_amount_max = 1000 diff --git a/tractor/ipc/_linux.py b/tractor/ipc/_linux.py index afce6bff..0c05260e 100644 --- a/tractor/ipc/_linux.py +++ b/tractor/ipc/_linux.py @@ -157,6 +157,14 @@ class EventFD: self._cscope = None + def read_direct(self) -> int: + ''' + Direct call to `read_eventfd(self.fd)`, unless `eventfd` was + opened with `EFD_NONBLOCK` its gonna block the thread. + + ''' + return read_eventfd(self._fd) + def open(self): self._fobj = os.fdopen(self._fd, self._omode) diff --git a/tractor/ipc/_ringbuf.py b/tractor/ipc/_ringbuf.py index 42403937..09c955ac 100644 --- a/tractor/ipc/_ringbuf.py +++ b/tractor/ipc/_ringbuf.py @@ -19,6 +19,10 @@ IPC Reliable RingBuffer implementation ''' from __future__ import annotations import struct +from typing import ( + ContextManager, + AsyncContextManager +) from contextlib import ( contextmanager as cm, asynccontextmanager as acm @@ -26,7 +30,6 @@ from contextlib import ( from multiprocessing.shared_memory import SharedMemory import trio -from tricycle import BufferedReceiveStream from msgspec import ( Struct, to_builtins @@ -34,7 +37,6 @@ from msgspec import ( from ._linux import ( open_eventfd, - close_eventfd, EFDReadCancelled, EventFD ) @@ -94,7 +96,7 @@ class RBToken(Struct, frozen=True): def open_ringbuf( shm_name: str, buf_size: int = _DEFAULT_RB_SIZE, -) -> RBToken: +) -> ContextManager[RBToken]: ''' Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to be used with `attach_to_ringbuf_sender` and `attach_to_ringbuf_receiver` @@ -106,31 +108,19 @@ def open_ringbuf( create=True ) try: - token = RBToken( - shm_name=shm_name, - write_eventfd=open_eventfd(), - wrap_eventfd=open_eventfd(), - eof_eventfd=open_eventfd(), - buf_size=buf_size - ) - yield token - try: - close_eventfd(token.write_eventfd) - - except OSError: - ... - - try: - close_eventfd(token.wrap_eventfd) - - except OSError: - ... - - try: - close_eventfd(token.eof_eventfd) - - except OSError: - ... + with ( + EventFD(open_eventfd(), 'r') as write_event, + EventFD(open_eventfd(), 'r') as wrap_event, + EventFD(open_eventfd(), 'r') as eof_event, + ): + token = RBToken( + shm_name=shm_name, + write_eventfd=write_event.fd, + wrap_eventfd=wrap_event.fd, + eof_eventfd=eof_event.fd, + buf_size=buf_size + ) + yield token finally: shm.unlink() @@ -232,6 +222,7 @@ class RingBuffSender(trio.abc.SendStream): self._eof_event.write( self._ptr if self._ptr > 0 else self.size ) + if self._cleanup: self._write_event.close() self._wrap_event.close() @@ -239,7 +230,8 @@ class RingBuffSender(trio.abc.SendStream): self._shm.close() async def aclose(self): - self.close() + async with self._send_lock: + self.close() async def __aenter__(self): self.open() @@ -396,7 +388,7 @@ class RingBuffReceiver(trio.abc.ReceiveStream): async def attach_to_ringbuf_receiver( token: RBToken, cleanup: bool = True -): +) -> AsyncContextManager[RingBuffReceiver]: ''' Attach a RingBuffReceiver from a previously opened RBToken. @@ -418,7 +410,7 @@ async def attach_to_ringbuf_receiver( async def attach_to_ringbuf_sender( token: RBToken, cleanup: bool = True -): +) -> AsyncContextManager[RingBuffSender]: ''' Attach a RingBuffSender from a previously opened RBToken. @@ -435,7 +427,7 @@ async def attach_to_ringbuf_sender( def open_ringbuf_pair( name: str, buf_size: int = _DEFAULT_RB_SIZE -): +) -> ContextManager[tuple(RBToken, RBToken)]: ''' Handle resources for a ringbuf pair to be used for bidirectional messaging. @@ -461,7 +453,7 @@ async def attach_to_ringbuf_stream( token_out: RBToken, cleanup_in: bool = True, cleanup_out: bool = True -): +) -> AsyncContextManager[trio.StapledStream]: ''' Attach a trio.StapledStream from a previously opened ringbuf pair. @@ -494,16 +486,13 @@ class RingBuffBytesSender(trio.abc.SendChannel[bytes]): sender: RingBuffSender ): self._sender = sender - self._send_lock = trio.StrictFIFOLock() async def send(self, value: bytes) -> None: - async with self._send_lock: - size: bytes = struct.pack(" None: - async with self._send_lock: - await self._sender.aclose() + await self._sender.aclose() class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]): @@ -544,6 +533,8 @@ class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]): header: bytes = await self._receive_exactly(4) size: int size, = struct.unpack(" None: @@ -554,7 +545,7 @@ class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]): async def attach_to_ringbuf_rchannel( token: RBToken, cleanup: bool = True -): +) -> AsyncContextManager[RingBuffBytesReceiver]: ''' Attach a RingBuffBytesReceiver from a previously opened RBToken. @@ -569,7 +560,7 @@ async def attach_to_ringbuf_rchannel( async def attach_to_ringbuf_schannel( token: RBToken, cleanup: bool = True -): +) -> AsyncContextManager[RingBuffBytesSender]: ''' Attach a RingBuffBytesSender from a previously opened RBToken. @@ -611,7 +602,7 @@ async def attach_to_ringbuf_channel( token_out: RBToken, cleanup_in: bool = True, cleanup_out: bool = True -): +) -> AsyncContextManager[RingBuffChannel]: ''' Attach to an already opened ringbuf pair and return a `RingBuffChannel`.