Move linux specifics from tractor.ipc._shm into tractor.ipc._linux
							parent
							
								
									8e294b8b7c
								
							
						
					
					
						commit
						ef9b30639f
					
				|  | @ -3,7 +3,7 @@ import time | |||
| import trio | ||||
| import pytest | ||||
| import tractor | ||||
| from tractor.ipc._shm import ( | ||||
| from tractor.ipc import ( | ||||
|     EFD_NONBLOCK, | ||||
|     open_eventfd, | ||||
|     RingBuffSender, | ||||
|  |  | |||
|  | @ -1,11 +1,21 @@ | |||
| import platform | ||||
| 
 | ||||
| from ._chan import ( | ||||
|     _connect_chan, | ||||
|     MsgTransport, | ||||
|     Channel | ||||
|     _connect_chan as _connect_chan, | ||||
|     MsgTransport as MsgTransport, | ||||
|     Channel as Channel | ||||
| ) | ||||
| 
 | ||||
| __all__ = [ | ||||
|     '_connect_chan', | ||||
|     'MsgTransport', | ||||
|     'Channel', | ||||
| ] | ||||
| if platform.system() == 'Linux': | ||||
|     from ._linux import ( | ||||
|         EFD_SEMAPHORE as EFD_SEMAPHORE, | ||||
|         EFD_CLOEXEC as EFD_CLOEXEC, | ||||
|         EFD_NONBLOCK as EFD_NONBLOCK, | ||||
|         open_eventfd as open_eventfd, | ||||
|         write_eventfd as write_eventfd, | ||||
|         read_eventfd as read_eventfd, | ||||
|         close_eventfd as close_eventfd, | ||||
|         EventFD as EventFD, | ||||
|         RingBuffSender as RingBuffSender, | ||||
|         RingBuffReceiver as RingBuffReceiver | ||||
|     ) | ||||
|  |  | |||
|  | @ -0,0 +1,324 @@ | |||
| 
 | ||||
| import os | ||||
| import errno | ||||
| 
 | ||||
| from multiprocessing.shared_memory import SharedMemory | ||||
| 
 | ||||
| import cffi | ||||
| import trio | ||||
| 
 | ||||
| ffi = cffi.FFI() | ||||
| 
 | ||||
| # Declare the C functions and types we plan to use. | ||||
| #    - eventfd: for creating the event file descriptor | ||||
| #    - write:   for writing to the file descriptor | ||||
| #    - read:    for reading from the file descriptor | ||||
| #    - close:   for closing the file descriptor | ||||
| ffi.cdef( | ||||
|     ''' | ||||
|     int eventfd(unsigned int initval, int flags); | ||||
| 
 | ||||
|     ssize_t write(int fd, const void *buf, size_t count); | ||||
|     ssize_t read(int fd, void *buf, size_t count); | ||||
| 
 | ||||
|     int close(int fd); | ||||
|     ''' | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| # Open the default dynamic library (essentially 'libc' in most cases) | ||||
| C = ffi.dlopen(None) | ||||
| 
 | ||||
| # Constants from <sys/eventfd.h>, if needed. | ||||
| EFD_SEMAPHORE = 1 | ||||
| EFD_CLOEXEC = 0o2000000 | ||||
| EFD_NONBLOCK = 0o4000 | ||||
| 
 | ||||
| 
 | ||||
| def open_eventfd(initval: int = 0, flags: int = 0) -> int: | ||||
|     ''' | ||||
|     Open an eventfd with the given initial value and flags. | ||||
|     Returns the file descriptor on success, otherwise raises OSError. | ||||
| 
 | ||||
|     ''' | ||||
|     fd = C.eventfd(initval, flags) | ||||
|     if fd < 0: | ||||
|         raise OSError(errno.errorcode[ffi.errno], 'eventfd failed') | ||||
|     return fd | ||||
| 
 | ||||
| def write_eventfd(fd: int, value: int) -> int: | ||||
|     ''' | ||||
|     Write a 64-bit integer (uint64_t) to the eventfd's counter. | ||||
| 
 | ||||
|     ''' | ||||
|     # Create a uint64_t* in C, store `value` | ||||
|     data_ptr = ffi.new('uint64_t *', value) | ||||
| 
 | ||||
|     # Call write(fd, data_ptr, 8) | ||||
|     # We expect to write exactly 8 bytes (sizeof(uint64_t)) | ||||
|     ret = C.write(fd, data_ptr, 8) | ||||
|     if ret < 0: | ||||
|         raise OSError(errno.errorcode[ffi.errno], 'write to eventfd failed') | ||||
|     return ret | ||||
| 
 | ||||
| def read_eventfd(fd: int) -> int: | ||||
|     ''' | ||||
|     Read a 64-bit integer (uint64_t) from the eventfd, returning the value. | ||||
|     Reading resets the counter to 0 (unless using EFD_SEMAPHORE). | ||||
| 
 | ||||
|     ''' | ||||
|     # Allocate an 8-byte buffer in C for reading | ||||
|     buf = ffi.new('char[]', 8) | ||||
| 
 | ||||
|     ret = C.read(fd, buf, 8) | ||||
|     if ret < 0: | ||||
|         raise OSError(errno.errorcode[ffi.errno], 'read from eventfd failed') | ||||
|     # Convert the 8 bytes we read into a Python integer | ||||
|     data_bytes = ffi.unpack(buf, 8)  # returns a Python bytes object of length 8 | ||||
|     value = int.from_bytes(data_bytes, byteorder='little', signed=False) | ||||
|     return value | ||||
| 
 | ||||
| def close_eventfd(fd: int) -> int: | ||||
|     ''' | ||||
|     Close the eventfd. | ||||
| 
 | ||||
|     ''' | ||||
|     ret = C.close(fd) | ||||
|     if ret < 0: | ||||
|         raise OSError(errno.errorcode[ffi.errno], 'close failed') | ||||
| 
 | ||||
| 
 | ||||
| class EventFD: | ||||
|     ''' | ||||
|     Use a previously opened eventfd(2), meant to be used in | ||||
|     sub-actors after root actor opens the eventfds then passes | ||||
|     them through pass_fds | ||||
| 
 | ||||
|     ''' | ||||
| 
 | ||||
|     def __init__( | ||||
|         self, | ||||
|         fd: int, | ||||
|         omode: str | ||||
|     ): | ||||
|         self._fd: int = fd | ||||
|         self._omode: str = omode | ||||
|         self._fobj = None | ||||
| 
 | ||||
|     @property | ||||
|     def fd(self) -> int | None: | ||||
|         return self._fd | ||||
| 
 | ||||
|     def write(self, value: int) -> int: | ||||
|         return write_eventfd(self._fd, value) | ||||
| 
 | ||||
|     async def read(self) -> int: | ||||
|         #TODO: how to handle signals? | ||||
|         return await trio.to_thread.run_sync(read_eventfd, self._fd) | ||||
| 
 | ||||
|     def open(self): | ||||
|         self._fobj = os.fdopen(self._fd, self._omode) | ||||
| 
 | ||||
|     def close(self): | ||||
|         if self._fobj: | ||||
|             self._fobj.close() | ||||
| 
 | ||||
|     def __enter__(self): | ||||
|         self.open() | ||||
|         return self | ||||
| 
 | ||||
|     def __exit__(self, exc_type, exc_value, traceback): | ||||
|         self.close() | ||||
| 
 | ||||
| 
 | ||||
| class RingBuffSender(trio.abc.SendStream): | ||||
|     ''' | ||||
|     IPC Reliable Ring Buffer sender side implementation | ||||
| 
 | ||||
|     `eventfd(2)` is used for wrap around sync, and also to signal | ||||
|     writes to the reader. | ||||
| 
 | ||||
|     TODO: if blocked on wrap around event wait it will not respond | ||||
|     to signals, fix soon TM | ||||
|     ''' | ||||
| 
 | ||||
|     def __init__( | ||||
|         self, | ||||
|         shm_key: str, | ||||
|         write_eventfd: int, | ||||
|         wrap_eventfd: int, | ||||
|         start_ptr: int = 0, | ||||
|         buf_size: int = 10 * 1024, | ||||
|         clean_shm_on_exit: bool = True | ||||
|     ): | ||||
|         self._shm = SharedMemory( | ||||
|             name=shm_key, | ||||
|             size=buf_size, | ||||
|             create=True | ||||
|         ) | ||||
|         self._write_event = EventFD(write_eventfd, 'w') | ||||
|         self._wrap_event = EventFD(wrap_eventfd, 'r') | ||||
|         self._ptr = start_ptr | ||||
|         self.clean_shm_on_exit = clean_shm_on_exit | ||||
| 
 | ||||
|     @property | ||||
|     def key(self) -> str: | ||||
|         return self._shm.name | ||||
| 
 | ||||
|     @property | ||||
|     def size(self) -> int: | ||||
|         return self._shm.size | ||||
| 
 | ||||
|     @property | ||||
|     def ptr(self) -> int: | ||||
|         return self._ptr | ||||
| 
 | ||||
|     @property | ||||
|     def write_fd(self) -> int: | ||||
|         return self._write_event.fd | ||||
| 
 | ||||
|     @property | ||||
|     def wrap_fd(self) -> int: | ||||
|         return self._wrap_event.fd | ||||
| 
 | ||||
|     async def send_all(self, data: bytes | bytearray | memoryview): | ||||
|         # while data is larger than the remaining buf | ||||
|         target_ptr = self.ptr + len(data) | ||||
|         while target_ptr > self.size: | ||||
|             # write all bytes that fit | ||||
|             remaining = self.size - self.ptr | ||||
|             self._shm.buf[self.ptr:] = data[:remaining] | ||||
|             # signal write and wait for reader wrap around | ||||
|             self._write_event.write(remaining) | ||||
|             await self._wrap_event.read() | ||||
| 
 | ||||
|             # wrap around and trim already written bytes | ||||
|             self._ptr = 0 | ||||
|             data = data[remaining:] | ||||
|             target_ptr = self._ptr + len(data) | ||||
| 
 | ||||
|         # remaining data fits on buffer | ||||
|         self._shm.buf[self.ptr:target_ptr] = data | ||||
|         self._write_event.write(len(data)) | ||||
|         self._ptr = target_ptr | ||||
| 
 | ||||
|     async def wait_send_all_might_not_block(self): | ||||
|         raise NotImplementedError | ||||
| 
 | ||||
|     async def aclose(self): | ||||
|         self._write_event.close() | ||||
|         self._wrap_event.close() | ||||
|         if self.clean_shm_on_exit: | ||||
|             self._shm.unlink() | ||||
| 
 | ||||
|         else: | ||||
|             self._shm.close() | ||||
| 
 | ||||
|     async def __aenter__(self): | ||||
|         self._write_event.open() | ||||
|         self._wrap_event.open() | ||||
|         return self | ||||
| 
 | ||||
|     async def __aexit__(self, exc_type, exc_value, traceback): | ||||
|         await self.aclose() | ||||
| 
 | ||||
| 
 | ||||
| class RingBuffReceiver(trio.abc.ReceiveStream): | ||||
|     ''' | ||||
|     IPC Reliable Ring Buffer receiver side implementation | ||||
| 
 | ||||
|     `eventfd(2)` is used for wrap around sync, and also to signal | ||||
|     writes to the reader. | ||||
| 
 | ||||
|     Unless eventfd(2) object is opened with EFD_NONBLOCK flag, | ||||
|     calls to `receive_some` will block the signal handling, | ||||
|     on the main thread, for now solution is using polling, | ||||
|     working on a way to unblock GIL during read(2) to allow | ||||
|     signal processing on the main thread. | ||||
|     ''' | ||||
| 
 | ||||
|     def __init__( | ||||
|         self, | ||||
|         shm_key: str, | ||||
|         write_eventfd: int, | ||||
|         wrap_eventfd: int, | ||||
|         start_ptr: int = 0, | ||||
|         buf_size: int = 10 * 1024, | ||||
|         flags: int = 0 | ||||
|     ): | ||||
|         self._shm = SharedMemory( | ||||
|             name=shm_key, | ||||
|             size=buf_size, | ||||
|             create=False | ||||
|         ) | ||||
|         self._write_event = EventFD(write_eventfd, 'w') | ||||
|         self._wrap_event = EventFD(wrap_eventfd, 'r') | ||||
|         self._ptr = start_ptr | ||||
|         self._flags = flags | ||||
| 
 | ||||
|     @property | ||||
|     def key(self) -> str: | ||||
|         return self._shm.name | ||||
| 
 | ||||
|     @property | ||||
|     def size(self) -> int: | ||||
|         return self._shm.size | ||||
| 
 | ||||
|     @property | ||||
|     def ptr(self) -> int: | ||||
|         return self._ptr | ||||
| 
 | ||||
|     @property | ||||
|     def write_fd(self) -> int: | ||||
|         return self._write_event.fd | ||||
| 
 | ||||
|     @property | ||||
|     def wrap_fd(self) -> int: | ||||
|         return self._wrap_event.fd | ||||
| 
 | ||||
|     async def receive_some( | ||||
|         self, | ||||
|         max_bytes: int | None = None, | ||||
|         nb_timeout: float = 0.1 | ||||
|     ) -> memoryview: | ||||
|         # if non blocking eventfd enabled, do polling | ||||
|         # until next write, this allows signal handling | ||||
|         if self._flags | EFD_NONBLOCK: | ||||
|             delta = None | ||||
|             while delta is None: | ||||
|                 try: | ||||
|                     delta = await self._write_event.read() | ||||
| 
 | ||||
|                 except OSError as e: | ||||
|                     if e.errno == 'EAGAIN': | ||||
|                         continue | ||||
| 
 | ||||
|                     raise e | ||||
| 
 | ||||
|         else: | ||||
|             delta = await self._write_event.read() | ||||
| 
 | ||||
|         # fetch next segment and advance ptr | ||||
|         next_ptr = self._ptr + delta | ||||
|         segment = self._shm.buf[self._ptr:next_ptr] | ||||
|         self._ptr = next_ptr | ||||
| 
 | ||||
|         if self.ptr == self.size: | ||||
|             # reached the end, signal wrap around | ||||
|             self._ptr = 0 | ||||
|             self._wrap_event.write(1) | ||||
| 
 | ||||
|         return segment | ||||
| 
 | ||||
|     async def aclose(self): | ||||
|         self._write_event.close() | ||||
|         self._wrap_event.close() | ||||
|         self._shm.close() | ||||
| 
 | ||||
|     async def __aenter__(self): | ||||
|         self._write_event.open() | ||||
|         self._wrap_event.open() | ||||
|         return self | ||||
| 
 | ||||
|     async def __aexit__(self, exc_type, exc_value, traceback): | ||||
|         await self.aclose() | ||||
|  | @ -25,7 +25,6 @@ considered optional within the context of this runtime-library. | |||
| from __future__ import annotations | ||||
| from sys import byteorder | ||||
| import time | ||||
| import platform | ||||
| from typing import Optional | ||||
| from multiprocessing import shared_memory as shm | ||||
| from multiprocessing.shared_memory import ( | ||||
|  | @ -832,328 +831,3 @@ def attach_shm_list( | |||
|         name=key, | ||||
|         readonly=readonly, | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| if platform.system() == 'Linux': | ||||
|     import os | ||||
|     import errno | ||||
|     from contextlib import asynccontextmanager as acm | ||||
| 
 | ||||
|     import cffi | ||||
|     import trio | ||||
| 
 | ||||
|     ffi = cffi.FFI() | ||||
| 
 | ||||
|     # Declare the C functions and types we plan to use. | ||||
|     #    - eventfd: for creating the event file descriptor | ||||
|     #    - write:   for writing to the file descriptor | ||||
|     #    - read:    for reading from the file descriptor | ||||
|     #    - close:   for closing the file descriptor | ||||
|     ffi.cdef( | ||||
|         ''' | ||||
|         int eventfd(unsigned int initval, int flags); | ||||
| 
 | ||||
|         ssize_t write(int fd, const void *buf, size_t count); | ||||
|         ssize_t read(int fd, void *buf, size_t count); | ||||
| 
 | ||||
|         int close(int fd); | ||||
|         ''' | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
|     # Open the default dynamic library (essentially 'libc' in most cases) | ||||
|     C = ffi.dlopen(None) | ||||
| 
 | ||||
|     # Constants from <sys/eventfd.h>, if needed. | ||||
|     EFD_SEMAPHORE = 1 | ||||
|     EFD_CLOEXEC = 0o2000000 | ||||
|     EFD_NONBLOCK = 0o4000 | ||||
| 
 | ||||
| 
 | ||||
|     def open_eventfd(initval: int = 0, flags: int = 0) -> int: | ||||
|         ''' | ||||
|         Open an eventfd with the given initial value and flags. | ||||
|         Returns the file descriptor on success, otherwise raises OSError. | ||||
| 
 | ||||
|         ''' | ||||
|         fd = C.eventfd(initval, flags) | ||||
|         if fd < 0: | ||||
|             raise OSError(errno.errorcode[ffi.errno], 'eventfd failed') | ||||
|         return fd | ||||
| 
 | ||||
|     def write_eventfd(fd: int, value: int) -> int: | ||||
|         ''' | ||||
|         Write a 64-bit integer (uint64_t) to the eventfd's counter. | ||||
| 
 | ||||
|         ''' | ||||
|         # Create a uint64_t* in C, store `value` | ||||
|         data_ptr = ffi.new('uint64_t *', value) | ||||
| 
 | ||||
|         # Call write(fd, data_ptr, 8) | ||||
|         # We expect to write exactly 8 bytes (sizeof(uint64_t)) | ||||
|         ret = C.write(fd, data_ptr, 8) | ||||
|         if ret < 0: | ||||
|             raise OSError(errno.errorcode[ffi.errno], 'write to eventfd failed') | ||||
|         return ret | ||||
| 
 | ||||
|     def read_eventfd(fd: int) -> int: | ||||
|         ''' | ||||
|         Read a 64-bit integer (uint64_t) from the eventfd, returning the value. | ||||
|         Reading resets the counter to 0 (unless using EFD_SEMAPHORE). | ||||
| 
 | ||||
|         ''' | ||||
|         # Allocate an 8-byte buffer in C for reading | ||||
|         buf = ffi.new('char[]', 8) | ||||
| 
 | ||||
|         ret = C.read(fd, buf, 8) | ||||
|         if ret < 0: | ||||
|             raise OSError(errno.errorcode[ffi.errno], 'read from eventfd failed') | ||||
|         # Convert the 8 bytes we read into a Python integer | ||||
|         data_bytes = ffi.unpack(buf, 8)  # returns a Python bytes object of length 8 | ||||
|         value = int.from_bytes(data_bytes, byteorder='little', signed=False) | ||||
|         return value | ||||
| 
 | ||||
|     def close_eventfd(fd: int) -> int: | ||||
|         ''' | ||||
|         Close the eventfd. | ||||
| 
 | ||||
|         ''' | ||||
|         ret = C.close(fd) | ||||
|         if ret < 0: | ||||
|             raise OSError(errno.errorcode[ffi.errno], 'close failed') | ||||
| 
 | ||||
| 
 | ||||
|     class EventFD: | ||||
|         ''' | ||||
|         Use a previously opened eventfd(2), meant to be used in | ||||
|         sub-actors after root actor opens the eventfds then passes | ||||
|         them through pass_fds | ||||
| 
 | ||||
|         ''' | ||||
| 
 | ||||
|         def __init__( | ||||
|             self, | ||||
|             fd: int, | ||||
|             omode: str | ||||
|         ): | ||||
|             self._fd: int = fd | ||||
|             self._omode: str = omode | ||||
|             self._fobj = None | ||||
| 
 | ||||
|         @property | ||||
|         def fd(self) -> int | None: | ||||
|             return self._fd | ||||
| 
 | ||||
|         def write(self, value: int) -> int: | ||||
|             return write_eventfd(self._fd, value) | ||||
| 
 | ||||
|         async def read(self) -> int: | ||||
|             #TODO: how to handle signals? | ||||
|             return await trio.to_thread.run_sync(read_eventfd, self._fd) | ||||
| 
 | ||||
|         def open(self): | ||||
|             self._fobj = os.fdopen(self._fd, self._omode) | ||||
| 
 | ||||
|         def close(self): | ||||
|             if self._fobj: | ||||
|                 self._fobj.close() | ||||
| 
 | ||||
|         def __enter__(self): | ||||
|             self.open() | ||||
|             return self | ||||
| 
 | ||||
|         def __exit__(self, exc_type, exc_value, traceback): | ||||
|             self.close() | ||||
| 
 | ||||
| 
 | ||||
|     class RingBuffSender(trio.abc.SendStream): | ||||
|         ''' | ||||
|         IPC Reliable Ring Buffer sender side implementation | ||||
| 
 | ||||
|         `eventfd(2)` is used for wrap around sync, and also to signal | ||||
|         writes to the reader. | ||||
| 
 | ||||
|         TODO: if blocked on wrap around event wait it will not respond | ||||
|         to signals, fix soon TM | ||||
|         ''' | ||||
| 
 | ||||
|         def __init__( | ||||
|             self, | ||||
|             shm_key: str, | ||||
|             write_eventfd: int, | ||||
|             wrap_eventfd: int, | ||||
|             start_ptr: int = 0, | ||||
|             buf_size: int = 10 * 1024, | ||||
|             clean_shm_on_exit: bool = True | ||||
|         ): | ||||
|             self._shm = SharedMemory( | ||||
|                 name=shm_key, | ||||
|                 size=buf_size, | ||||
|                 create=True | ||||
|             ) | ||||
|             self._write_event = EventFD(write_eventfd, 'w') | ||||
|             self._wrap_event = EventFD(wrap_eventfd, 'r') | ||||
|             self._ptr = start_ptr | ||||
|             self.clean_shm_on_exit = clean_shm_on_exit | ||||
| 
 | ||||
|         @property | ||||
|         def key(self) -> str: | ||||
|             return self._shm.name | ||||
| 
 | ||||
|         @property | ||||
|         def size(self) -> int: | ||||
|             return self._shm.size | ||||
| 
 | ||||
|         @property | ||||
|         def ptr(self) -> int: | ||||
|             return self._ptr | ||||
| 
 | ||||
|         @property | ||||
|         def write_fd(self) -> int: | ||||
|             return self._write_event.fd | ||||
| 
 | ||||
|         @property | ||||
|         def wrap_fd(self) -> int: | ||||
|             return self._wrap_event.fd | ||||
| 
 | ||||
|         async def send_all(self, data: bytes | bytearray | memoryview): | ||||
|             # while data is larger than the remaining buf | ||||
|             target_ptr = self.ptr + len(data) | ||||
|             while target_ptr > self.size: | ||||
|                 # write all bytes that fit | ||||
|                 remaining = self.size - self.ptr | ||||
|                 self._shm.buf[self.ptr:] = data[:remaining] | ||||
|                 # signal write and wait for reader wrap around | ||||
|                 self._write_event.write(remaining) | ||||
|                 await self._wrap_event.read() | ||||
| 
 | ||||
|                 # wrap around and trim already written bytes | ||||
|                 self._ptr = 0 | ||||
|                 data = data[remaining:] | ||||
|                 target_ptr = self._ptr + len(data) | ||||
| 
 | ||||
|             # remaining data fits on buffer | ||||
|             self._shm.buf[self.ptr:target_ptr] = data | ||||
|             self._write_event.write(len(data)) | ||||
|             self._ptr = target_ptr | ||||
| 
 | ||||
|         async def wait_send_all_might_not_block(self): | ||||
|             raise NotImplementedError | ||||
| 
 | ||||
|         async def aclose(self): | ||||
|             self._write_event.close() | ||||
|             self._wrap_event.close() | ||||
|             if self.clean_shm_on_exit: | ||||
|                 self._shm.unlink() | ||||
| 
 | ||||
|             else: | ||||
|                 self._shm.close() | ||||
| 
 | ||||
|         async def __aenter__(self): | ||||
|             self._write_event.open() | ||||
|             self._wrap_event.open() | ||||
|             return self | ||||
| 
 | ||||
|         async def __aexit__(self, exc_type, exc_value, traceback): | ||||
|             await self.aclose() | ||||
| 
 | ||||
| 
 | ||||
|     class RingBuffReceiver(trio.abc.ReceiveStream): | ||||
|         ''' | ||||
|         IPC Reliable Ring Buffer receiver side implementation | ||||
| 
 | ||||
|         `eventfd(2)` is used for wrap around sync, and also to signal | ||||
|         writes to the reader. | ||||
| 
 | ||||
|         Unless eventfd(2) object is opened with EFD_NONBLOCK flag, | ||||
|         calls to `receive_some` will block the signal handling, | ||||
|         on the main thread, for now solution is using polling, | ||||
|         working on a way to unblock GIL during read(2) to allow | ||||
|         signal processing on the main thread. | ||||
|         ''' | ||||
| 
 | ||||
|         def __init__( | ||||
|             self, | ||||
|             shm_key: str, | ||||
|             write_eventfd: int, | ||||
|             wrap_eventfd: int, | ||||
|             start_ptr: int = 0, | ||||
|             buf_size: int = 10 * 1024, | ||||
|             flags: int = 0 | ||||
|         ): | ||||
|             self._shm = SharedMemory( | ||||
|                 name=shm_key, | ||||
|                 size=buf_size, | ||||
|                 create=False | ||||
|             ) | ||||
|             self._write_event = EventFD(write_eventfd, 'w') | ||||
|             self._wrap_event = EventFD(wrap_eventfd, 'r') | ||||
|             self._ptr = start_ptr | ||||
|             self._flags = flags | ||||
| 
 | ||||
|         @property | ||||
|         def key(self) -> str: | ||||
|             return self._shm.name | ||||
| 
 | ||||
|         @property | ||||
|         def size(self) -> int: | ||||
|             return self._shm.size | ||||
| 
 | ||||
|         @property | ||||
|         def ptr(self) -> int: | ||||
|             return self._ptr | ||||
| 
 | ||||
|         @property | ||||
|         def write_fd(self) -> int: | ||||
|             return self._write_event.fd | ||||
| 
 | ||||
|         @property | ||||
|         def wrap_fd(self) -> int: | ||||
|             return self._wrap_event.fd | ||||
| 
 | ||||
|         async def receive_some( | ||||
|             self, | ||||
|             max_bytes: int | None = None, | ||||
|             nb_timeout: float = 0.1 | ||||
|         ) -> memoryview: | ||||
|             # if non blocking eventfd enabled, do polling | ||||
|             # until next write, this allows signal handling | ||||
|             if self._flags | EFD_NONBLOCK: | ||||
|                 delta = None | ||||
|                 while delta is None: | ||||
|                     try: | ||||
|                         delta = await self._write_event.read() | ||||
| 
 | ||||
|                     except OSError as e: | ||||
|                         if e.errno == 'EAGAIN': | ||||
|                             continue | ||||
| 
 | ||||
|                         raise e | ||||
| 
 | ||||
|             else: | ||||
|                 delta = await self._write_event.read() | ||||
| 
 | ||||
|             # fetch next segment and advance ptr | ||||
|             next_ptr = self._ptr + delta | ||||
|             segment = self._shm.buf[self._ptr:next_ptr] | ||||
|             self._ptr = next_ptr | ||||
| 
 | ||||
|             if self.ptr == self.size: | ||||
|                 # reached the end, signal wrap around | ||||
|                 self._ptr = 0 | ||||
|                 self._wrap_event.write(1) | ||||
| 
 | ||||
|             return segment | ||||
| 
 | ||||
|         async def aclose(self): | ||||
|             self._write_event.close() | ||||
|             self._wrap_event.close() | ||||
|             self._shm.close() | ||||
| 
 | ||||
|         async def __aenter__(self): | ||||
|             self._write_event.open() | ||||
|             self._wrap_event.open() | ||||
|             return self | ||||
| 
 | ||||
|         async def __aexit__(self, exc_type, exc_value, traceback): | ||||
|             await self.aclose() | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue