From 901353c21320982b9add8077e8d5739c15606472 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 15 Oct 2022 16:35:32 -0400 Subject: [PATCH] Initial module import from `piker.data._sharemem` More or less a verbatim copy-paste minus some edgy variable naming and internal `piker` module imports. There is a bunch of OHLC related defaults that need to be dropped and we need to adjust to an optional dependence on `numpy` by supporting shared lists as per the mp docs. --- tractor/_shm.py | 706 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 706 insertions(+) create mode 100644 tractor/_shm.py diff --git a/tractor/_shm.py b/tractor/_shm.py new file mode 100644 index 0000000..dca9d5a --- /dev/null +++ b/tractor/_shm.py @@ -0,0 +1,706 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +SC friendly shared memory management geared at real-time +processing. + +Support for ``numpy`` compatible array-buffers is provided but is +considered optional within the context of this runtime-library. + +""" +from __future__ import annotations +from sys import byteorder +import time +from typing import Optional +from multiprocessing.shared_memory import ( + SharedMemory, + _USE_POSIX, +) + +if _USE_POSIX: + from _posixshmem import shm_unlink + +from msgspec import Struct +import numpy as np +from numpy.lib import recfunctions as rfn +import tractor + +from .log import get_logger + + +log = get_logger(__name__) + + +# how much is probably dependent on lifestyle +_secs_in_day = int(60 * 60 * 24) +# we try for a buncha times, but only on a run-every-other-day kinda week. +_days_worth = 16 +_default_size = _days_worth * _secs_in_day +# where to start the new data append index +_rt_buffer_start = int((_days_worth - 1) * _secs_in_day) + + +def disable_mantracker(): + ''' + Disable all ``multiprocessing``` "resource tracking" machinery since + it's an absolute multi-threaded mess of non-SC madness. + + ''' + from multiprocessing import resource_tracker as mantracker + + # Tell the "resource tracker" thing to fuck off. + class ManTracker(mantracker.ResourceTracker): + def register(self, name, rtype): + pass + + def unregister(self, name, rtype): + pass + + def ensure_running(self): + pass + + # "know your land and know your prey" + # https://www.dailymotion.com/video/x6ozzco + mantracker._resource_tracker = ManTracker() + mantracker.register = mantracker._resource_tracker.register + mantracker.ensure_running = mantracker._resource_tracker.ensure_running + # ensure_running = mantracker._resource_tracker.ensure_running + mantracker.unregister = mantracker._resource_tracker.unregister + mantracker.getfd = mantracker._resource_tracker.getfd + + +disable_mantracker() + + +class SharedInt: + """Wrapper around a single entry shared memory array which + holds an ``int`` value used as an index counter. + + """ + def __init__( + self, + shm: SharedMemory, + ) -> None: + self._shm = shm + + @property + def value(self) -> int: + return int.from_bytes(self._shm.buf, byteorder) + + @value.setter + def value(self, value) -> None: + self._shm.buf[:] = value.to_bytes(self._shm.size, byteorder) + + def destroy(self) -> None: + if _USE_POSIX: + # We manually unlink to bypass all the "resource tracker" + # nonsense meant for non-SC systems. + name = self._shm.name + try: + shm_unlink(name) + except FileNotFoundError: + # might be a teardown race here? + log.warning(f'Shm for {name} already unlinked?') + + +class _Token(Struct, frozen=True): + ''' + Internal represenation of a shared memory "token" + which can be used to key a system wide post shm entry. + + ''' + shm_name: str # this servers as a "key" value + shm_first_index_name: str + shm_last_index_name: str + dtype_descr: tuple + size: int # in struct-array index / row terms + + @property + def dtype(self) -> np.dtype: + return np.dtype(list(map(tuple, self.dtype_descr))).descr + + def as_msg(self): + return self.to_dict() + + @classmethod + def from_msg(cls, msg: dict) -> _Token: + if isinstance(msg, _Token): + return msg + + # TODO: native struct decoding + # return _token_dec.decode(msg) + + msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) + return _Token(**msg) + + +# _token_dec = msgspec.msgpack.Decoder(_Token) + +# TODO: this api? +# _known_tokens = tractor.ActorVar('_shm_tokens', {}) +# _known_tokens = tractor.ContextStack('_known_tokens', ) +# _known_tokens = trio.RunVar('shms', {}) + +# process-local store of keys to tokens +_known_tokens = {} + + +def get_shm_token(key: str) -> _Token: + """Convenience func to check if a token + for the provided key is known by this process. + """ + return _known_tokens.get(key) + + +def _make_token( + key: str, + size: int, + dtype: np.dtype, + +) -> _Token: + ''' + Create a serializable token that can be used + to access a shared array. + + ''' + return _Token( + shm_name=key, + shm_first_index_name=key + "_first", + shm_last_index_name=key + "_last", + dtype_descr=tuple(np.dtype(dtype).descr), + size=size, + ) + + +class ShmArray: + ''' + A shared memory ``numpy`` (compatible) array API. + + An underlying shared memory buffer is allocated based on + a user specified ``numpy.ndarray``. This fixed size array + can be read and written to by pushing data both onto the "front" + or "back" of a set index range. The indexes for the "first" and + "last" index are themselves stored in shared memory (accessed via + ``SharedInt`` interfaces) values such that multiple processes can + interact with the same array using a synchronized-index. + + ''' + def __init__( + self, + shmarr: np.ndarray, + first: SharedInt, + last: SharedInt, + shm: SharedMemory, + # readonly: bool = True, + ) -> None: + self._array = shmarr + + # indexes for first and last indices corresponding + # to fille data + self._first = first + self._last = last + + self._len = len(shmarr) + self._shm = shm + self._post_init: bool = False + + # pushing data does not write the index (aka primary key) + dtype = shmarr.dtype + if dtype.fields: + self._write_fields = list(shmarr.dtype.fields.keys())[1:] + else: + self._write_fields = None + + # TODO: ringbuf api? + + @property + def _token(self) -> _Token: + return _Token( + shm_name=self._shm.name, + shm_first_index_name=self._first._shm.name, + shm_last_index_name=self._last._shm.name, + dtype_descr=tuple(self._array.dtype.descr), + size=self._len, + ) + + @property + def token(self) -> dict: + """Shared memory token that can be serialized and used by + another process to attach to this array. + """ + return self._token.as_msg() + + @property + def index(self) -> int: + return self._last.value % self._len + + @property + def array(self) -> np.ndarray: + ''' + Return an up-to-date ``np.ndarray`` view of the + so-far-written data to the underlying shm buffer. + + ''' + a = self._array[self._first.value:self._last.value] + + # first, last = self._first.value, self._last.value + # a = self._array[first:last] + + # TODO: eventually comment this once we've not seen it in the + # wild in a long time.. + # XXX: race where first/last indexes cause a reader + # to load an empty array.. + if len(a) == 0 and self._post_init: + raise RuntimeError('Empty array race condition hit!?') + # breakpoint() + + return a + + def ustruct( + self, + fields: Optional[list[str]] = None, + + # type that all field values will be cast to + # in the returned view. + common_dtype: np.dtype = np.float, + + ) -> np.ndarray: + + array = self._array + + if fields: + selection = array[fields] + # fcount = len(fields) + else: + selection = array + # fcount = len(array.dtype.fields) + + # XXX: manual ``.view()`` attempt that also doesn't work. + # uview = selection.view( + # dtype=' np.ndarray: + ''' + Return the last ``length``'s worth of ("row") entries from the + array. + + ''' + return self.array[-length:] + + def push( + self, + data: np.ndarray, + + field_map: Optional[dict[str, str]] = None, + prepend: bool = False, + update_first: bool = True, + start: Optional[int] = None, + + ) -> int: + ''' + Ring buffer like "push" to append data + into the buffer and return updated "last" index. + + NB: no actual ring logic yet to give a "loop around" on overflow + condition, lel. + + ''' + length = len(data) + + if prepend: + index = (start or self._first.value) - length + + if index < 0: + raise ValueError( + f'Array size of {self._len} was overrun during prepend.\n' + f'You have passed {abs(index)} too many datums.' + ) + + else: + index = start if start is not None else self._last.value + + end = index + length + + if field_map: + src_names, dst_names = zip(*field_map.items()) + else: + dst_names = src_names = self._write_fields + + try: + self._array[ + list(dst_names) + ][index:end] = data[list(src_names)][:] + + # NOTE: there was a race here between updating + # the first and last indices and when the next reader + # tries to access ``.array`` (which due to the index + # overlap will be empty). Pretty sure we've fixed it now + # but leaving this here as a reminder. + if prepend and update_first and length: + assert index < self._first.value + + if ( + index < self._first.value + and update_first + ): + assert prepend, 'prepend=True not passed but index decreased?' + self._first.value = index + + elif not prepend: + self._last.value = end + + self._post_init = True + return end + + except ValueError as err: + if field_map: + raise + + # should raise if diff detected + self.diff_err_fields(data) + raise err + + def diff_err_fields( + self, + data: np.ndarray, + ) -> None: + # reraise with any field discrepancy + our_fields, their_fields = ( + set(self._array.dtype.fields), + set(data.dtype.fields), + ) + + only_in_ours = our_fields - their_fields + only_in_theirs = their_fields - our_fields + + if only_in_ours: + raise TypeError( + f"Input array is missing field(s): {only_in_ours}" + ) + elif only_in_theirs: + raise TypeError( + f"Input array has unknown field(s): {only_in_theirs}" + ) + + # TODO: support "silent" prepends that don't update ._first.value? + def prepend( + self, + data: np.ndarray, + ) -> int: + end = self.push(data, prepend=True) + assert end + + def close(self) -> None: + self._first._shm.close() + self._last._shm.close() + self._shm.close() + + def destroy(self) -> None: + if _USE_POSIX: + # We manually unlink to bypass all the "resource tracker" + # nonsense meant for non-SC systems. + shm_unlink(self._shm.name) + + self._first.destroy() + self._last.destroy() + + def flush(self) -> None: + # TODO: flush to storage backend like markestore? + ... + + +def open_shm_array( + + key: Optional[str] = None, + size: int = _default_size, # see above + dtype: Optional[np.dtype] = None, + readonly: bool = False, + +) -> ShmArray: + '''Open a memory shared ``numpy`` using the standard library. + + This call unlinks (aka permanently destroys) the buffer on teardown + and thus should be used from the parent-most accessor (process). + + ''' + # create new shared mem segment for which we + # have write permission + a = np.zeros(size, dtype=dtype) + a['index'] = np.arange(len(a)) + + shm = SharedMemory( + name=key, + create=True, + size=a.nbytes + ) + array = np.ndarray( + a.shape, + dtype=a.dtype, + buffer=shm.buf + ) + array[:] = a[:] + array.setflags(write=int(not readonly)) + + token = _make_token( + key=key, + size=size, + dtype=dtype, + ) + + # create single entry arrays for storing an first and last indices + first = SharedInt( + shm=SharedMemory( + name=token.shm_first_index_name, + create=True, + size=4, # std int + ) + ) + + last = SharedInt( + shm=SharedMemory( + name=token.shm_last_index_name, + create=True, + size=4, # std int + ) + ) + + # start the "real-time" updated section after 3-days worth of 1s + # sampled OHLC. this allows appending up to a days worth from + # tick/quote feeds before having to flush to a (tsdb) storage + # backend, and looks something like, + # ------------------------- + # | | i + # _________________________ + # <-------------> <-------> + # history real-time + # + # Once fully "prepended", the history section will leave the + # ``ShmArray._start.value: int = 0`` and the yet-to-be written + # real-time section will start at ``ShmArray.index: int``. + + # this sets the index to 3/4 of the length of the buffer + # leaving a "days worth of second samples" for the real-time + # section. + last.value = first.value = _rt_buffer_start + + shmarr = ShmArray( + array, + first, + last, + shm, + ) + + assert shmarr._token == token + _known_tokens[key] = shmarr.token + + # "unlink" created shm on process teardown by + # pushing teardown calls onto actor context stack + + stack = tractor.current_actor().lifetime_stack + stack.callback(shmarr.close) + stack.callback(shmarr.destroy) + + return shmarr + + +def attach_shm_array( + token: tuple[str, str, tuple[str, str]], + readonly: bool = True, + +) -> ShmArray: + ''' + Attach to an existing shared memory array previously + created by another process using ``open_shared_array``. + + No new shared mem is allocated but wrapper types for read/write + access are constructed. + + ''' + token = _Token.from_msg(token) + key = token.shm_name + + if key in _known_tokens: + assert _Token.from_msg(_known_tokens[key]) == token, "WTF" + + # XXX: ugh, looks like due to the ``shm_open()`` C api we can't + # actually place files in a subdir, see discussion here: + # https://stackoverflow.com/a/11103289 + + # attach to array buffer and view as per dtype + _err: Optional[Exception] = None + for _ in range(3): + try: + shm = SharedMemory( + name=key, + create=False, + ) + break + except OSError as oserr: + _err = oserr + time.sleep(0.1) + else: + if _err: + raise _err + + shmarr = np.ndarray( + (token.size,), + dtype=token.dtype, + buffer=shm.buf + ) + shmarr.setflags(write=int(not readonly)) + + first = SharedInt( + shm=SharedMemory( + name=token.shm_first_index_name, + create=False, + size=4, # std int + ), + ) + last = SharedInt( + shm=SharedMemory( + name=token.shm_last_index_name, + create=False, + size=4, # std int + ), + ) + + # make sure we can read + first.value + + sha = ShmArray( + shmarr, + first, + last, + shm, + ) + # read test + sha.array + + # Stash key -> token knowledge for future queries + # via `maybe_opepn_shm_array()` but only after we know + # we can attach. + if key not in _known_tokens: + _known_tokens[key] = token + + # "close" attached shm on actor teardown + tractor.current_actor().lifetime_stack.callback(sha.close) + + return sha + + +def maybe_open_shm_array( + key: str, + dtype: Optional[np.dtype] = None, + **kwargs, + +) -> tuple[ShmArray, bool]: + ''' + Attempt to attach to a shared memory block using a "key" lookup + to registered blocks in the users overall "system" registry + (presumes you don't have the block's explicit token). + + This function is meant to solve the problem of discovering whether + a shared array token has been allocated or discovered by the actor + running in **this** process. Systems where multiple actors may seek + to access a common block can use this function to attempt to acquire + a token as discovered by the actors who have previously stored + a "key" -> ``_Token`` map in an actor local (aka python global) + variable. + + If you know the explicit ``_Token`` for your memory segment instead + use ``attach_shm_array``. + + ''' + size = kwargs.pop('size', _default_size) + try: + # see if we already know this key + token = _known_tokens[key] + return attach_shm_array(token=token, **kwargs), False + except KeyError: + log.warning(f"Could not find {key} in shms cache") + if dtype: + token = _make_token( + key, + size=size, + dtype=dtype, + ) + try: + return attach_shm_array(token=token, **kwargs), False + except FileNotFoundError: + log.warning(f"Could not attach to shm with token {token}") + + # This actor does not know about memory + # associated with the provided "key". + # Attempt to open a block and expect + # to fail if a block has been allocated + # on the OS by someone else. + return open_shm_array(key=key, dtype=dtype, **kwargs), True + + +def try_read( + array: np.ndarray + +) -> Optional[np.ndarray]: + ''' + Try to read the last row from a shared mem array or ``None`` + if the array read returns a zero-length array result. + + Can be used to check for backfilling race conditions where an array + is currently being (re-)written by a writer actor but the reader is + unaware and reads during the window where the first and last indexes + are being updated. + + ''' + try: + return array[-1] + except IndexError: + # XXX: race condition with backfilling shm. + # + # the underlying issue is that a backfill (aka prepend) and subsequent + # shm array first/last index update could result in an empty array + # read here since the indices may be updated in such a way that + # a read delivers an empty array (though it seems like we + # *should* be able to prevent that?). also, as and alt and + # something we need anyway, maybe there should be some kind of + # signal that a prepend is taking place and this consumer can + # respond (eg. redrawing graphics) accordingly. + + # the array read was emtpy + return None