Add prepend support to shm system

to_qpainterpath_and_beyond
Tyler Goodlet 2020-12-09 10:30:31 -05:00
parent 9710685508
commit 02b7d6cd19
1 changed files with 150 additions and 69 deletions

View File

@ -17,11 +17,10 @@
"""
NumPy compatible shared memory buffers for real-time FSP.
"""
from typing import List
from dataclasses import dataclass, asdict
from sys import byteorder
from typing import Tuple, Optional
from multiprocessing import shared_memory
from typing import List, Tuple, Optional
from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
from multiprocessing import resource_tracker as mantracker
from _posixshmem import shm_unlink
@ -29,7 +28,7 @@ import tractor
import numpy as np
from ..log import get_logger
from ._source import base_ohlc_dtype
from ._source import base_ohlc_dtype, base_iohlc_dtype
log = get_logger(__name__)
@ -58,17 +57,15 @@ mantracker.getfd = mantracker._resource_tracker.getfd
class SharedInt:
"""Wrapper around a single entry shared memory array which
holds an ``int`` value used as an index counter.
"""
def __init__(
self,
token: str,
create: bool = False,
shm: SharedMemory,
) -> None:
# create a single entry array for storing an index counter
self._shm = shared_memory.SharedMemory(
name=token,
create=create,
size=4, # std int
)
self._shm = shm
@property
def value(self) -> int:
@ -79,7 +76,7 @@ class SharedInt:
self._shm.buf[:] = value.to_bytes(4, byteorder)
def destroy(self) -> None:
if shared_memory._USE_POSIX:
if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
shm_unlink(self._shm.name)
@ -91,7 +88,8 @@ class _Token:
which can be used to key a system wide post shm entry.
"""
shm_name: str # this servers as a "key" value
shm_counter_name: str
shm_first_index_name: str
shm_last_index_name: str
dtype_descr: List[Tuple[str]]
def __post_init__(self):
@ -130,27 +128,47 @@ def _make_token(
"""Create a serializable token that can be used
to access a shared array.
"""
dtype = base_ohlc_dtype if dtype is None else dtype
dtype = base_iohlc_dtype if dtype is None else dtype
return _Token(
key,
key + "_counter",
key + "_first",
key + "_last",
np.dtype(dtype).descr
)
class ShmArray:
"""A shared memory ``numpy`` (compatible) array API.
An underlying shared memory buffer is allocated based on
a user specified ``numpy.ndarray``. This fixed size array
can be read and written to by pushing data both onto the "front"
or "back" of a set index range. The indexes for the "first" and
"last" index are themselves stored in shared memory (accessed via
``SharedInt`` interfaces) values such that multiple processes can
interact with the same array using a synchronized-index.
"""
def __init__(
self,
shmarr: np.ndarray,
counter: SharedInt,
shm: shared_memory.SharedMemory,
readonly: bool = True,
first: SharedInt,
last: SharedInt,
shm: SharedMemory,
# readonly: bool = True,
) -> None:
self._array = shmarr
self._i = counter
# indexes for first and last indices corresponding
# to fille data
self._first = first
self._last = last
self._len = len(shmarr)
self._shm = shm
self._readonly = readonly
# pushing data does not write the index (aka primary key)
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
# TODO: ringbuf api?
@ -158,24 +176,25 @@ class ShmArray:
def _token(self) -> _Token:
return _Token(
self._shm.name,
self._i._shm.name,
self._first._shm.name,
self._last._shm.name,
self._array.dtype.descr,
)
@property
def token(self) -> dict:
"""Shared memory token that can be serialized
and used by another process to attach to this array.
"""Shared memory token that can be serialized and used by
another process to attach to this array.
"""
return self._token.as_msg()
@property
def index(self) -> int:
return self._i.value % self._len
return self._last.value % self._len
@property
def array(self) -> np.ndarray:
return self._array[:self._i.value]
return self._array[self._first.value:self._last.value]
def last(
self,
@ -186,19 +205,39 @@ class ShmArray:
def push(
self,
data: np.ndarray,
prepend: bool = False,
) -> int:
"""Ring buffer like "push" to append data
into the buffer and return updated index.
into the buffer and return updated "last" index.
"""
length = len(data)
# TODO: use .index for actual ring logic?
index = self._i.value
if prepend:
index = self._first.value - length
else:
index = self._last.value
end = index + length
fields = self._write_fields
try:
self._array[index:end] = data[:]
self._i.value = end
self._array[fields][index:end] = data[fields][:]
if prepend:
self._first.value = index
else:
self._last.value = end
return end
except ValueError as err:
# shoudl raise if diff detected
self.diff_err_fields(data)
raise err
def diff_err_fields(
self,
data: np.ndarray,
) -> None:
# reraise with any field discrepancy
our_fields, their_fields = (
set(self._array.dtype.fields),
@ -216,32 +255,40 @@ class ShmArray:
raise TypeError(
f"Input array has unknown field(s): {only_in_theirs}"
)
else:
raise err
def prepend(
self,
data: np.ndarray,
) -> int:
end = self.push(data, prepend=True)
assert end
def close(self) -> None:
self._i._shm.close()
self._first._shm.close()
self._last._shm.close()
self._shm.close()
def destroy(self) -> None:
if shared_memory._USE_POSIX:
if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
shm_unlink(self._shm.name)
self._i.destroy()
self._first.destroy()
self._last.destroy()
def flush(self) -> None:
# TODO: flush to storage backend like markestore?
...
_lotsa_5s = int(5 * 60 * 60 * 10 / 5)
# how much is probably dependent on lifestyle
_secs_in_day = int(60 * 60 * 12)
_default_size = 2 * _secs_in_day
def open_shm_array(
key: Optional[str] = None,
# approx number of 5s bars in a "day" x2
size: int = _lotsa_5s,
size: int = _default_size,
dtype: Optional[np.dtype] = None,
readonly: bool = False,
) -> ShmArray:
@ -253,7 +300,9 @@ def open_shm_array(
# create new shared mem segment for which we
# have write permission
a = np.zeros(size, dtype=dtype)
shm = shared_memory.SharedMemory(
a['index'] = np.arange(len(a))
shm = SharedMemory(
name=key,
create=True,
size=a.nbytes
@ -267,17 +316,30 @@ def open_shm_array(
dtype=dtype
)
counter = SharedInt(
token=token.shm_counter_name,
# create single entry arrays for storing an first and last indices
first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=True,
size=4, # std int
)
counter.value = 0
)
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=True,
size=4, # std int
)
)
last.value = first.value = int(_secs_in_day)
shmarr = ShmArray(
array,
counter,
first,
last,
shm,
readonly=readonly,
)
assert shmarr._token == token
@ -293,18 +355,23 @@ def open_shm_array(
def attach_shm_array(
token: Tuple[str, str, Tuple[str, str]],
size: int = _lotsa_5s,
size: int = _default_size,
readonly: bool = True,
) -> ShmArray:
"""Load and attach to an existing shared memory array previously
"""Attach to an existing shared memory array previously
created by another process using ``open_shared_array``.
No new shared mem is allocated but wrapper types for read/write
access are constructed.
"""
token = _Token.from_msg(token)
key = token.shm_name
if key in _known_tokens:
assert _known_tokens[key] == token, "WTF"
shm = shared_memory.SharedMemory(name=key)
# attach to array buffer and view as per dtype
shm = SharedMemory(name=key)
shmarr = np.ndarray(
(size,),
dtype=token.dtype_descr,
@ -312,15 +379,29 @@ def attach_shm_array(
)
shmarr.setflags(write=int(not readonly))
counter = SharedInt(token=token.shm_counter_name)
first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=False,
size=4, # std int
),
)
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=False,
size=4, # std int
),
)
# make sure we can read
counter.value
first.value
sha = ShmArray(
shmarr,
counter,
first,
last,
shm,
readonly=readonly,
)
# read test
sha.array