From a85c26017ebd371b6a25b3dedf1f8fc565b761f9 Mon Sep 17 00:00:00 2001
From: Guillermo Rodriguez <guillermo@telos.net>
Date: Thu, 13 Mar 2025 21:10:23 -0300
Subject: [PATCH] Move linux specifics from tractor.ipc._shm into
 tractor.ipc._linux

---
 tests/test_ringbuf.py   |   2 +-
 tractor/ipc/__init__.py |  26 +++-
 tractor/ipc/_linux.py   | 324 +++++++++++++++++++++++++++++++++++++++
 tractor/ipc/_shm.py     | 326 ----------------------------------------
 4 files changed, 343 insertions(+), 335 deletions(-)
 create mode 100644 tractor/ipc/_linux.py

diff --git a/tests/test_ringbuf.py b/tests/test_ringbuf.py
index e4011768..1c4e88f9 100644
--- a/tests/test_ringbuf.py
+++ b/tests/test_ringbuf.py
@@ -3,7 +3,7 @@ import time
 import trio
 import pytest
 import tractor
-from tractor.ipc._shm import (
+from tractor.ipc import (
     EFD_NONBLOCK,
     open_eventfd,
     RingBuffSender,
diff --git a/tractor/ipc/__init__.py b/tractor/ipc/__init__.py
index 2a401bf6..1b548b65 100644
--- a/tractor/ipc/__init__.py
+++ b/tractor/ipc/__init__.py
@@ -1,11 +1,21 @@
+import platform
+
 from ._chan import (
-    _connect_chan,
-    MsgTransport,
-    Channel
+    _connect_chan as _connect_chan,
+    MsgTransport as MsgTransport,
+    Channel as Channel
 )
 
-__all__ = [
-    '_connect_chan',
-    'MsgTransport',
-    'Channel',
-]
+if platform.system() == 'Linux':
+    from ._linux import (
+        EFD_SEMAPHORE as EFD_SEMAPHORE,
+        EFD_CLOEXEC as EFD_CLOEXEC,
+        EFD_NONBLOCK as EFD_NONBLOCK,
+        open_eventfd as open_eventfd,
+        write_eventfd as write_eventfd,
+        read_eventfd as read_eventfd,
+        close_eventfd as close_eventfd,
+        EventFD as EventFD,
+        RingBuffSender as RingBuffSender,
+        RingBuffReceiver as RingBuffReceiver
+    )
diff --git a/tractor/ipc/_linux.py b/tractor/ipc/_linux.py
new file mode 100644
index 00000000..2a9eabc1
--- /dev/null
+++ b/tractor/ipc/_linux.py
@@ -0,0 +1,324 @@
+
+import os
+import errno
+
+from multiprocessing.shared_memory import SharedMemory
+
+import cffi
+import trio
+
+ffi = cffi.FFI()
+
+# Declare the C functions and types we plan to use.
+#    - eventfd: for creating the event file descriptor
+#    - write:   for writing to the file descriptor
+#    - read:    for reading from the file descriptor
+#    - close:   for closing the file descriptor
+ffi.cdef(
+    '''
+    int eventfd(unsigned int initval, int flags);
+
+    ssize_t write(int fd, const void *buf, size_t count);
+    ssize_t read(int fd, void *buf, size_t count);
+
+    int close(int fd);
+    '''
+)
+
+
+# Open the default dynamic library (essentially 'libc' in most cases)
+C = ffi.dlopen(None)
+
+# Constants from <sys/eventfd.h>, if needed.
+EFD_SEMAPHORE = 1
+EFD_CLOEXEC = 0o2000000
+EFD_NONBLOCK = 0o4000
+
+
+def open_eventfd(initval: int = 0, flags: int = 0) -> int:
+    '''
+    Open an eventfd with the given initial value and flags.
+    Returns the file descriptor on success, otherwise raises OSError.
+
+    '''
+    fd = C.eventfd(initval, flags)
+    if fd < 0:
+        raise OSError(errno.errorcode[ffi.errno], 'eventfd failed')
+    return fd
+
+def write_eventfd(fd: int, value: int) -> int:
+    '''
+    Write a 64-bit integer (uint64_t) to the eventfd's counter.
+
+    '''
+    # Create a uint64_t* in C, store `value`
+    data_ptr = ffi.new('uint64_t *', value)
+
+    # Call write(fd, data_ptr, 8)
+    # We expect to write exactly 8 bytes (sizeof(uint64_t))
+    ret = C.write(fd, data_ptr, 8)
+    if ret < 0:
+        raise OSError(errno.errorcode[ffi.errno], 'write to eventfd failed')
+    return ret
+
+def read_eventfd(fd: int) -> int:
+    '''
+    Read a 64-bit integer (uint64_t) from the eventfd, returning the value.
+    Reading resets the counter to 0 (unless using EFD_SEMAPHORE).
+
+    '''
+    # Allocate an 8-byte buffer in C for reading
+    buf = ffi.new('char[]', 8)
+
+    ret = C.read(fd, buf, 8)
+    if ret < 0:
+        raise OSError(errno.errorcode[ffi.errno], 'read from eventfd failed')
+    # Convert the 8 bytes we read into a Python integer
+    data_bytes = ffi.unpack(buf, 8)  # returns a Python bytes object of length 8
+    value = int.from_bytes(data_bytes, byteorder='little', signed=False)
+    return value
+
+def close_eventfd(fd: int) -> int:
+    '''
+    Close the eventfd.
+
+    '''
+    ret = C.close(fd)
+    if ret < 0:
+        raise OSError(errno.errorcode[ffi.errno], 'close failed')
+
+
+class EventFD:
+    '''
+    Use a previously opened eventfd(2), meant to be used in
+    sub-actors after root actor opens the eventfds then passes
+    them through pass_fds
+
+    '''
+
+    def __init__(
+        self,
+        fd: int,
+        omode: str
+    ):
+        self._fd: int = fd
+        self._omode: str = omode
+        self._fobj = None
+
+    @property
+    def fd(self) -> int | None:
+        return self._fd
+
+    def write(self, value: int) -> int:
+        return write_eventfd(self._fd, value)
+
+    async def read(self) -> int:
+        #TODO: how to handle signals?
+        return await trio.to_thread.run_sync(read_eventfd, self._fd)
+
+    def open(self):
+        self._fobj = os.fdopen(self._fd, self._omode)
+
+    def close(self):
+        if self._fobj:
+            self._fobj.close()
+
+    def __enter__(self):
+        self.open()
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.close()
+
+
+class RingBuffSender(trio.abc.SendStream):
+    '''
+    IPC Reliable Ring Buffer sender side implementation
+
+    `eventfd(2)` is used for wrap around sync, and also to signal
+    writes to the reader.
+
+    TODO: if blocked on wrap around event wait it will not respond
+    to signals, fix soon TM
+    '''
+
+    def __init__(
+        self,
+        shm_key: str,
+        write_eventfd: int,
+        wrap_eventfd: int,
+        start_ptr: int = 0,
+        buf_size: int = 10 * 1024,
+        clean_shm_on_exit: bool = True
+    ):
+        self._shm = SharedMemory(
+            name=shm_key,
+            size=buf_size,
+            create=True
+        )
+        self._write_event = EventFD(write_eventfd, 'w')
+        self._wrap_event = EventFD(wrap_eventfd, 'r')
+        self._ptr = start_ptr
+        self.clean_shm_on_exit = clean_shm_on_exit
+
+    @property
+    def key(self) -> str:
+        return self._shm.name
+
+    @property
+    def size(self) -> int:
+        return self._shm.size
+
+    @property
+    def ptr(self) -> int:
+        return self._ptr
+
+    @property
+    def write_fd(self) -> int:
+        return self._write_event.fd
+
+    @property
+    def wrap_fd(self) -> int:
+        return self._wrap_event.fd
+
+    async def send_all(self, data: bytes | bytearray | memoryview):
+        # while data is larger than the remaining buf
+        target_ptr = self.ptr + len(data)
+        while target_ptr > self.size:
+            # write all bytes that fit
+            remaining = self.size - self.ptr
+            self._shm.buf[self.ptr:] = data[:remaining]
+            # signal write and wait for reader wrap around
+            self._write_event.write(remaining)
+            await self._wrap_event.read()
+
+            # wrap around and trim already written bytes
+            self._ptr = 0
+            data = data[remaining:]
+            target_ptr = self._ptr + len(data)
+
+        # remaining data fits on buffer
+        self._shm.buf[self.ptr:target_ptr] = data
+        self._write_event.write(len(data))
+        self._ptr = target_ptr
+
+    async def wait_send_all_might_not_block(self):
+        raise NotImplementedError
+
+    async def aclose(self):
+        self._write_event.close()
+        self._wrap_event.close()
+        if self.clean_shm_on_exit:
+            self._shm.unlink()
+
+        else:
+            self._shm.close()
+
+    async def __aenter__(self):
+        self._write_event.open()
+        self._wrap_event.open()
+        return self
+
+    async def __aexit__(self, exc_type, exc_value, traceback):
+        await self.aclose()
+
+
+class RingBuffReceiver(trio.abc.ReceiveStream):
+    '''
+    IPC Reliable Ring Buffer receiver side implementation
+
+    `eventfd(2)` is used for wrap around sync, and also to signal
+    writes to the reader.
+
+    Unless eventfd(2) object is opened with EFD_NONBLOCK flag,
+    calls to `receive_some` will block the signal handling,
+    on the main thread, for now solution is using polling,
+    working on a way to unblock GIL during read(2) to allow
+    signal processing on the main thread.
+    '''
+
+    def __init__(
+        self,
+        shm_key: str,
+        write_eventfd: int,
+        wrap_eventfd: int,
+        start_ptr: int = 0,
+        buf_size: int = 10 * 1024,
+        flags: int = 0
+    ):
+        self._shm = SharedMemory(
+            name=shm_key,
+            size=buf_size,
+            create=False
+        )
+        self._write_event = EventFD(write_eventfd, 'w')
+        self._wrap_event = EventFD(wrap_eventfd, 'r')
+        self._ptr = start_ptr
+        self._flags = flags
+
+    @property
+    def key(self) -> str:
+        return self._shm.name
+
+    @property
+    def size(self) -> int:
+        return self._shm.size
+
+    @property
+    def ptr(self) -> int:
+        return self._ptr
+
+    @property
+    def write_fd(self) -> int:
+        return self._write_event.fd
+
+    @property
+    def wrap_fd(self) -> int:
+        return self._wrap_event.fd
+
+    async def receive_some(
+        self,
+        max_bytes: int | None = None,
+        nb_timeout: float = 0.1
+    ) -> memoryview:
+        # if non blocking eventfd enabled, do polling
+        # until next write, this allows signal handling
+        if self._flags | EFD_NONBLOCK:
+            delta = None
+            while delta is None:
+                try:
+                    delta = await self._write_event.read()
+
+                except OSError as e:
+                    if e.errno == 'EAGAIN':
+                        continue
+
+                    raise e
+
+        else:
+            delta = await self._write_event.read()
+
+        # fetch next segment and advance ptr
+        next_ptr = self._ptr + delta
+        segment = self._shm.buf[self._ptr:next_ptr]
+        self._ptr = next_ptr
+
+        if self.ptr == self.size:
+            # reached the end, signal wrap around
+            self._ptr = 0
+            self._wrap_event.write(1)
+
+        return segment
+
+    async def aclose(self):
+        self._write_event.close()
+        self._wrap_event.close()
+        self._shm.close()
+
+    async def __aenter__(self):
+        self._write_event.open()
+        self._wrap_event.open()
+        return self
+
+    async def __aexit__(self, exc_type, exc_value, traceback):
+        await self.aclose()
diff --git a/tractor/ipc/_shm.py b/tractor/ipc/_shm.py
index 752f81ff..0ee8bf23 100644
--- a/tractor/ipc/_shm.py
+++ b/tractor/ipc/_shm.py
@@ -25,7 +25,6 @@ considered optional within the context of this runtime-library.
 from __future__ import annotations
 from sys import byteorder
 import time
-import platform
 from typing import Optional
 from multiprocessing import shared_memory as shm
 from multiprocessing.shared_memory import (
@@ -832,328 +831,3 @@ def attach_shm_list(
         name=key,
         readonly=readonly,
     )
-
-
-if platform.system() == 'Linux':
-    import os
-    import errno
-    from contextlib import asynccontextmanager as acm
-
-    import cffi
-    import trio
-
-    ffi = cffi.FFI()
-
-    # Declare the C functions and types we plan to use.
-    #    - eventfd: for creating the event file descriptor
-    #    - write:   for writing to the file descriptor
-    #    - read:    for reading from the file descriptor
-    #    - close:   for closing the file descriptor
-    ffi.cdef(
-        '''
-        int eventfd(unsigned int initval, int flags);
-
-        ssize_t write(int fd, const void *buf, size_t count);
-        ssize_t read(int fd, void *buf, size_t count);
-
-        int close(int fd);
-        '''
-    )
-
-
-    # Open the default dynamic library (essentially 'libc' in most cases)
-    C = ffi.dlopen(None)
-
-    # Constants from <sys/eventfd.h>, if needed.
-    EFD_SEMAPHORE = 1
-    EFD_CLOEXEC = 0o2000000
-    EFD_NONBLOCK = 0o4000
-
-
-    def open_eventfd(initval: int = 0, flags: int = 0) -> int:
-        '''
-        Open an eventfd with the given initial value and flags.
-        Returns the file descriptor on success, otherwise raises OSError.
-
-        '''
-        fd = C.eventfd(initval, flags)
-        if fd < 0:
-            raise OSError(errno.errorcode[ffi.errno], 'eventfd failed')
-        return fd
-
-    def write_eventfd(fd: int, value: int) -> int:
-        '''
-        Write a 64-bit integer (uint64_t) to the eventfd's counter.
-
-        '''
-        # Create a uint64_t* in C, store `value`
-        data_ptr = ffi.new('uint64_t *', value)
-
-        # Call write(fd, data_ptr, 8)
-        # We expect to write exactly 8 bytes (sizeof(uint64_t))
-        ret = C.write(fd, data_ptr, 8)
-        if ret < 0:
-            raise OSError(errno.errorcode[ffi.errno], 'write to eventfd failed')
-        return ret
-
-    def read_eventfd(fd: int) -> int:
-        '''
-        Read a 64-bit integer (uint64_t) from the eventfd, returning the value.
-        Reading resets the counter to 0 (unless using EFD_SEMAPHORE).
-
-        '''
-        # Allocate an 8-byte buffer in C for reading
-        buf = ffi.new('char[]', 8)
-
-        ret = C.read(fd, buf, 8)
-        if ret < 0:
-            raise OSError(errno.errorcode[ffi.errno], 'read from eventfd failed')
-        # Convert the 8 bytes we read into a Python integer
-        data_bytes = ffi.unpack(buf, 8)  # returns a Python bytes object of length 8
-        value = int.from_bytes(data_bytes, byteorder='little', signed=False)
-        return value
-
-    def close_eventfd(fd: int) -> int:
-        '''
-        Close the eventfd.
-
-        '''
-        ret = C.close(fd)
-        if ret < 0:
-            raise OSError(errno.errorcode[ffi.errno], 'close failed')
-
-
-    class EventFD:
-        '''
-        Use a previously opened eventfd(2), meant to be used in
-        sub-actors after root actor opens the eventfds then passes
-        them through pass_fds
-
-        '''
-
-        def __init__(
-            self,
-            fd: int,
-            omode: str
-        ):
-            self._fd: int = fd
-            self._omode: str = omode
-            self._fobj = None
-
-        @property
-        def fd(self) -> int | None:
-            return self._fd
-
-        def write(self, value: int) -> int:
-            return write_eventfd(self._fd, value)
-
-        async def read(self) -> int:
-            #TODO: how to handle signals?
-            return await trio.to_thread.run_sync(read_eventfd, self._fd)
-
-        def open(self):
-            self._fobj = os.fdopen(self._fd, self._omode)
-
-        def close(self):
-            if self._fobj:
-                self._fobj.close()
-
-        def __enter__(self):
-            self.open()
-            return self
-
-        def __exit__(self, exc_type, exc_value, traceback):
-            self.close()
-
-
-    class RingBuffSender(trio.abc.SendStream):
-        '''
-        IPC Reliable Ring Buffer sender side implementation
-
-        `eventfd(2)` is used for wrap around sync, and also to signal
-        writes to the reader.
-
-        TODO: if blocked on wrap around event wait it will not respond
-        to signals, fix soon TM
-        '''
-
-        def __init__(
-            self,
-            shm_key: str,
-            write_eventfd: int,
-            wrap_eventfd: int,
-            start_ptr: int = 0,
-            buf_size: int = 10 * 1024,
-            clean_shm_on_exit: bool = True
-        ):
-            self._shm = SharedMemory(
-                name=shm_key,
-                size=buf_size,
-                create=True
-            )
-            self._write_event = EventFD(write_eventfd, 'w')
-            self._wrap_event = EventFD(wrap_eventfd, 'r')
-            self._ptr = start_ptr
-            self.clean_shm_on_exit = clean_shm_on_exit
-
-        @property
-        def key(self) -> str:
-            return self._shm.name
-
-        @property
-        def size(self) -> int:
-            return self._shm.size
-
-        @property
-        def ptr(self) -> int:
-            return self._ptr
-
-        @property
-        def write_fd(self) -> int:
-            return self._write_event.fd
-
-        @property
-        def wrap_fd(self) -> int:
-            return self._wrap_event.fd
-
-        async def send_all(self, data: bytes | bytearray | memoryview):
-            # while data is larger than the remaining buf
-            target_ptr = self.ptr + len(data)
-            while target_ptr > self.size:
-                # write all bytes that fit
-                remaining = self.size - self.ptr
-                self._shm.buf[self.ptr:] = data[:remaining]
-                # signal write and wait for reader wrap around
-                self._write_event.write(remaining)
-                await self._wrap_event.read()
-
-                # wrap around and trim already written bytes
-                self._ptr = 0
-                data = data[remaining:]
-                target_ptr = self._ptr + len(data)
-
-            # remaining data fits on buffer
-            self._shm.buf[self.ptr:target_ptr] = data
-            self._write_event.write(len(data))
-            self._ptr = target_ptr
-
-        async def wait_send_all_might_not_block(self):
-            raise NotImplementedError
-
-        async def aclose(self):
-            self._write_event.close()
-            self._wrap_event.close()
-            if self.clean_shm_on_exit:
-                self._shm.unlink()
-
-            else:
-                self._shm.close()
-
-        async def __aenter__(self):
-            self._write_event.open()
-            self._wrap_event.open()
-            return self
-
-        async def __aexit__(self, exc_type, exc_value, traceback):
-            await self.aclose()
-
-
-    class RingBuffReceiver(trio.abc.ReceiveStream):
-        '''
-        IPC Reliable Ring Buffer receiver side implementation
-
-        `eventfd(2)` is used for wrap around sync, and also to signal
-        writes to the reader.
-
-        Unless eventfd(2) object is opened with EFD_NONBLOCK flag,
-        calls to `receive_some` will block the signal handling,
-        on the main thread, for now solution is using polling,
-        working on a way to unblock GIL during read(2) to allow
-        signal processing on the main thread.
-        '''
-
-        def __init__(
-            self,
-            shm_key: str,
-            write_eventfd: int,
-            wrap_eventfd: int,
-            start_ptr: int = 0,
-            buf_size: int = 10 * 1024,
-            flags: int = 0
-        ):
-            self._shm = SharedMemory(
-                name=shm_key,
-                size=buf_size,
-                create=False
-            )
-            self._write_event = EventFD(write_eventfd, 'w')
-            self._wrap_event = EventFD(wrap_eventfd, 'r')
-            self._ptr = start_ptr
-            self._flags = flags
-
-        @property
-        def key(self) -> str:
-            return self._shm.name
-
-        @property
-        def size(self) -> int:
-            return self._shm.size
-
-        @property
-        def ptr(self) -> int:
-            return self._ptr
-
-        @property
-        def write_fd(self) -> int:
-            return self._write_event.fd
-
-        @property
-        def wrap_fd(self) -> int:
-            return self._wrap_event.fd
-
-        async def receive_some(
-            self,
-            max_bytes: int | None = None,
-            nb_timeout: float = 0.1
-        ) -> memoryview:
-            # if non blocking eventfd enabled, do polling
-            # until next write, this allows signal handling
-            if self._flags | EFD_NONBLOCK:
-                delta = None
-                while delta is None:
-                    try:
-                        delta = await self._write_event.read()
-
-                    except OSError as e:
-                        if e.errno == 'EAGAIN':
-                            continue
-
-                        raise e
-
-            else:
-                delta = await self._write_event.read()
-
-            # fetch next segment and advance ptr
-            next_ptr = self._ptr + delta
-            segment = self._shm.buf[self._ptr:next_ptr]
-            self._ptr = next_ptr
-
-            if self.ptr == self.size:
-                # reached the end, signal wrap around
-                self._ptr = 0
-                self._wrap_event.write(1)
-
-            return segment
-
-        async def aclose(self):
-            self._write_event.close()
-            self._wrap_event.close()
-            self._shm.close()
-
-        async def __aenter__(self):
-            self._write_event.open()
-            self._wrap_event.open()
-            return self
-
-        async def __aexit__(self, exc_type, exc_value, traceback):
-            await self.aclose()