# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
SC friendly shared memory management geared at real-time
processing.

Support for ``numpy`` compatible array-buffers is provided but is
considered optional within the context of this runtime-library.

"""
from __future__ import annotations
from sys import byteorder
import time
from typing import Optional
from multiprocessing.shared_memory import (
    SharedMemory,
    _USE_POSIX,
)

if _USE_POSIX:
    from _posixshmem import shm_unlink

from msgspec import (
    Struct,
    structs,
)
import numpy as np
from numpy.lib import recfunctions as rfn
import tractor

from .log import get_logger


log = get_logger(__name__)


# how much is probably dependent on lifestyle
_secs_in_day = int(60 * 60 * 24)
# we try for a buncha times, but only on a run-every-other-day kinda week.
_days_worth = 16
_default_size = _days_worth * _secs_in_day
# where to start the new data append index
_rt_buffer_start = int((_days_worth - 1) * _secs_in_day)


def disable_mantracker():
    '''
    Disable all ``multiprocessing`` "resource tracking" machinery since
    it's an absolute multi-threaded mess of non-SC madness.

    '''
    from multiprocessing import resource_tracker as mantracker

    # Tell the "resource tracker" thing to fuck off.
    class ManTracker(mantracker.ResourceTracker):
        def register(self, name, rtype):
            pass

        def unregister(self, name, rtype):
            pass

        def ensure_running(self):
            pass

    # "know your land and know your prey"
    # https://www.dailymotion.com/video/x6ozzco
    mantracker._resource_tracker = ManTracker()
    mantracker.register = mantracker._resource_tracker.register
    mantracker.ensure_running = mantracker._resource_tracker.ensure_running
    # ensure_running = mantracker._resource_tracker.ensure_running
    mantracker.unregister = mantracker._resource_tracker.unregister
    mantracker.getfd = mantracker._resource_tracker.getfd


disable_mantracker()


class SharedInt:
    """
    Wrapper around a single entry shared memory array which
    holds an ``int`` value used as an index counter.

    """
    def __init__(
        self,
        shm: SharedMemory,
    ) -> None:
        self._shm = shm

    @property
    def value(self) -> int:
        return int.from_bytes(self._shm.buf, byteorder)

    @value.setter
    def value(self, value) -> None:
        self._shm.buf[:] = value.to_bytes(self._shm.size, byteorder)

    def destroy(self) -> None:
        if _USE_POSIX:
            # We manually unlink to bypass all the "resource tracker"
            # nonsense meant for non-SC systems.
            name = self._shm.name
            try:
                shm_unlink(name)
            except FileNotFoundError:
                # might be a teardown race here?
                log.warning(f'Shm for {name} already unlinked?')
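# A minimal usage sketch for ``SharedInt`` (illustrative only; the
# segment name below is made up): allocate a tiny segment, wrap it,
# then read/write the counter from any process that attaches by name.
#
#   seg = SharedMemory(name='example_counter', create=True, size=4)
#   counter = SharedInt(shm=seg)
#   counter.value = 42
#   assert counter.value == 42
#   counter.destroy()  # manually unlink on (parent-most) teardown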
class _Token(Struct, frozen=True):
    '''
    Internal representation of a shared memory "token"
    which can be used to key a system wide posix shm entry.

    '''
    shm_name: str  # this serves as a "key" value
    shm_first_index_name: str
    shm_last_index_name: str
    dtype_descr: tuple
    size: int  # in struct-array index / row terms

    @property
    def dtype(self) -> np.dtype:
        return np.dtype(list(map(tuple, self.dtype_descr))).descr

    def as_msg(self):
        # NOTE: plain ``msgspec.Struct``s don't provide a ``.to_dict()``
        # method, so use the ``msgspec.structs`` helper instead.
        return structs.asdict(self)

    @classmethod
    def from_msg(cls, msg: dict) -> _Token:
        if isinstance(msg, _Token):
            return msg

        # TODO: native struct decoding
        # return _token_dec.decode(msg)

        msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
        return _Token(**msg)


# _token_dec = msgspec.msgpack.Decoder(_Token)

# TODO: this api?
# _known_tokens = tractor.ActorVar('_shm_tokens', {})
# _known_tokens = tractor.ContextStack('_known_tokens', )
# _known_tokens = trio.RunVar('shms', {})

# process-local store of keys to tokens
_known_tokens = {}


def get_shm_token(key: str) -> _Token:
    """
    Convenience func to check if a token
    for the provided key is known by this process.

    """
    return _known_tokens.get(key)


def _make_token(
    key: str,
    size: int,
    dtype: np.dtype,
) -> _Token:
    '''
    Create a serializable token that can be used
    to access a shared array.

    '''
    return _Token(
        shm_name=key,
        shm_first_index_name=key + "_first",
        shm_last_index_name=key + "_last",
        dtype_descr=tuple(np.dtype(dtype).descr),
        size=size,
    )
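# Sketch of the token round-trip (the key and dtype below are made up):
# a writer builds a token for its buffer, ships it to a peer as a plain
# msg-dict, and the peer reconstructs an equivalent ``_Token``.
#
#   token = _make_token(
#       key='example_shm',
#       size=_default_size,
#       dtype=np.dtype([('index', int), ('close', float)]),
#   )
#   msg = token.as_msg()               # plain dict, safe to send over IPC
#   assert _Token.from_msg(msg) == token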
class ShmArray:
    '''
    A shared memory ``numpy`` (compatible) array API.

    An underlying shared memory buffer is allocated based on
    a user specified ``numpy.ndarray``. This fixed size array
    can be read and written to by pushing data both onto the "front"
    or "back" of a set index range. The indexes for the "first" and
    "last" index are themselves stored in shared memory (accessed via
    ``SharedInt`` interfaces) values such that multiple processes can
    interact with the same array using a synchronized-index.

    '''
    def __init__(
        self,
        shmarr: np.ndarray,
        first: SharedInt,
        last: SharedInt,
        shm: SharedMemory,
        # readonly: bool = True,
    ) -> None:
        self._array = shmarr

        # indexes for first and last indices corresponding
        # to filled data
        self._first = first
        self._last = last

        self._len = len(shmarr)
        self._shm = shm
        self._post_init: bool = False

        # pushing data does not write the index (aka primary key)
        dtype = shmarr.dtype
        if dtype.fields:
            self._write_fields = list(shmarr.dtype.fields.keys())[1:]
        else:
            self._write_fields = None

    # TODO: ringbuf api?

    @property
    def _token(self) -> _Token:
        return _Token(
            shm_name=self._shm.name,
            shm_first_index_name=self._first._shm.name,
            shm_last_index_name=self._last._shm.name,
            dtype_descr=tuple(self._array.dtype.descr),
            size=self._len,
        )

    @property
    def token(self) -> dict:
        """
        Shared memory token that can be serialized and used by
        another process to attach to this array.

        """
        return self._token.as_msg()

    @property
    def index(self) -> int:
        return self._last.value % self._len

    @property
    def array(self) -> np.ndarray:
        '''
        Return an up-to-date ``np.ndarray`` view of the
        so-far-written data to the underlying shm buffer.

        '''
        a = self._array[self._first.value:self._last.value]

        # first, last = self._first.value, self._last.value
        # a = self._array[first:last]

        # TODO: eventually comment this once we've not seen it in the
        # wild in a long time..
        # XXX: race where first/last indexes cause a reader
        # to load an empty array..
        if len(a) == 0 and self._post_init:
            raise RuntimeError('Empty array race condition hit!?')
            # breakpoint()

        return a

    def ustruct(
        self,
        fields: Optional[list[str]] = None,

        # type that all field values will be cast to
        # in the returned view.
        common_dtype: np.dtype = float,

    ) -> np.ndarray:

        array = self._array

        if fields:
            selection = array[fields]
            # fcount = len(fields)
        else:
            selection = array
            # fcount = len(array.dtype.fields)

        # XXX: manual ``.view()`` attempt that also doesn't work.
        # uview = selection.view(
        #     dtype='<f16',
        # )

        # copy out the selected struct-fields as a flat (unstructured)
        # array cast to the requested common dtype.
        return rfn.structured_to_unstructured(
            selection,
            dtype=common_dtype,
            copy=True,
        )

    def last(
        self,
        length: int = 1,
    ) -> np.ndarray:
        '''
        Return the last ``length``'s worth of ("row") entries from the
        array.

        '''
        return self.array[-length:]

    def push(
        self,
        data: np.ndarray,

        field_map: Optional[dict[str, str]] = None,
        prepend: bool = False,
        update_first: bool = True,
        start: Optional[int] = None,

    ) -> int:
        '''
        Ring buffer like "push" to append data
        into the buffer and return updated "last" index.

        NB: no actual ring logic yet to give a "loop around" on overflow
        condition, lel.

        '''
        length = len(data)

        if prepend:
            index = (start or self._first.value) - length

            if index < 0:
                raise ValueError(
                    f'Array size of {self._len} was overrun during prepend.\n'
                    f'You have passed {abs(index)} too many datums.'
                )

        else:
            index = start if start is not None else self._last.value

        end = index + length

        if field_map:
            src_names, dst_names = zip(*field_map.items())
        else:
            dst_names = src_names = self._write_fields

        try:
            self._array[
                list(dst_names)
            ][index:end] = data[list(src_names)][:]

            # NOTE: there was a race here between updating
            # the first and last indices and when the next reader
            # tries to access ``.array`` (which due to the index
            # overlap will be empty). Pretty sure we've fixed it now
            # but leaving this here as a reminder.
            if prepend and update_first and length:
                assert index < self._first.value

            if (
                index < self._first.value
                and update_first
            ):
                assert prepend, 'prepend=True not passed but index decreased?'
                self._first.value = index

            elif not prepend:
                self._last.value = end

            self._post_init = True
            return end

        except ValueError as err:
            if field_map:
                raise

            # should raise if diff detected
            self.diff_err_fields(data)
            raise err

    def diff_err_fields(
        self,
        data: np.ndarray,
    ) -> None:
        # reraise with any field discrepancy
        our_fields, their_fields = (
            set(self._array.dtype.fields),
            set(data.dtype.fields),
        )

        only_in_ours = our_fields - their_fields
        only_in_theirs = their_fields - our_fields

        if only_in_ours:
            raise TypeError(
                f"Input array is missing field(s): {only_in_ours}"
            )

        elif only_in_theirs:
            raise TypeError(
                f"Input array has unknown field(s): {only_in_theirs}"
            )

    # TODO: support "silent" prepends that don't update ._first.value?
    def prepend(
        self,
        data: np.ndarray,
    ) -> int:
        end = self.push(data, prepend=True)
        assert end

    def close(self) -> None:
        self._first._shm.close()
        self._last._shm.close()
        self._shm.close()

    def destroy(self) -> None:
        if _USE_POSIX:
            # We manually unlink to bypass all the "resource tracker"
            # nonsense meant for non-SC systems.
            shm_unlink(self._shm.name)

        self._first.destroy()
        self._last.destroy()

    def flush(self) -> None:
        # TODO: flush to storage backend like marketstore?
        ...
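# Write/read cycle sketch (the key and dtype are made up; see the
# allocator funcs below for the real entrypoints): append rows to the
# "back" of the buffer with ``.push()`` and read the filled region back
# via ``.array``.
#
#   shm = open_shm_array(
#       key='example_ohlc',
#       dtype=np.dtype([('index', int), ('close', float)]),
#   )
#   row = np.zeros(1, dtype=shm._array.dtype)
#   row['close'] = 123.0
#   shm.push(row)                    # appends and bumps the "last" index
#   assert shm.last(1)['close'] == 123.0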
def open_shm_array(
    key: Optional[str] = None,
    size: int = _default_size,  # see above
    dtype: Optional[np.dtype] = None,
    readonly: bool = False,
) -> ShmArray:
    '''
    Open a shared memory ``numpy``-compatible array using the standard
    library.

    This call unlinks (aka permanently destroys) the buffer on teardown
    and thus should be used from the parent-most accessor (process).

    '''
    # create new shared mem segment for which we
    # have write permission
    a = np.zeros(size, dtype=dtype)
    a['index'] = np.arange(len(a))

    shm = SharedMemory(
        name=key,
        create=True,
        size=a.nbytes
    )
    array = np.ndarray(
        a.shape,
        dtype=a.dtype,
        buffer=shm.buf
    )
    array[:] = a[:]
    array.setflags(write=int(not readonly))

    token = _make_token(
        key=key,
        size=size,
        dtype=dtype,
    )

    # create single entry arrays for storing the first and last indices
    first = SharedInt(
        shm=SharedMemory(
            name=token.shm_first_index_name,
            create=True,
            size=4,  # std int
        )
    )

    last = SharedInt(
        shm=SharedMemory(
            name=token.shm_last_index_name,
            create=True,
            size=4,  # std int
        )
    )

    # start the "real-time" updated section after ``_days_worth - 1``
    # days worth of 1s sampled OHLC. this allows appending up to a days
    # worth from tick/quote feeds before having to flush to a (tsdb)
    # storage backend, and looks something like,
    # -------------------------
    # |              |        i
    # _________________________
    # <-------------> <------->
    #  history         real-time
    #
    # Once fully "prepended", the history section will leave the
    # ``ShmArray._first.value: int = 0`` and the yet-to-be written
    # real-time section will start at ``ShmArray.index: int``.

    # this sets the index near the end of the buffer, leaving a "days
    # worth of second samples" for the real-time section.
    last.value = first.value = _rt_buffer_start

    shmarr = ShmArray(
        array,
        first,
        last,
        shm,
    )

    assert shmarr._token == token
    _known_tokens[key] = shmarr.token

    # "unlink" created shm on process teardown by
    # pushing teardown calls onto actor context stack
    stack = tractor.current_actor().lifetime_stack
    stack.callback(shmarr.close)
    stack.callback(shmarr.destroy)

    return shmarr


def attach_shm_array(
    token: tuple[str, str, tuple[str, str]],
    readonly: bool = True,
) -> ShmArray:
    '''
    Attach to an existing shared memory array previously
    created by another process using ``open_shm_array``.

    No new shared mem is allocated but wrapper types for read/write
    access are constructed.

    '''
    token = _Token.from_msg(token)
    key = token.shm_name

    if key in _known_tokens:
        assert _Token.from_msg(_known_tokens[key]) == token, "WTF"

    # XXX: ugh, looks like due to the ``shm_open()`` C api we can't
    # actually place files in a subdir, see discussion here:
    # https://stackoverflow.com/a/11103289

    # attach to array buffer and view as per dtype
    _err: Optional[Exception] = None
    for _ in range(3):
        try:
            shm = SharedMemory(
                name=key,
                create=False,
            )
            break
        except OSError as oserr:
            _err = oserr
            time.sleep(0.1)
    else:
        if _err:
            raise _err

    shmarr = np.ndarray(
        (token.size,),
        dtype=token.dtype,
        buffer=shm.buf
    )
    shmarr.setflags(write=int(not readonly))

    first = SharedInt(
        shm=SharedMemory(
            name=token.shm_first_index_name,
            create=False,
            size=4,  # std int
        ),
    )
    last = SharedInt(
        shm=SharedMemory(
            name=token.shm_last_index_name,
            create=False,
            size=4,  # std int
        ),
    )

    # make sure we can read
    first.value

    sha = ShmArray(
        shmarr,
        first,
        last,
        shm,
    )
    # read test
    sha.array

    # Stash key -> token knowledge for future queries
    # via `maybe_open_shm_array()` but only after we know
    # we can attach.
    if key not in _known_tokens:
        _known_tokens[key] = token

    # "close" attached shm on actor teardown
    tractor.current_actor().lifetime_stack.callback(sha.close)

    return sha
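# Cross-process flow sketch (actor wiring elided, names made up): the
# parent-most actor allocates and owns the buffer, then ships the
# serialized token to a child actor which attaches read-only.
#
#   # parent actor
#   shm = open_shm_array(
#       key='example_ohlc',
#       dtype=np.dtype([('index', int), ('close', float)]),
#   )
#   msg = shm.token                      # plain dict, send over IPC
#
#   # child actor (other process)
#   shm_ro = attach_shm_array(token=msg, readonly=True)
#   view = shm_ro.array                  # zero-copy view of filled rows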
def maybe_open_shm_array(
    key: str,
    dtype: Optional[np.dtype] = None,
    **kwargs,
) -> tuple[ShmArray, bool]:
    '''
    Attempt to attach to a shared memory block using a "key" lookup
    to registered blocks in the user's overall "system" registry
    (presumes you don't have the block's explicit token).

    This function is meant to solve the problem of discovering whether
    a shared array token has been allocated or discovered by the actor
    running in **this** process. Systems where multiple actors may seek
    to access a common block can use this function to attempt to acquire
    a token as discovered by the actors who have previously stored
    a "key" -> ``_Token`` map in an actor local (aka python global)
    variable.

    If you know the explicit ``_Token`` for your memory segment instead
    use ``attach_shm_array``.

    '''
    size = kwargs.pop('size', _default_size)
    try:
        # see if we already know this key
        token = _known_tokens[key]
        return attach_shm_array(token=token, **kwargs), False
    except KeyError:
        log.warning(f"Could not find {key} in shms cache")
        if dtype:
            token = _make_token(
                key,
                size=size,
                dtype=dtype,
            )
            try:
                return attach_shm_array(token=token, **kwargs), False
            except FileNotFoundError:
                log.warning(f"Could not attach to shm with token {token}")

        # This actor does not know about memory
        # associated with the provided "key".
        # Attempt to open a block and expect
        # to fail if a block has been allocated
        # on the OS by someone else.
        return open_shm_array(key=key, dtype=dtype, **kwargs), True


def try_read(
    array: np.ndarray

) -> Optional[np.ndarray]:
    '''
    Try to read the last row from a shared mem array or ``None``
    if the array read returns a zero-length array result.

    Can be used to check for backfilling race conditions where an array
    is currently being (re-)written by a writer actor but the reader is
    unaware and reads during the window where the first and last indexes
    are being updated.

    '''
    try:
        return array[-1]
    except IndexError:
        # XXX: race condition with backfilling shm.
        #
        # the underlying issue is that a backfill (aka prepend) and
        # subsequent shm array first/last index update could result in
        # an empty array read here since the indices may be updated in
        # such a way that a read delivers an empty array (though it
        # seems like we *should* be able to prevent that?). also, as an
        # alternative and something we need anyway, maybe there should
        # be some kind of signal that a prepend is taking place and this
        # consumer can respond (eg. redrawing graphics) accordingly.

        # the array read was empty
        return None
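# Reader-side sketch: tolerate the (rare) empty-read race by skipping
# the update instead of crashing (the key and dtype below are made up).
#
#   shm, opened = maybe_open_shm_array(
#       'example_ohlc',
#       dtype=np.dtype([('index', int), ('close', float)]),
#   )
#   row = try_read(shm.array)
#   if row is None:
#       # a writer is mid-(pre)pend; drop this tick and retry later
#       ...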