Add prepend support to shm system
							parent
							
								
									5b8e72065a
								
							
						
					
					
						commit
						5b8adc8881
					
				| 
						 | 
				
			
			@ -17,11 +17,10 @@
 | 
			
		|||
"""
 | 
			
		||||
NumPy compatible shared memory buffers for real-time FSP.
 | 
			
		||||
"""
 | 
			
		||||
from typing import List
 | 
			
		||||
from dataclasses import dataclass, asdict
 | 
			
		||||
from sys import byteorder
 | 
			
		||||
from typing import Tuple, Optional
 | 
			
		||||
from multiprocessing import shared_memory
 | 
			
		||||
from typing import List, Tuple, Optional
 | 
			
		||||
from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
 | 
			
		||||
from multiprocessing import resource_tracker as mantracker
 | 
			
		||||
from _posixshmem import shm_unlink
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -29,7 +28,7 @@ import tractor
 | 
			
		|||
import numpy as np
 | 
			
		||||
 | 
			
		||||
from ..log import get_logger
 | 
			
		||||
from ._source import base_ohlc_dtype
 | 
			
		||||
from ._source import base_ohlc_dtype, base_iohlc_dtype
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = get_logger(__name__)
 | 
			
		||||
| 
						 | 
				
			
			@ -58,17 +57,15 @@ mantracker.getfd = mantracker._resource_tracker.getfd
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
class SharedInt:
 | 
			
		||||
    """Wrapper around a single entry shared memory array which
 | 
			
		||||
    holds an ``int`` value used as an index counter.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        token: str,
 | 
			
		||||
        create: bool = False,
 | 
			
		||||
        shm: SharedMemory,
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        # create a single entry array for storing an index counter
 | 
			
		||||
        self._shm = shared_memory.SharedMemory(
 | 
			
		||||
            name=token,
 | 
			
		||||
            create=create,
 | 
			
		||||
            size=4,  # std int
 | 
			
		||||
        )
 | 
			
		||||
        self._shm = shm
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def value(self) -> int:
 | 
			
		||||
| 
						 | 
				
			
			@ -79,7 +76,7 @@ class SharedInt:
 | 
			
		|||
        self._shm.buf[:] = value.to_bytes(4, byteorder)
 | 
			
		||||
 | 
			
		||||
    def destroy(self) -> None:
 | 
			
		||||
        if shared_memory._USE_POSIX:
 | 
			
		||||
        if _USE_POSIX:
 | 
			
		||||
            # We manually unlink to bypass all the "resource tracker"
 | 
			
		||||
            # nonsense meant for non-SC systems.
 | 
			
		||||
            shm_unlink(self._shm.name)
 | 
			
		||||
| 
						 | 
				
			
			@ -91,7 +88,8 @@ class _Token:
 | 
			
		|||
    which can be used to key a system wide post shm entry.
 | 
			
		||||
    """
 | 
			
		||||
    shm_name: str  # this servers as a "key" value
 | 
			
		||||
    shm_counter_name: str
 | 
			
		||||
    shm_first_index_name: str
 | 
			
		||||
    shm_last_index_name: str
 | 
			
		||||
    dtype_descr: List[Tuple[str]]
 | 
			
		||||
 | 
			
		||||
    def __post_init__(self):
 | 
			
		||||
| 
						 | 
				
			
			@ -130,27 +128,47 @@ def _make_token(
 | 
			
		|||
    """Create a serializable token that can be used
 | 
			
		||||
    to access a shared array.
 | 
			
		||||
    """
 | 
			
		||||
    dtype = base_ohlc_dtype if dtype is None else dtype
 | 
			
		||||
    dtype = base_iohlc_dtype if dtype is None else dtype
 | 
			
		||||
    return _Token(
 | 
			
		||||
        key,
 | 
			
		||||
        key + "_counter",
 | 
			
		||||
        key + "_first",
 | 
			
		||||
        key + "_last",
 | 
			
		||||
        np.dtype(dtype).descr
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ShmArray:
 | 
			
		||||
    """A shared memory ``numpy`` (compatible) array API.
 | 
			
		||||
 | 
			
		||||
    An underlying shared memory buffer is allocated based on
 | 
			
		||||
    a user specified ``numpy.ndarray``. This fixed size array
 | 
			
		||||
    can be read and written to by pushing data both onto the "front"
 | 
			
		||||
    or "back" of a set index range. The indexes for the "first" and
 | 
			
		||||
    "last" index are themselves stored in shared memory (accessed via
 | 
			
		||||
    ``SharedInt`` interfaces) values such that multiple processes can
 | 
			
		||||
    interact with the same array using a synchronized-index.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        shmarr: np.ndarray,
 | 
			
		||||
        counter: SharedInt,
 | 
			
		||||
        shm: shared_memory.SharedMemory,
 | 
			
		||||
        readonly: bool = True,
 | 
			
		||||
        first: SharedInt,
 | 
			
		||||
        last: SharedInt,
 | 
			
		||||
        shm: SharedMemory,
 | 
			
		||||
        # readonly: bool = True,
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        self._array = shmarr
 | 
			
		||||
        self._i = counter
 | 
			
		||||
 | 
			
		||||
        # indexes for first and last indices corresponding
 | 
			
		||||
        # to fille data
 | 
			
		||||
        self._first = first
 | 
			
		||||
        self._last = last
 | 
			
		||||
 | 
			
		||||
        self._len = len(shmarr)
 | 
			
		||||
        self._shm = shm
 | 
			
		||||
        self._readonly = readonly
 | 
			
		||||
 | 
			
		||||
        # pushing data does not write the index (aka primary key)
 | 
			
		||||
        self._write_fields = list(shmarr.dtype.fields.keys())[1:]
 | 
			
		||||
 | 
			
		||||
    # TODO: ringbuf api?
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -158,24 +176,25 @@ class ShmArray:
 | 
			
		|||
    def _token(self) -> _Token:
 | 
			
		||||
        return _Token(
 | 
			
		||||
            self._shm.name,
 | 
			
		||||
            self._i._shm.name,
 | 
			
		||||
            self._first._shm.name,
 | 
			
		||||
            self._last._shm.name,
 | 
			
		||||
            self._array.dtype.descr,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def token(self) -> dict:
 | 
			
		||||
        """Shared memory token that can be serialized
 | 
			
		||||
        and used by another process to attach to this array.
 | 
			
		||||
        """Shared memory token that can be serialized and used by
 | 
			
		||||
        another process to attach to this array.
 | 
			
		||||
        """
 | 
			
		||||
        return self._token.as_msg()
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def index(self) -> int:
 | 
			
		||||
        return self._i.value % self._len
 | 
			
		||||
        return self._last.value % self._len
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def array(self) -> np.ndarray:
 | 
			
		||||
        return self._array[:self._i.value]
 | 
			
		||||
        return self._array[self._first.value:self._last.value]
 | 
			
		||||
 | 
			
		||||
    def last(
 | 
			
		||||
        self,
 | 
			
		||||
| 
						 | 
				
			
			@ -186,62 +205,90 @@ class ShmArray:
 | 
			
		|||
    def push(
 | 
			
		||||
        self,
 | 
			
		||||
        data: np.ndarray,
 | 
			
		||||
        prepend: bool = False,
 | 
			
		||||
    ) -> int:
 | 
			
		||||
        """Ring buffer like "push" to append data
 | 
			
		||||
        into the buffer and return updated index.
 | 
			
		||||
        into the buffer and return updated "last" index.
 | 
			
		||||
        """
 | 
			
		||||
        length = len(data)
 | 
			
		||||
        # TODO: use .index for actual ring logic?
 | 
			
		||||
        index = self._i.value
 | 
			
		||||
 | 
			
		||||
        if prepend:
 | 
			
		||||
            index = self._first.value - length
 | 
			
		||||
        else:
 | 
			
		||||
            index = self._last.value
 | 
			
		||||
 | 
			
		||||
        end = index + length
 | 
			
		||||
 | 
			
		||||
        fields = self._write_fields
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            self._array[index:end] = data[:]
 | 
			
		||||
            self._i.value = end
 | 
			
		||||
            self._array[fields][index:end] = data[fields][:]
 | 
			
		||||
            if prepend:
 | 
			
		||||
                self._first.value = index
 | 
			
		||||
            else:
 | 
			
		||||
                self._last.value = end
 | 
			
		||||
            return end
 | 
			
		||||
        except ValueError as err:
 | 
			
		||||
            # reraise with any field discrepancy
 | 
			
		||||
            our_fields, their_fields = (
 | 
			
		||||
                set(self._array.dtype.fields),
 | 
			
		||||
                set(data.dtype.fields),
 | 
			
		||||
            # shoudl raise if diff detected
 | 
			
		||||
            self.diff_err_fields(data)
 | 
			
		||||
 | 
			
		||||
            raise err
 | 
			
		||||
 | 
			
		||||
    def diff_err_fields(
 | 
			
		||||
        self,
 | 
			
		||||
        data: np.ndarray,
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        # reraise with any field discrepancy
 | 
			
		||||
        our_fields, their_fields = (
 | 
			
		||||
            set(self._array.dtype.fields),
 | 
			
		||||
            set(data.dtype.fields),
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        only_in_ours = our_fields - their_fields
 | 
			
		||||
        only_in_theirs = their_fields - our_fields
 | 
			
		||||
 | 
			
		||||
        if only_in_ours:
 | 
			
		||||
            raise TypeError(
 | 
			
		||||
                f"Input array is missing field(s): {only_in_ours}"
 | 
			
		||||
            )
 | 
			
		||||
        elif only_in_theirs:
 | 
			
		||||
            raise TypeError(
 | 
			
		||||
                f"Input array has unknown field(s): {only_in_theirs}"
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            only_in_ours = our_fields - their_fields
 | 
			
		||||
            only_in_theirs = their_fields - our_fields
 | 
			
		||||
 | 
			
		||||
            if only_in_ours:
 | 
			
		||||
                raise TypeError(
 | 
			
		||||
                    f"Input array is missing field(s): {only_in_ours}"
 | 
			
		||||
                )
 | 
			
		||||
            elif only_in_theirs:
 | 
			
		||||
                raise TypeError(
 | 
			
		||||
                    f"Input array has unknown field(s): {only_in_theirs}"
 | 
			
		||||
                )
 | 
			
		||||
            else:
 | 
			
		||||
                raise err
 | 
			
		||||
    def prepend(
 | 
			
		||||
        self,
 | 
			
		||||
        data: np.ndarray,
 | 
			
		||||
    ) -> int:
 | 
			
		||||
        end = self.push(data, prepend=True)
 | 
			
		||||
        assert end
 | 
			
		||||
 | 
			
		||||
    def close(self) -> None:
 | 
			
		||||
        self._i._shm.close()
 | 
			
		||||
        self._first._shm.close()
 | 
			
		||||
        self._last._shm.close()
 | 
			
		||||
        self._shm.close()
 | 
			
		||||
 | 
			
		||||
    def destroy(self) -> None:
 | 
			
		||||
        if shared_memory._USE_POSIX:
 | 
			
		||||
        if _USE_POSIX:
 | 
			
		||||
            # We manually unlink to bypass all the "resource tracker"
 | 
			
		||||
            # nonsense meant for non-SC systems.
 | 
			
		||||
            shm_unlink(self._shm.name)
 | 
			
		||||
        self._i.destroy()
 | 
			
		||||
 | 
			
		||||
        self._first.destroy()
 | 
			
		||||
        self._last.destroy()
 | 
			
		||||
 | 
			
		||||
    def flush(self) -> None:
 | 
			
		||||
        # TODO: flush to storage backend like markestore?
 | 
			
		||||
        ...
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
_lotsa_5s = int(5 * 60 * 60 * 10 / 5)
 | 
			
		||||
 | 
			
		||||
# how  much is probably dependent on lifestyle
 | 
			
		||||
_secs_in_day = int(60 * 60 * 12)
 | 
			
		||||
_default_size = 2 * _secs_in_day
 | 
			
		||||
 | 
			
		||||
def open_shm_array(
 | 
			
		||||
    key: Optional[str] = None,
 | 
			
		||||
    # approx number of 5s bars in a "day" x2
 | 
			
		||||
    size: int = _lotsa_5s,
 | 
			
		||||
    size: int = _default_size,
 | 
			
		||||
    dtype: Optional[np.dtype] = None,
 | 
			
		||||
    readonly: bool = False,
 | 
			
		||||
) -> ShmArray:
 | 
			
		||||
| 
						 | 
				
			
			@ -253,7 +300,9 @@ def open_shm_array(
 | 
			
		|||
    # create new shared mem segment for which we
 | 
			
		||||
    # have write permission
 | 
			
		||||
    a = np.zeros(size, dtype=dtype)
 | 
			
		||||
    shm = shared_memory.SharedMemory(
 | 
			
		||||
    a['index'] = np.arange(len(a))
 | 
			
		||||
 | 
			
		||||
    shm = SharedMemory(
 | 
			
		||||
        name=key,
 | 
			
		||||
        create=True,
 | 
			
		||||
        size=a.nbytes
 | 
			
		||||
| 
						 | 
				
			
			@ -267,17 +316,30 @@ def open_shm_array(
 | 
			
		|||
        dtype=dtype
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    counter = SharedInt(
 | 
			
		||||
        token=token.shm_counter_name,
 | 
			
		||||
        create=True,
 | 
			
		||||
    # create single entry arrays for storing an first and last indices
 | 
			
		||||
    first = SharedInt(
 | 
			
		||||
        shm=SharedMemory(
 | 
			
		||||
            name=token.shm_first_index_name,
 | 
			
		||||
            create=True,
 | 
			
		||||
            size=4,  # std int
 | 
			
		||||
        )
 | 
			
		||||
    )
 | 
			
		||||
    counter.value = 0
 | 
			
		||||
 | 
			
		||||
    last = SharedInt(
 | 
			
		||||
        shm=SharedMemory(
 | 
			
		||||
            name=token.shm_last_index_name,
 | 
			
		||||
            create=True,
 | 
			
		||||
            size=4,  # std int
 | 
			
		||||
        )
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    last.value = first.value = int(_secs_in_day)
 | 
			
		||||
 | 
			
		||||
    shmarr = ShmArray(
 | 
			
		||||
        array,
 | 
			
		||||
        counter,
 | 
			
		||||
        first,
 | 
			
		||||
        last,
 | 
			
		||||
        shm,
 | 
			
		||||
        readonly=readonly,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert shmarr._token == token
 | 
			
		||||
| 
						 | 
				
			
			@ -293,18 +355,23 @@ def open_shm_array(
 | 
			
		|||
 | 
			
		||||
def attach_shm_array(
 | 
			
		||||
    token: Tuple[str, str, Tuple[str, str]],
 | 
			
		||||
    size: int = _lotsa_5s,
 | 
			
		||||
    size: int = _default_size,
 | 
			
		||||
    readonly: bool = True,
 | 
			
		||||
) -> ShmArray:
 | 
			
		||||
    """Load and attach to an existing shared memory array previously
 | 
			
		||||
    """Attach to an existing shared memory array previously
 | 
			
		||||
    created by another process using ``open_shared_array``.
 | 
			
		||||
 | 
			
		||||
    No new shared mem is allocated but wrapper types for read/write
 | 
			
		||||
    access are constructed.
 | 
			
		||||
    """
 | 
			
		||||
    token = _Token.from_msg(token)
 | 
			
		||||
    key = token.shm_name
 | 
			
		||||
 | 
			
		||||
    if key in _known_tokens:
 | 
			
		||||
        assert _known_tokens[key] == token, "WTF"
 | 
			
		||||
 | 
			
		||||
    shm = shared_memory.SharedMemory(name=key)
 | 
			
		||||
    # attach to array buffer and view as per dtype
 | 
			
		||||
    shm = SharedMemory(name=key)
 | 
			
		||||
    shmarr = np.ndarray(
 | 
			
		||||
        (size,),
 | 
			
		||||
        dtype=token.dtype_descr,
 | 
			
		||||
| 
						 | 
				
			
			@ -312,15 +379,29 @@ def attach_shm_array(
 | 
			
		|||
    )
 | 
			
		||||
    shmarr.setflags(write=int(not readonly))
 | 
			
		||||
 | 
			
		||||
    counter = SharedInt(token=token.shm_counter_name)
 | 
			
		||||
    first = SharedInt(
 | 
			
		||||
        shm=SharedMemory(
 | 
			
		||||
            name=token.shm_first_index_name,
 | 
			
		||||
            create=False,
 | 
			
		||||
            size=4,  # std int
 | 
			
		||||
        ),
 | 
			
		||||
    )
 | 
			
		||||
    last = SharedInt(
 | 
			
		||||
        shm=SharedMemory(
 | 
			
		||||
            name=token.shm_last_index_name,
 | 
			
		||||
            create=False,
 | 
			
		||||
            size=4,  # std int
 | 
			
		||||
        ),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # make sure we can read
 | 
			
		||||
    counter.value
 | 
			
		||||
    first.value
 | 
			
		||||
 | 
			
		||||
    sha = ShmArray(
 | 
			
		||||
        shmarr,
 | 
			
		||||
        counter,
 | 
			
		||||
        first,
 | 
			
		||||
        last,
 | 
			
		||||
        shm,
 | 
			
		||||
        readonly=readonly,
 | 
			
		||||
    )
 | 
			
		||||
    # read test
 | 
			
		||||
    sha.array
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue