diff --git a/tests/test_ringbuf.py b/tests/test_ringbuf.py index e4011768..1c4e88f9 100644 --- a/tests/test_ringbuf.py +++ b/tests/test_ringbuf.py @@ -3,7 +3,7 @@ import time import trio import pytest import tractor -from tractor.ipc._shm import ( +from tractor.ipc import ( EFD_NONBLOCK, open_eventfd, RingBuffSender, diff --git a/tractor/ipc/__init__.py b/tractor/ipc/__init__.py index 2a401bf6..1b548b65 100644 --- a/tractor/ipc/__init__.py +++ b/tractor/ipc/__init__.py @@ -1,11 +1,21 @@ +import platform + from ._chan import ( - _connect_chan, - MsgTransport, - Channel + _connect_chan as _connect_chan, + MsgTransport as MsgTransport, + Channel as Channel ) -__all__ = [ - '_connect_chan', - 'MsgTransport', - 'Channel', -] +if platform.system() == 'Linux': + from ._linux import ( + EFD_SEMAPHORE as EFD_SEMAPHORE, + EFD_CLOEXEC as EFD_CLOEXEC, + EFD_NONBLOCK as EFD_NONBLOCK, + open_eventfd as open_eventfd, + write_eventfd as write_eventfd, + read_eventfd as read_eventfd, + close_eventfd as close_eventfd, + EventFD as EventFD, + RingBuffSender as RingBuffSender, + RingBuffReceiver as RingBuffReceiver + ) diff --git a/tractor/ipc/_linux.py b/tractor/ipc/_linux.py new file mode 100644 index 00000000..2a9eabc1 --- /dev/null +++ b/tractor/ipc/_linux.py @@ -0,0 +1,324 @@ + +import os +import errno + +from multiprocessing.shared_memory import SharedMemory + +import cffi +import trio + +ffi = cffi.FFI() + +# Declare the C functions and types we plan to use. +# - eventfd: for creating the event file descriptor +# - write: for writing to the file descriptor +# - read: for reading from the file descriptor +# - close: for closing the file descriptor +ffi.cdef( + ''' + int eventfd(unsigned int initval, int flags); + + ssize_t write(int fd, const void *buf, size_t count); + ssize_t read(int fd, void *buf, size_t count); + + int close(int fd); + ''' +) + + +# Open the default dynamic library (essentially 'libc' in most cases) +C = ffi.dlopen(None) + +# Constants from , if needed. +EFD_SEMAPHORE = 1 +EFD_CLOEXEC = 0o2000000 +EFD_NONBLOCK = 0o4000 + + +def open_eventfd(initval: int = 0, flags: int = 0) -> int: + ''' + Open an eventfd with the given initial value and flags. + Returns the file descriptor on success, otherwise raises OSError. + + ''' + fd = C.eventfd(initval, flags) + if fd < 0: + raise OSError(errno.errorcode[ffi.errno], 'eventfd failed') + return fd + +def write_eventfd(fd: int, value: int) -> int: + ''' + Write a 64-bit integer (uint64_t) to the eventfd's counter. + + ''' + # Create a uint64_t* in C, store `value` + data_ptr = ffi.new('uint64_t *', value) + + # Call write(fd, data_ptr, 8) + # We expect to write exactly 8 bytes (sizeof(uint64_t)) + ret = C.write(fd, data_ptr, 8) + if ret < 0: + raise OSError(errno.errorcode[ffi.errno], 'write to eventfd failed') + return ret + +def read_eventfd(fd: int) -> int: + ''' + Read a 64-bit integer (uint64_t) from the eventfd, returning the value. + Reading resets the counter to 0 (unless using EFD_SEMAPHORE). + + ''' + # Allocate an 8-byte buffer in C for reading + buf = ffi.new('char[]', 8) + + ret = C.read(fd, buf, 8) + if ret < 0: + raise OSError(errno.errorcode[ffi.errno], 'read from eventfd failed') + # Convert the 8 bytes we read into a Python integer + data_bytes = ffi.unpack(buf, 8) # returns a Python bytes object of length 8 + value = int.from_bytes(data_bytes, byteorder='little', signed=False) + return value + +def close_eventfd(fd: int) -> int: + ''' + Close the eventfd. + + ''' + ret = C.close(fd) + if ret < 0: + raise OSError(errno.errorcode[ffi.errno], 'close failed') + + +class EventFD: + ''' + Use a previously opened eventfd(2), meant to be used in + sub-actors after root actor opens the eventfds then passes + them through pass_fds + + ''' + + def __init__( + self, + fd: int, + omode: str + ): + self._fd: int = fd + self._omode: str = omode + self._fobj = None + + @property + def fd(self) -> int | None: + return self._fd + + def write(self, value: int) -> int: + return write_eventfd(self._fd, value) + + async def read(self) -> int: + #TODO: how to handle signals? + return await trio.to_thread.run_sync(read_eventfd, self._fd) + + def open(self): + self._fobj = os.fdopen(self._fd, self._omode) + + def close(self): + if self._fobj: + self._fobj.close() + + def __enter__(self): + self.open() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + +class RingBuffSender(trio.abc.SendStream): + ''' + IPC Reliable Ring Buffer sender side implementation + + `eventfd(2)` is used for wrap around sync, and also to signal + writes to the reader. + + TODO: if blocked on wrap around event wait it will not respond + to signals, fix soon TM + ''' + + def __init__( + self, + shm_key: str, + write_eventfd: int, + wrap_eventfd: int, + start_ptr: int = 0, + buf_size: int = 10 * 1024, + clean_shm_on_exit: bool = True + ): + self._shm = SharedMemory( + name=shm_key, + size=buf_size, + create=True + ) + self._write_event = EventFD(write_eventfd, 'w') + self._wrap_event = EventFD(wrap_eventfd, 'r') + self._ptr = start_ptr + self.clean_shm_on_exit = clean_shm_on_exit + + @property + def key(self) -> str: + return self._shm.name + + @property + def size(self) -> int: + return self._shm.size + + @property + def ptr(self) -> int: + return self._ptr + + @property + def write_fd(self) -> int: + return self._write_event.fd + + @property + def wrap_fd(self) -> int: + return self._wrap_event.fd + + async def send_all(self, data: bytes | bytearray | memoryview): + # while data is larger than the remaining buf + target_ptr = self.ptr + len(data) + while target_ptr > self.size: + # write all bytes that fit + remaining = self.size - self.ptr + self._shm.buf[self.ptr:] = data[:remaining] + # signal write and wait for reader wrap around + self._write_event.write(remaining) + await self._wrap_event.read() + + # wrap around and trim already written bytes + self._ptr = 0 + data = data[remaining:] + target_ptr = self._ptr + len(data) + + # remaining data fits on buffer + self._shm.buf[self.ptr:target_ptr] = data + self._write_event.write(len(data)) + self._ptr = target_ptr + + async def wait_send_all_might_not_block(self): + raise NotImplementedError + + async def aclose(self): + self._write_event.close() + self._wrap_event.close() + if self.clean_shm_on_exit: + self._shm.unlink() + + else: + self._shm.close() + + async def __aenter__(self): + self._write_event.open() + self._wrap_event.open() + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self.aclose() + + +class RingBuffReceiver(trio.abc.ReceiveStream): + ''' + IPC Reliable Ring Buffer receiver side implementation + + `eventfd(2)` is used for wrap around sync, and also to signal + writes to the reader. + + Unless eventfd(2) object is opened with EFD_NONBLOCK flag, + calls to `receive_some` will block the signal handling, + on the main thread, for now solution is using polling, + working on a way to unblock GIL during read(2) to allow + signal processing on the main thread. + ''' + + def __init__( + self, + shm_key: str, + write_eventfd: int, + wrap_eventfd: int, + start_ptr: int = 0, + buf_size: int = 10 * 1024, + flags: int = 0 + ): + self._shm = SharedMemory( + name=shm_key, + size=buf_size, + create=False + ) + self._write_event = EventFD(write_eventfd, 'w') + self._wrap_event = EventFD(wrap_eventfd, 'r') + self._ptr = start_ptr + self._flags = flags + + @property + def key(self) -> str: + return self._shm.name + + @property + def size(self) -> int: + return self._shm.size + + @property + def ptr(self) -> int: + return self._ptr + + @property + def write_fd(self) -> int: + return self._write_event.fd + + @property + def wrap_fd(self) -> int: + return self._wrap_event.fd + + async def receive_some( + self, + max_bytes: int | None = None, + nb_timeout: float = 0.1 + ) -> memoryview: + # if non blocking eventfd enabled, do polling + # until next write, this allows signal handling + if self._flags | EFD_NONBLOCK: + delta = None + while delta is None: + try: + delta = await self._write_event.read() + + except OSError as e: + if e.errno == 'EAGAIN': + continue + + raise e + + else: + delta = await self._write_event.read() + + # fetch next segment and advance ptr + next_ptr = self._ptr + delta + segment = self._shm.buf[self._ptr:next_ptr] + self._ptr = next_ptr + + if self.ptr == self.size: + # reached the end, signal wrap around + self._ptr = 0 + self._wrap_event.write(1) + + return segment + + async def aclose(self): + self._write_event.close() + self._wrap_event.close() + self._shm.close() + + async def __aenter__(self): + self._write_event.open() + self._wrap_event.open() + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self.aclose() diff --git a/tractor/ipc/_shm.py b/tractor/ipc/_shm.py index 36995117..202321fc 100644 --- a/tractor/ipc/_shm.py +++ b/tractor/ipc/_shm.py @@ -25,7 +25,6 @@ considered optional within the context of this runtime-library. from __future__ import annotations from sys import byteorder import time -import platform from typing import Optional from multiprocessing import shared_memory as shm from multiprocessing.shared_memory import ( @@ -832,328 +831,3 @@ def attach_shm_list( name=key, readonly=readonly, ) - - -if platform.system() == 'Linux': - import os - import errno - from contextlib import asynccontextmanager as acm - - import cffi - import trio - - ffi = cffi.FFI() - - # Declare the C functions and types we plan to use. - # - eventfd: for creating the event file descriptor - # - write: for writing to the file descriptor - # - read: for reading from the file descriptor - # - close: for closing the file descriptor - ffi.cdef( - ''' - int eventfd(unsigned int initval, int flags); - - ssize_t write(int fd, const void *buf, size_t count); - ssize_t read(int fd, void *buf, size_t count); - - int close(int fd); - ''' - ) - - - # Open the default dynamic library (essentially 'libc' in most cases) - C = ffi.dlopen(None) - - # Constants from , if needed. - EFD_SEMAPHORE = 1 - EFD_CLOEXEC = 0o2000000 - EFD_NONBLOCK = 0o4000 - - - def open_eventfd(initval: int = 0, flags: int = 0) -> int: - ''' - Open an eventfd with the given initial value and flags. - Returns the file descriptor on success, otherwise raises OSError. - - ''' - fd = C.eventfd(initval, flags) - if fd < 0: - raise OSError(errno.errorcode[ffi.errno], 'eventfd failed') - return fd - - def write_eventfd(fd: int, value: int) -> int: - ''' - Write a 64-bit integer (uint64_t) to the eventfd's counter. - - ''' - # Create a uint64_t* in C, store `value` - data_ptr = ffi.new('uint64_t *', value) - - # Call write(fd, data_ptr, 8) - # We expect to write exactly 8 bytes (sizeof(uint64_t)) - ret = C.write(fd, data_ptr, 8) - if ret < 0: - raise OSError(errno.errorcode[ffi.errno], 'write to eventfd failed') - return ret - - def read_eventfd(fd: int) -> int: - ''' - Read a 64-bit integer (uint64_t) from the eventfd, returning the value. - Reading resets the counter to 0 (unless using EFD_SEMAPHORE). - - ''' - # Allocate an 8-byte buffer in C for reading - buf = ffi.new('char[]', 8) - - ret = C.read(fd, buf, 8) - if ret < 0: - raise OSError(errno.errorcode[ffi.errno], 'read from eventfd failed') - # Convert the 8 bytes we read into a Python integer - data_bytes = ffi.unpack(buf, 8) # returns a Python bytes object of length 8 - value = int.from_bytes(data_bytes, byteorder='little', signed=False) - return value - - def close_eventfd(fd: int) -> int: - ''' - Close the eventfd. - - ''' - ret = C.close(fd) - if ret < 0: - raise OSError(errno.errorcode[ffi.errno], 'close failed') - - - class EventFD: - ''' - Use a previously opened eventfd(2), meant to be used in - sub-actors after root actor opens the eventfds then passes - them through pass_fds - - ''' - - def __init__( - self, - fd: int, - omode: str - ): - self._fd: int = fd - self._omode: str = omode - self._fobj = None - - @property - def fd(self) -> int | None: - return self._fd - - def write(self, value: int) -> int: - return write_eventfd(self._fd, value) - - async def read(self) -> int: - #TODO: how to handle signals? - return await trio.to_thread.run_sync(read_eventfd, self._fd) - - def open(self): - self._fobj = os.fdopen(self._fd, self._omode) - - def close(self): - if self._fobj: - self._fobj.close() - - def __enter__(self): - self.open() - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.close() - - - class RingBuffSender(trio.abc.SendStream): - ''' - IPC Reliable Ring Buffer sender side implementation - - `eventfd(2)` is used for wrap around sync, and also to signal - writes to the reader. - - TODO: if blocked on wrap around event wait it will not respond - to signals, fix soon TM - ''' - - def __init__( - self, - shm_key: str, - write_eventfd: int, - wrap_eventfd: int, - start_ptr: int = 0, - buf_size: int = 10 * 1024, - clean_shm_on_exit: bool = True - ): - self._shm = SharedMemory( - name=shm_key, - size=buf_size, - create=True - ) - self._write_event = EventFD(write_eventfd, 'w') - self._wrap_event = EventFD(wrap_eventfd, 'r') - self._ptr = start_ptr - self.clean_shm_on_exit = clean_shm_on_exit - - @property - def key(self) -> str: - return self._shm.name - - @property - def size(self) -> int: - return self._shm.size - - @property - def ptr(self) -> int: - return self._ptr - - @property - def write_fd(self) -> int: - return self._write_event.fd - - @property - def wrap_fd(self) -> int: - return self._wrap_event.fd - - async def send_all(self, data: bytes | bytearray | memoryview): - # while data is larger than the remaining buf - target_ptr = self.ptr + len(data) - while target_ptr > self.size: - # write all bytes that fit - remaining = self.size - self.ptr - self._shm.buf[self.ptr:] = data[:remaining] - # signal write and wait for reader wrap around - self._write_event.write(remaining) - await self._wrap_event.read() - - # wrap around and trim already written bytes - self._ptr = 0 - data = data[remaining:] - target_ptr = self._ptr + len(data) - - # remaining data fits on buffer - self._shm.buf[self.ptr:target_ptr] = data - self._write_event.write(len(data)) - self._ptr = target_ptr - - async def wait_send_all_might_not_block(self): - raise NotImplementedError - - async def aclose(self): - self._write_event.close() - self._wrap_event.close() - if self.clean_shm_on_exit: - self._shm.unlink() - - else: - self._shm.close() - - async def __aenter__(self): - self._write_event.open() - self._wrap_event.open() - return self - - async def __aexit__(self, exc_type, exc_value, traceback): - await self.aclose() - - - class RingBuffReceiver(trio.abc.ReceiveStream): - ''' - IPC Reliable Ring Buffer receiver side implementation - - `eventfd(2)` is used for wrap around sync, and also to signal - writes to the reader. - - Unless eventfd(2) object is opened with EFD_NONBLOCK flag, - calls to `receive_some` will block the signal handling, - on the main thread, for now solution is using polling, - working on a way to unblock GIL during read(2) to allow - signal processing on the main thread. - ''' - - def __init__( - self, - shm_key: str, - write_eventfd: int, - wrap_eventfd: int, - start_ptr: int = 0, - buf_size: int = 10 * 1024, - flags: int = 0 - ): - self._shm = SharedMemory( - name=shm_key, - size=buf_size, - create=False - ) - self._write_event = EventFD(write_eventfd, 'w') - self._wrap_event = EventFD(wrap_eventfd, 'r') - self._ptr = start_ptr - self._flags = flags - - @property - def key(self) -> str: - return self._shm.name - - @property - def size(self) -> int: - return self._shm.size - - @property - def ptr(self) -> int: - return self._ptr - - @property - def write_fd(self) -> int: - return self._write_event.fd - - @property - def wrap_fd(self) -> int: - return self._wrap_event.fd - - async def receive_some( - self, - max_bytes: int | None = None, - nb_timeout: float = 0.1 - ) -> memoryview: - # if non blocking eventfd enabled, do polling - # until next write, this allows signal handling - if self._flags | EFD_NONBLOCK: - delta = None - while delta is None: - try: - delta = await self._write_event.read() - - except OSError as e: - if e.errno == 'EAGAIN': - continue - - raise e - - else: - delta = await self._write_event.read() - - # fetch next segment and advance ptr - next_ptr = self._ptr + delta - segment = self._shm.buf[self._ptr:next_ptr] - self._ptr = next_ptr - - if self.ptr == self.size: - # reached the end, signal wrap around - self._ptr = 0 - self._wrap_event.write(1) - - return segment - - async def aclose(self): - self._write_event.close() - self._wrap_event.close() - self._shm.close() - - async def __aenter__(self): - self._write_event.open() - self._wrap_event.open() - return self - - async def __aexit__(self, exc_type, exc_value, traceback): - await self.aclose()