From 1819c026d158f9e5da5dfb352b945383fbe1a5ee Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Thu, 13 Mar 2025 21:15:16 -0300 Subject: [PATCH] Move RingBuffSender|Receiver to its own tractor.ipc._ringbuf module --- tractor/ipc/__init__.py | 3 + tractor/ipc/_linux.py | 195 -------------------------------------- tractor/ipc/_ringbuf.py | 201 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 204 insertions(+), 195 deletions(-) create mode 100644 tractor/ipc/_ringbuf.py diff --git a/tractor/ipc/__init__.py b/tractor/ipc/__init__.py index 1b548b65..c0b58951 100644 --- a/tractor/ipc/__init__.py +++ b/tractor/ipc/__init__.py @@ -16,6 +16,9 @@ if platform.system() == 'Linux': read_eventfd as read_eventfd, close_eventfd as close_eventfd, EventFD as EventFD, + ) + + from ._ringbuf import ( RingBuffSender as RingBuffSender, RingBuffReceiver as RingBuffReceiver ) diff --git a/tractor/ipc/_linux.py b/tractor/ipc/_linux.py index 2a9eabc1..53feac6a 100644 --- a/tractor/ipc/_linux.py +++ b/tractor/ipc/_linux.py @@ -2,8 +2,6 @@ import os import errno -from multiprocessing.shared_memory import SharedMemory - import cffi import trio @@ -129,196 +127,3 @@ class EventFD: def __exit__(self, exc_type, exc_value, traceback): self.close() - - -class RingBuffSender(trio.abc.SendStream): - ''' - IPC Reliable Ring Buffer sender side implementation - - `eventfd(2)` is used for wrap around sync, and also to signal - writes to the reader. - - TODO: if blocked on wrap around event wait it will not respond - to signals, fix soon TM - ''' - - def __init__( - self, - shm_key: str, - write_eventfd: int, - wrap_eventfd: int, - start_ptr: int = 0, - buf_size: int = 10 * 1024, - clean_shm_on_exit: bool = True - ): - self._shm = SharedMemory( - name=shm_key, - size=buf_size, - create=True - ) - self._write_event = EventFD(write_eventfd, 'w') - self._wrap_event = EventFD(wrap_eventfd, 'r') - self._ptr = start_ptr - self.clean_shm_on_exit = clean_shm_on_exit - - @property - def key(self) -> str: - return self._shm.name - - @property - def size(self) -> int: - return self._shm.size - - @property - def ptr(self) -> int: - return self._ptr - - @property - def write_fd(self) -> int: - return self._write_event.fd - - @property - def wrap_fd(self) -> int: - return self._wrap_event.fd - - async def send_all(self, data: bytes | bytearray | memoryview): - # while data is larger than the remaining buf - target_ptr = self.ptr + len(data) - while target_ptr > self.size: - # write all bytes that fit - remaining = self.size - self.ptr - self._shm.buf[self.ptr:] = data[:remaining] - # signal write and wait for reader wrap around - self._write_event.write(remaining) - await self._wrap_event.read() - - # wrap around and trim already written bytes - self._ptr = 0 - data = data[remaining:] - target_ptr = self._ptr + len(data) - - # remaining data fits on buffer - self._shm.buf[self.ptr:target_ptr] = data - self._write_event.write(len(data)) - self._ptr = target_ptr - - async def wait_send_all_might_not_block(self): - raise NotImplementedError - - async def aclose(self): - self._write_event.close() - self._wrap_event.close() - if self.clean_shm_on_exit: - self._shm.unlink() - - else: - self._shm.close() - - async def __aenter__(self): - self._write_event.open() - self._wrap_event.open() - return self - - async def __aexit__(self, exc_type, exc_value, traceback): - await self.aclose() - - -class RingBuffReceiver(trio.abc.ReceiveStream): - ''' - IPC Reliable Ring Buffer receiver side implementation - - `eventfd(2)` is used for wrap around sync, and also to signal - writes to the reader. - - Unless eventfd(2) object is opened with EFD_NONBLOCK flag, - calls to `receive_some` will block the signal handling, - on the main thread, for now solution is using polling, - working on a way to unblock GIL during read(2) to allow - signal processing on the main thread. - ''' - - def __init__( - self, - shm_key: str, - write_eventfd: int, - wrap_eventfd: int, - start_ptr: int = 0, - buf_size: int = 10 * 1024, - flags: int = 0 - ): - self._shm = SharedMemory( - name=shm_key, - size=buf_size, - create=False - ) - self._write_event = EventFD(write_eventfd, 'w') - self._wrap_event = EventFD(wrap_eventfd, 'r') - self._ptr = start_ptr - self._flags = flags - - @property - def key(self) -> str: - return self._shm.name - - @property - def size(self) -> int: - return self._shm.size - - @property - def ptr(self) -> int: - return self._ptr - - @property - def write_fd(self) -> int: - return self._write_event.fd - - @property - def wrap_fd(self) -> int: - return self._wrap_event.fd - - async def receive_some( - self, - max_bytes: int | None = None, - nb_timeout: float = 0.1 - ) -> memoryview: - # if non blocking eventfd enabled, do polling - # until next write, this allows signal handling - if self._flags | EFD_NONBLOCK: - delta = None - while delta is None: - try: - delta = await self._write_event.read() - - except OSError as e: - if e.errno == 'EAGAIN': - continue - - raise e - - else: - delta = await self._write_event.read() - - # fetch next segment and advance ptr - next_ptr = self._ptr + delta - segment = self._shm.buf[self._ptr:next_ptr] - self._ptr = next_ptr - - if self.ptr == self.size: - # reached the end, signal wrap around - self._ptr = 0 - self._wrap_event.write(1) - - return segment - - async def aclose(self): - self._write_event.close() - self._wrap_event.close() - self._shm.close() - - async def __aenter__(self): - self._write_event.open() - self._wrap_event.open() - return self - - async def __aexit__(self, exc_type, exc_value, traceback): - await self.aclose() diff --git a/tractor/ipc/_ringbuf.py b/tractor/ipc/_ringbuf.py new file mode 100644 index 00000000..0895381f --- /dev/null +++ b/tractor/ipc/_ringbuf.py @@ -0,0 +1,201 @@ +from multiprocessing.shared_memory import SharedMemory + +import trio + +from ._linux import ( + EFD_NONBLOCK, + EventFD +) + + +class RingBuffSender(trio.abc.SendStream): + ''' + IPC Reliable Ring Buffer sender side implementation + + `eventfd(2)` is used for wrap around sync, and also to signal + writes to the reader. + + TODO: if blocked on wrap around event wait it will not respond + to signals, fix soon TM + ''' + + def __init__( + self, + shm_key: str, + write_eventfd: int, + wrap_eventfd: int, + start_ptr: int = 0, + buf_size: int = 10 * 1024, + clean_shm_on_exit: bool = True + ): + self._shm = SharedMemory( + name=shm_key, + size=buf_size, + create=True + ) + self._write_event = EventFD(write_eventfd, 'w') + self._wrap_event = EventFD(wrap_eventfd, 'r') + self._ptr = start_ptr + self.clean_shm_on_exit = clean_shm_on_exit + + @property + def key(self) -> str: + return self._shm.name + + @property + def size(self) -> int: + return self._shm.size + + @property + def ptr(self) -> int: + return self._ptr + + @property + def write_fd(self) -> int: + return self._write_event.fd + + @property + def wrap_fd(self) -> int: + return self._wrap_event.fd + + async def send_all(self, data: bytes | bytearray | memoryview): + # while data is larger than the remaining buf + target_ptr = self.ptr + len(data) + while target_ptr > self.size: + # write all bytes that fit + remaining = self.size - self.ptr + self._shm.buf[self.ptr:] = data[:remaining] + # signal write and wait for reader wrap around + self._write_event.write(remaining) + await self._wrap_event.read() + + # wrap around and trim already written bytes + self._ptr = 0 + data = data[remaining:] + target_ptr = self._ptr + len(data) + + # remaining data fits on buffer + self._shm.buf[self.ptr:target_ptr] = data + self._write_event.write(len(data)) + self._ptr = target_ptr + + async def wait_send_all_might_not_block(self): + raise NotImplementedError + + async def aclose(self): + self._write_event.close() + self._wrap_event.close() + if self.clean_shm_on_exit: + self._shm.unlink() + + else: + self._shm.close() + + async def __aenter__(self): + self._write_event.open() + self._wrap_event.open() + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self.aclose() + + +class RingBuffReceiver(trio.abc.ReceiveStream): + ''' + IPC Reliable Ring Buffer receiver side implementation + + `eventfd(2)` is used for wrap around sync, and also to signal + writes to the reader. + + Unless eventfd(2) object is opened with EFD_NONBLOCK flag, + calls to `receive_some` will block the signal handling, + on the main thread, for now solution is using polling, + working on a way to unblock GIL during read(2) to allow + signal processing on the main thread. + ''' + + def __init__( + self, + shm_key: str, + write_eventfd: int, + wrap_eventfd: int, + start_ptr: int = 0, + buf_size: int = 10 * 1024, + flags: int = 0 + ): + self._shm = SharedMemory( + name=shm_key, + size=buf_size, + create=False + ) + self._write_event = EventFD(write_eventfd, 'w') + self._wrap_event = EventFD(wrap_eventfd, 'r') + self._ptr = start_ptr + self._flags = flags + + @property + def key(self) -> str: + return self._shm.name + + @property + def size(self) -> int: + return self._shm.size + + @property + def ptr(self) -> int: + return self._ptr + + @property + def write_fd(self) -> int: + return self._write_event.fd + + @property + def wrap_fd(self) -> int: + return self._wrap_event.fd + + async def receive_some( + self, + max_bytes: int | None = None, + nb_timeout: float = 0.1 + ) -> memoryview: + # if non blocking eventfd enabled, do polling + # until next write, this allows signal handling + if self._flags | EFD_NONBLOCK: + delta = None + while delta is None: + try: + delta = await self._write_event.read() + + except OSError as e: + if e.errno == 'EAGAIN': + continue + + raise e + + else: + delta = await self._write_event.read() + + # fetch next segment and advance ptr + next_ptr = self._ptr + delta + segment = self._shm.buf[self._ptr:next_ptr] + self._ptr = next_ptr + + if self.ptr == self.size: + # reached the end, signal wrap around + self._ptr = 0 + self._wrap_event.write(1) + + return segment + + async def aclose(self): + self._write_event.close() + self._wrap_event.close() + self._shm.close() + + async def __aenter__(self): + self._write_event.open() + self._wrap_event.open() + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self.aclose()