From efb743fd85d8cf2a53f4e900df5222b319917986 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 10:24:40 -0500 Subject: [PATCH] Flip to using `pydantic` for shm tokens --- piker/data/_sharedmem.py | 76 +++++++++++++++++++++++++--------------- 1 file changed, 47 insertions(+), 29 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index c741ba1c..5f7fdcd0 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -18,9 +18,10 @@ NumPy compatible shared memory buffers for real-time IPC streaming. """ +from __future__ import annotations from dataclasses import dataclass, asdict from sys import byteorder -from typing import List, Tuple, Optional +from typing import Optional from multiprocessing.shared_memory import SharedMemory, _USE_POSIX from multiprocessing import resource_tracker as mantracker @@ -29,6 +30,7 @@ if _USE_POSIX: import tractor import numpy as np +from pydantic import BaseModel, validator from ..log import get_logger from ._source import base_iohlc_dtype @@ -85,26 +87,34 @@ class SharedInt: shm_unlink(self._shm.name) -@dataclass -class _Token: - """Internal represenation of a shared memory "token" +class _Token(BaseModel): + ''' + Internal represenation of a shared memory "token" which can be used to key a system wide post shm entry. - """ + + ''' + class Config: + frozen = True + shm_name: str # this servers as a "key" value shm_first_index_name: str shm_last_index_name: str - dtype_descr: List[Tuple[str]] + dtype_descr: tuple - def __post_init__(self): - # np.array requires a list for dtype - self.dtype_descr = np.dtype(list(map(tuple, self.dtype_descr))).descr + @property + def dtype(self) -> np.dtype: + return np.dtype(list(map(tuple, self.dtype_descr))).descr def as_msg(self): - return asdict(self) + return self.dict() @classmethod - def from_msg(self, msg: dict) -> '_Token': - return msg if isinstance(msg, _Token) else _Token(**msg) + def from_msg(cls, msg: dict) -> _Token: + if isinstance(msg, _Token): + return msg + + msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) + return _Token(**msg) # TODO: this api? @@ -127,15 +137,17 @@ def _make_token( key: str, dtype: Optional[np.dtype] = None, ) -> _Token: - """Create a serializable token that can be used + ''' + Create a serializable token that can be used to access a shared array. - """ + + ''' dtype = base_iohlc_dtype if dtype is None else dtype return _Token( - key, - key + "_first", - key + "_last", - np.dtype(dtype).descr + shm_name=key, + shm_first_index_name=key + "_first", + shm_last_index_name=key + "_last", + dtype_descr=np.dtype(dtype).descr ) @@ -178,10 +190,10 @@ class ShmArray: @property def _token(self) -> _Token: return _Token( - self._shm.name, - self._first._shm.name, - self._last._shm.name, - self._array.dtype.descr, + shm_name=self._shm.name, + shm_first_index_name=self._first._shm.name, + shm_last_index_name=self._last._shm.name, + dtype_descr=tuple(self._array.dtype.descr), ) @property @@ -402,16 +414,19 @@ def open_shm_array( def attach_shm_array( - token: Tuple[str, str, Tuple[str, str]], + token: tuple[str, str, tuple[str, str]], size: int = _default_size, readonly: bool = True, + ) -> ShmArray: - """Attach to an existing shared memory array previously + ''' + Attach to an existing shared memory array previously created by another process using ``open_shared_array``. No new shared mem is allocated but wrapper types for read/write access are constructed. - """ + + ''' token = _Token.from_msg(token) key = token.shm_name @@ -422,7 +437,7 @@ def attach_shm_array( shm = SharedMemory(name=key) shmarr = np.ndarray( (size,), - dtype=token.dtype_descr, + dtype=token.dtype, buffer=shm.buf ) shmarr.setflags(write=int(not readonly)) @@ -470,8 +485,10 @@ def maybe_open_shm_array( key: str, dtype: Optional[np.dtype] = None, **kwargs, -) -> Tuple[ShmArray, bool]: - """Attempt to attach to a shared memory block using a "key" lookup + +) -> tuple[ShmArray, bool]: + ''' + Attempt to attach to a shared memory block using a "key" lookup to registered blocks in the users overall "system" registry (presumes you don't have the block's explicit token). @@ -485,7 +502,8 @@ def maybe_open_shm_array( If you know the explicit ``_Token`` for your memory segment instead use ``attach_shm_array``. - """ + + ''' try: # see if we already know this key token = _known_tokens[key]