# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
'''
IPC Reliable RingBuffer implementation

'''
from __future__ import annotations
import struct
from typing import (
    ContextManager,
    AsyncContextManager
)
from contextlib import (
    contextmanager as cm,
    asynccontextmanager as acm
)
from multiprocessing.shared_memory import SharedMemory

import trio
from msgspec import (
    Struct,
    to_builtins
)

from ._linux import (
    open_eventfd,
    EFDReadCancelled,
    EventFD
)
from ._mp_bs import disable_mantracker
from tractor.log import get_logger
from tractor._exceptions import (
    InternalError
)


log = get_logger(__name__)


disable_mantracker()

_DEFAULT_RB_SIZE = 10 * 1024


class RBToken(Struct, frozen=True):
    '''
    RingBuffer token contains the necessary info to open the three
    eventfds and the shared memory

    '''
    shm_name: str

    write_eventfd: int  # used to signal writer ptr advance
    wrap_eventfd: int  # used to signal reader ready after wrap around
    eof_eventfd: int  # used to signal writer closed

    buf_size: int

    def as_msg(self):
        return to_builtins(self)

    @classmethod
    def from_msg(cls, msg: dict) -> RBToken:
        '''
        Build a token from its `as_msg()` dict form; an already built
        token is returned unchanged.

        '''
        if isinstance(msg, cls):
            return msg

        return cls(**msg)

    @property
    def fds(self) -> tuple[int, int, int]:
        '''
        Useful for `pass_fds` params

        '''
        return (
            self.write_eventfd,
            self.wrap_eventfd,
            self.eof_eventfd
        )


@cm
def open_ringbuf(
    shm_name: str,
    buf_size: int = _DEFAULT_RB_SIZE,
) -> ContextManager[RBToken]:
    '''
    Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to
    be used with `attach_to_ringbuf_sender` and `attach_to_ringbuf_receiver`

    The shared memory segment is created here and, on exit, this
    process' mapping is closed and the segment name is unlinked.

    '''
    shm = SharedMemory(
        name=shm_name,
        size=buf_size,
        create=True
    )
    try:
        with (
            EventFD(open_eventfd(), 'r') as write_event,
            EventFD(open_eventfd(), 'r') as wrap_event,
            EventFD(open_eventfd(), 'r') as eof_event,
        ):
            token = RBToken(
                shm_name=shm_name,
                write_eventfd=write_event.fd,
                wrap_eventfd=wrap_event.fd,
                eof_eventfd=eof_event.fd,
                buf_size=buf_size
            )
            yield token

    finally:
        # `unlink()` only removes the segment name; this instance's own
        # mapping must be released with `close()` as well.
        try:
            shm.close()

        finally:
            shm.unlink()


Buffer = bytes | bytearray | memoryview


'''
IPC Reliable Ring Buffer

`eventfd(2)` is used for wrap around sync, to signal writes to
the reader and end of stream.

'''


class RingBuffSender(trio.abc.SendStream):
    '''
    Ring Buffer sender side implementation

    Do not use directly! manage with `attach_to_ringbuf_sender`
    after having opened a ringbuf context with `open_ringbuf`.

    '''
    def __init__(
        self,
        token: RBToken,
        cleanup: bool = False
    ):
        self._token = RBToken.from_msg(token)
        self._shm: SharedMemory | None = None
        self._write_event = EventFD(self._token.write_eventfd, 'w')
        self._wrap_event = EventFD(self._token.wrap_eventfd, 'r')
        self._eof_event = EventFD(self._token.eof_eventfd, 'w')
        self._ptr = 0

        self._cleanup = cleanup
        # serializes `send_all` (and `aclose`) so writes never interleave
        self._send_lock = trio.StrictFIFOLock()

    @property
    def name(self) -> str:
        if not self._shm:
            raise ValueError('shared memory not initialized yet!')
        return self._shm.name

    @property
    def size(self) -> int:
        return self._token.buf_size

    @property
    def ptr(self) -> int:
        return self._ptr

    @property
    def write_fd(self) -> int:
        return self._write_event.fd

    @property
    def wrap_fd(self) -> int:
        return self._wrap_event.fd

    async def send_all(self, data: Buffer):
        '''
        Copy `data` into the shared buffer, signalling the reader with
        the number of bytes written through the write eventfd. When the
        data does not fit in the space left, fill up to the end, then
        block until the reader signals wrap around and continue from 0.

        '''
        async with self._send_lock:
            # while data is larger than the remaining buf
            target_ptr = self.ptr + len(data)
            while target_ptr > self.size:
                # write all bytes that fit
                remaining = self.size - self.ptr
                self._shm.buf[self.ptr:] = data[:remaining]
                # signal write and wait for reader wrap around
                self._write_event.write(remaining)
                await self._wrap_event.read()

                # wrap around and trim already written bytes
                self._ptr = 0
                data = data[remaining:]
                target_ptr = self._ptr + len(data)

            # remaining data fits on buffer
            self._shm.buf[self.ptr:target_ptr] = data
            self._write_event.write(len(data))
            self._ptr = target_ptr

    async def wait_send_all_might_not_block(self):
        raise NotImplementedError

    def open(self):
        self._shm = SharedMemory(
            name=self._token.shm_name,
            size=self._token.buf_size,
            create=False
        )
        self._write_event.open()
        self._wrap_event.open()
        self._eof_event.open()

    def close(self):
        # signal EOF with the final write pointer; a 0 ptr is sent as
        # `self.size`, presumably because adding 0 to an eventfd counter
        # never makes it readable.
        self._eof_event.write(
            self._ptr if self._ptr > 0 else self.size
        )

        if self._cleanup:
            self._write_event.close()
            self._wrap_event.close()
            self._eof_event.close()
            self._shm.close()

    async def aclose(self):
        async with self._send_lock:
            self.close()

    async def __aenter__(self):
        self.open()
        return self


class RingBuffReceiver(trio.abc.ReceiveStream):
    '''
    Ring Buffer receiver side implementation

    Do not use directly! manage with `attach_to_ringbuf_receiver`
    after having opened a ringbuf context with `open_ringbuf`.

    '''
    def __init__(
        self,
        token: RBToken,
        cleanup: bool = True,
    ):
        self._token = RBToken.from_msg(token)
        self._shm: SharedMemory | None = None
        # NOTE(review): this side *reads* `_write_event` and *writes*
        # `_wrap_event`, yet opens them with 'w' / 'r' respectively;
        # presumably the mode is not used by `EventFD.read()/write()`,
        # verify against `._linux.EventFD`.
        self._write_event = EventFD(self._token.write_eventfd, 'w')
        self._wrap_event = EventFD(self._token.wrap_eventfd, 'r')
        self._eof_event = EventFD(self._token.eof_eventfd, 'r')
        self._ptr: int = 0
        self._write_ptr: int = 0
        # -1 until the writer signals EOF (see `_eof_monitor_task`)
        self._end_ptr: int = -1

        self._cleanup: bool = cleanup

    @property
    def name(self) -> str:
        if not self._shm:
            raise ValueError('shared memory not initialized yet!')
        return self._shm.name

    @property
    def size(self) -> int:
        return self._token.buf_size

    @property
    def ptr(self) -> int:
        return self._ptr

    @property
    def write_fd(self) -> int:
        return self._write_event.fd

    @property
    def wrap_fd(self) -> int:
        return self._wrap_event.fd

    async def _eof_monitor_task(self):
        '''
        Long running EOF event monitor, automatically run in bg by
        `attach_to_ringbuf_receiver` context manager, if EOF event
        is set its value will be the end pointer (highest valid
        index to be read from buf, after setting the `self._end_ptr`
        we close the write event which should cancel any blocked
        `self._write_event.read()`s on it.

        `trio.Cancelled` is deliberately not caught so cancellation
        propagates to the enclosing scope.

        '''
        try:
            self._end_ptr = await self._eof_event.read()
            self._write_event.close()

        except EFDReadCancelled:
            # the eof eventfd was closed (e.g. by `close()`) before the
            # writer signalled EOF; nothing left to monitor.
            ...

    async def receive_some(self, max_bytes: int | None = None) -> bytes:
        '''
        Receive up to `max_bytes`, if no `max_bytes` is provided
        a reasonable default is used.

        Returns `b''` once the writer has signalled EOF and no more
        bytes are known to be pending.

        '''
        if max_bytes is None:
            max_bytes = _DEFAULT_RB_SIZE

        if max_bytes < 1:
            raise ValueError("max_bytes must be >= 1")

        # delta is remaining bytes we havent read
        delta = self._write_ptr - self._ptr
        if delta == 0:
            # we have read all we can, see if new data is available
            if self._end_ptr < 0:
                # if we havent been signaled about EOF yet
                try:
                    delta = await self._write_event.read()
                    self._write_ptr += delta

                except EFDReadCancelled:
                    # while waiting for new data `self._write_event` was closed
                    # this means writer signaled EOF
                    if self._end_ptr > 0:
                        # final self._write_ptr modification and recalculate delta
                        # NOTE(review): an `_end_ptr` equal to `self.size` is
                        # ambiguous once this reader has already wrapped to 0
                        # (it may refer to the previous cycle) - TODO confirm.
                        self._write_ptr = self._end_ptr
                        delta = self._end_ptr - self._ptr

                    else:
                        # shouldnt happen cause self._eof_monitor_task always sets
                        # self._end_ptr before closing self._write_event
                        raise InternalError(
                            'self._write_event.read cancelled but self._end_ptr is not set'
                        )

            else:
                # no more bytes to read and self._end_ptr set, EOF reached
                # NOTE(review): `_eof_monitor_task` may set `_end_ptr` (and
                # close `_write_event`) before this reader consumed pending
                # write notifications, in which case bytes between
                # `self._ptr` and `self._end_ptr` are never returned; fixing
                # this likely needs an EOF protocol change - TODO confirm.
                return b''

        # dont overflow caller
        delta = min(delta, max_bytes)

        target_ptr = self._ptr + delta

        # fetch next segment and advance ptr
        segment = bytes(self._shm.buf[self._ptr:target_ptr])
        self._ptr = target_ptr

        if self._ptr == self.size:
            # reached the end, signal wrap around
            self._ptr = 0
            self._write_ptr = 0
            self._wrap_event.write(1)

        return segment

    def open(self):
        self._shm = SharedMemory(
            name=self._token.shm_name,
            size=self._token.buf_size,
            create=False
        )
        self._write_event.open()
        self._wrap_event.open()
        self._eof_event.open()

    def close(self):
        if self._cleanup:
            self._write_event.close()
            self._wrap_event.close()
            self._eof_event.close()
            self._shm.close()

    async def aclose(self):
        self.close()

    async def __aenter__(self):
        self.open()
        return self


@acm
async def attach_to_ringbuf_receiver(
    token: RBToken,
    cleanup: bool = True
) -> AsyncContextManager[RingBuffReceiver]:
    '''
    Attach a RingBuffReceiver from a previously opened
    RBToken.

    Launches `receiver._eof_monitor_task` in a `trio.Nursery`
    and cancels it when the context exits.

    '''
    async with (
        trio.open_nursery() as n,
        RingBuffReceiver(
            token,
            cleanup=cleanup
        ) as receiver
    ):
        n.start_soon(receiver._eof_monitor_task)
        try:
            yield receiver

        finally:
            # with `cleanup=False` the eof eventfd is never closed by the
            # receiver, so without this the monitor's pending read would
            # keep the nursery open until the sender signals EOF.
            n.cancel_scope.cancel()


@acm
async def attach_to_ringbuf_sender(
    token: RBToken,
    cleanup: bool = True
) -> AsyncContextManager[RingBuffSender]:
    '''
    Attach a RingBuffSender from a previously opened
    RBToken.

    '''
    async with RingBuffSender(
        token,
        cleanup=cleanup
    ) as sender:
        yield sender


@cm
def open_ringbuf_pair(
    name: str,
    buf_size: int = _DEFAULT_RB_SIZE
) -> ContextManager[tuple[RBToken, RBToken]]:
    '''
    Handle resources for a ringbuf pair to be used for
    bidirectional messaging.

    '''
    with (
        open_ringbuf(
            name + '.pair0',
            buf_size=buf_size
        ) as token_0,
        open_ringbuf(
            name + '.pair1',
            buf_size=buf_size
        ) as token_1
    ):
        yield token_0, token_1


@acm
async def attach_to_ringbuf_stream(
    token_in: RBToken,
    token_out: RBToken,
    cleanup_in: bool = True,
    cleanup_out: bool = True
) -> AsyncContextManager[trio.StapledStream]:
    '''
    Attach a trio.StapledStream from a previously opened
    ringbuf pair.

    '''
    async with (
        attach_to_ringbuf_receiver(
            token_in,
            cleanup=cleanup_in
        ) as receiver,
        attach_to_ringbuf_sender(
            token_out,
            cleanup=cleanup_out
        ) as sender,
    ):
        yield trio.StapledStream(sender, receiver)


class RingBuffBytesSender(trio.abc.SendChannel[bytes]):
    '''
    In order to guarantee full messages are received, all bytes
    sent by `RingBuffBytesSender` are preceded with a 4 byte header
    which decodes into a uint32 indicating the actual size of the
    next payload.

    Optional batch mode:

    If `batch_size` > 1 messages wont get sent immediately but will be
    stored until `batch_size` messages are pending, then it will send
    them all at once.

    `batch_size` can be changed dynamically but always call, `flush()`
    right before. Any still pending batch is flushed on `aclose()`.

    '''
    def __init__(
        self,
        sender: RingBuffSender,
        batch_size: int = 1
    ):
        self._sender = sender
        self.batch_size = batch_size
        self._batch_msg_len = 0
        self._batch: bytes = b''

    async def flush(self) -> None:
        '''
        Send every pending batched message at once; no-op when the
        batch is empty.

        '''
        if not self._batch:
            return

        await self._sender.send_all(self._batch)
        self._batch = b''
        self._batch_msg_len = 0

    async def send(self, value: bytes) -> None:
        # little-endian uint32 length header followed by the payload
        msg: bytes = struct.pack("<I", len(value)) + value
        if self.batch_size <= 1:
            # never let an immediate send overtake older batched messages
            await self.flush()
            await self._sender.send_all(msg)
            return

        self._batch += msg
        self._batch_msg_len += 1
        if self._batch_msg_len >= self.batch_size:
            await self.flush()

    async def aclose(self) -> None:
        try:
            # don't silently drop messages still waiting in the batch
            await self.flush()

        finally:
            await self._sender.aclose()


class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]):
    '''
    See `RingBuffBytesSender` docstring.

    Each `receive()` reads the 4 byte length header and then exactly
    that many payload bytes via `_receive_exactly`.

    '''
    def __init__(
        self,
        receiver: RingBuffReceiver
    ):
        self._receiver = receiver

    async def _receive_exactly(self, num_bytes: int) -> bytes:
        '''
        Fetch bytes from receiver until we read exactly `num_bytes`
        or end of stream is signaled.

        Raises `trio.EndOfChannel` if the stream ends first (including
        part way through a message).

        '''
        payload = b''
        while len(payload) < num_bytes:
            remaining = num_bytes - len(payload)

            new_bytes = await self._receiver.receive_some(
                max_bytes=remaining
            )

            if new_bytes == b'':
                raise trio.EndOfChannel

            payload += new_bytes

        return payload

    async def receive(self) -> bytes:
        header: bytes = await self._receive_exactly(4)
        size: int
        # must match the "<I" header packed by `RingBuffBytesSender.send`
        size, = struct.unpack("<I", header)
        return await self._receive_exactly(size)

    async def aclose(self) -> None:
        await self._receiver.aclose()


@acm
async def attach_to_ringbuf_rchannel(
    token: RBToken,
    cleanup: bool = True
) -> AsyncContextManager[RingBuffBytesReceiver]:
    '''
    Attach a RingBuffBytesReceiver from a previously opened
    RBToken.

    '''
    async with attach_to_ringbuf_receiver(
        token, cleanup=cleanup
    ) as receiver:
        yield RingBuffBytesReceiver(receiver)


@acm
async def attach_to_ringbuf_schannel(
    token: RBToken,
    cleanup: bool = True,
    batch_size: int = 1,
) -> AsyncContextManager[RingBuffBytesSender]:
    '''
    Attach a RingBuffBytesSender from a previously opened
    RBToken.

    On a clean exit any still pending batched messages are flushed
    before the underlying sender is closed.

    '''
    async with attach_to_ringbuf_sender(
        token, cleanup=cleanup
    ) as sender:
        schan = RingBuffBytesSender(sender, batch_size=batch_size)
        yield schan
        # only reached on normal exit; avoid awaiting during error unwinding
        await schan.flush()


class RingBuffChannel(trio.abc.Channel[bytes]):
    '''
    Combine `RingBuffBytesSender` and `RingBuffBytesReceiver`
    in order to expose the bidirectional `trio.abc.Channel` API.

    '''
    def __init__(
        self,
        sender: RingBuffBytesSender,
        receiver: RingBuffBytesReceiver
    ):
        self._sender = sender
        self._receiver = receiver

    async def send(self, value: bytes):
        await self._sender.send(value)

    async def receive(self) -> bytes:
        return await self._receiver.receive()

    async def aclose(self):
        # close the sender even if closing the receiver fails
        try:
            await self._receiver.aclose()

        finally:
            await self._sender.aclose()


@acm
async def attach_to_ringbuf_channel(
    token_in: RBToken,
    token_out: RBToken,
    cleanup_in: bool = True,
    cleanup_out: bool = True
) -> AsyncContextManager[RingBuffChannel]:
    '''
    Attach to an already opened ringbuf pair and return
    a `RingBuffChannel`.

    '''
    async with (
        attach_to_ringbuf_rchannel(
            token_in,
            cleanup=cleanup_in
        ) as receiver,
        attach_to_ringbuf_schannel(
            token_out,
            cleanup=cleanup_out
        ) as sender,
    ):
        yield RingBuffChannel(sender, receiver)