diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 1172fc7b..82f61e79 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -27,13 +27,14 @@ from multiprocessing.shared_memory import SharedMemory, _USE_POSIX if _USE_POSIX: from _posixshmem import shm_unlink -import tractor +# import msgspec import numpy as np -from pydantic import BaseModel from numpy.lib import recfunctions as rfn +import tractor from ..log import get_logger from ._source import base_iohlc_dtype +from .types import Struct log = get_logger(__name__) @@ -107,15 +108,12 @@ class SharedInt: log.warning(f'Shm for {name} already unlinked?') -class _Token(BaseModel): +class _Token(Struct, frozen=True): ''' Internal represenation of a shared memory "token" which can be used to key a system wide post shm entry. ''' - class Config: - frozen = True - shm_name: str # this servers as a "key" value shm_first_index_name: str shm_last_index_name: str @@ -126,17 +124,22 @@ class _Token(BaseModel): return np.dtype(list(map(tuple, self.dtype_descr))).descr def as_msg(self): - return self.dict() + return self.to_dict() @classmethod def from_msg(cls, msg: dict) -> _Token: if isinstance(msg, _Token): return msg + # TODO: native struct decoding + # return _token_dec.decode(msg) + msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) return _Token(**msg) +# _token_dec = msgspec.msgpack.Decoder(_Token) + # TODO: this api? # _known_tokens = tractor.ActorVar('_shm_tokens', {}) # _known_tokens = tractor.ContextStack('_known_tokens', ) @@ -167,7 +170,7 @@ def _make_token( shm_name=key, shm_first_index_name=key + "_first", shm_last_index_name=key + "_last", - dtype_descr=np.dtype(dtype).descr + dtype_descr=tuple(np.dtype(dtype).descr) )