Flip to using `pydantic` for shm tokens

windows_fixes_yo
Tyler Goodlet 2022-02-04 10:24:40 -05:00
parent 615bf3a55a
commit efb743fd85
1 changed files with 47 additions and 29 deletions

View File

@ -18,9 +18,10 @@
NumPy compatible shared memory buffers for real-time IPC streaming. NumPy compatible shared memory buffers for real-time IPC streaming.
""" """
from __future__ import annotations
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
from sys import byteorder from sys import byteorder
from typing import List, Tuple, Optional from typing import Optional
from multiprocessing.shared_memory import SharedMemory, _USE_POSIX from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
from multiprocessing import resource_tracker as mantracker from multiprocessing import resource_tracker as mantracker
@ -29,6 +30,7 @@ if _USE_POSIX:
import tractor import tractor
import numpy as np import numpy as np
from pydantic import BaseModel, validator
from ..log import get_logger from ..log import get_logger
from ._source import base_iohlc_dtype from ._source import base_iohlc_dtype
@ -85,26 +87,34 @@ class SharedInt:
shm_unlink(self._shm.name) shm_unlink(self._shm.name)
@dataclass class _Token(BaseModel):
class _Token: '''
"""Internal represenation of a shared memory "token" Internal represenation of a shared memory "token"
which can be used to key a system wide post shm entry. which can be used to key a system wide post shm entry.
"""
'''
class Config:
frozen = True
shm_name: str # this servers as a "key" value shm_name: str # this servers as a "key" value
shm_first_index_name: str shm_first_index_name: str
shm_last_index_name: str shm_last_index_name: str
dtype_descr: List[Tuple[str]] dtype_descr: tuple
def __post_init__(self): @property
# np.array requires a list for dtype def dtype(self) -> np.dtype:
self.dtype_descr = np.dtype(list(map(tuple, self.dtype_descr))).descr return np.dtype(list(map(tuple, self.dtype_descr))).descr
def as_msg(self): def as_msg(self):
return asdict(self) return self.dict()
@classmethod @classmethod
def from_msg(self, msg: dict) -> '_Token': def from_msg(cls, msg: dict) -> _Token:
return msg if isinstance(msg, _Token) else _Token(**msg) if isinstance(msg, _Token):
return msg
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return _Token(**msg)
# TODO: this api? # TODO: this api?
@ -127,15 +137,17 @@ def _make_token(
key: str, key: str,
dtype: Optional[np.dtype] = None, dtype: Optional[np.dtype] = None,
) -> _Token: ) -> _Token:
"""Create a serializable token that can be used '''
Create a serializable token that can be used
to access a shared array. to access a shared array.
"""
'''
dtype = base_iohlc_dtype if dtype is None else dtype dtype = base_iohlc_dtype if dtype is None else dtype
return _Token( return _Token(
key, shm_name=key,
key + "_first", shm_first_index_name=key + "_first",
key + "_last", shm_last_index_name=key + "_last",
np.dtype(dtype).descr dtype_descr=np.dtype(dtype).descr
) )
@ -178,10 +190,10 @@ class ShmArray:
@property @property
def _token(self) -> _Token: def _token(self) -> _Token:
return _Token( return _Token(
self._shm.name, shm_name=self._shm.name,
self._first._shm.name, shm_first_index_name=self._first._shm.name,
self._last._shm.name, shm_last_index_name=self._last._shm.name,
self._array.dtype.descr, dtype_descr=tuple(self._array.dtype.descr),
) )
@property @property
@ -402,16 +414,19 @@ def open_shm_array(
def attach_shm_array( def attach_shm_array(
token: Tuple[str, str, Tuple[str, str]], token: tuple[str, str, tuple[str, str]],
size: int = _default_size, size: int = _default_size,
readonly: bool = True, readonly: bool = True,
) -> ShmArray: ) -> ShmArray:
"""Attach to an existing shared memory array previously '''
Attach to an existing shared memory array previously
created by another process using ``open_shared_array``. created by another process using ``open_shared_array``.
No new shared mem is allocated but wrapper types for read/write No new shared mem is allocated but wrapper types for read/write
access are constructed. access are constructed.
"""
'''
token = _Token.from_msg(token) token = _Token.from_msg(token)
key = token.shm_name key = token.shm_name
@ -422,7 +437,7 @@ def attach_shm_array(
shm = SharedMemory(name=key) shm = SharedMemory(name=key)
shmarr = np.ndarray( shmarr = np.ndarray(
(size,), (size,),
dtype=token.dtype_descr, dtype=token.dtype,
buffer=shm.buf buffer=shm.buf
) )
shmarr.setflags(write=int(not readonly)) shmarr.setflags(write=int(not readonly))
@ -470,8 +485,10 @@ def maybe_open_shm_array(
key: str, key: str,
dtype: Optional[np.dtype] = None, dtype: Optional[np.dtype] = None,
**kwargs, **kwargs,
) -> Tuple[ShmArray, bool]:
"""Attempt to attach to a shared memory block using a "key" lookup ) -> tuple[ShmArray, bool]:
'''
Attempt to attach to a shared memory block using a "key" lookup
to registered blocks in the users overall "system" registry to registered blocks in the users overall "system" registry
(presumes you don't have the block's explicit token). (presumes you don't have the block's explicit token).
@ -485,7 +502,8 @@ def maybe_open_shm_array(
If you know the explicit ``_Token`` for your memory segment instead If you know the explicit ``_Token`` for your memory segment instead
use ``attach_shm_array``. use ``attach_shm_array``.
"""
'''
try: try:
# see if we already know this key # see if we already know this key
token = _known_tokens[key] token = _known_tokens[key]