Use struct for shm tokens
parent
693c7ce12a
commit
6cc02bd8f5
|
@ -27,13 +27,14 @@ from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
|
||||||
if _USE_POSIX:
|
if _USE_POSIX:
|
||||||
from _posixshmem import shm_unlink
|
from _posixshmem import shm_unlink
|
||||||
|
|
||||||
import tractor
|
# import msgspec
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from pydantic import BaseModel
|
|
||||||
from numpy.lib import recfunctions as rfn
|
from numpy.lib import recfunctions as rfn
|
||||||
|
import tractor
|
||||||
|
|
||||||
from ..log import get_logger
|
from ..log import get_logger
|
||||||
from ._source import base_iohlc_dtype
|
from ._source import base_iohlc_dtype
|
||||||
|
from .types import Struct
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -107,15 +108,12 @@ class SharedInt:
|
||||||
log.warning(f'Shm for {name} already unlinked?')
|
log.warning(f'Shm for {name} already unlinked?')
|
||||||
|
|
||||||
|
|
||||||
class _Token(BaseModel):
|
class _Token(Struct, frozen=True):
|
||||||
'''
|
'''
|
||||||
Internal represenation of a shared memory "token"
|
Internal represenation of a shared memory "token"
|
||||||
which can be used to key a system wide post shm entry.
|
which can be used to key a system wide post shm entry.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
class Config:
|
|
||||||
frozen = True
|
|
||||||
|
|
||||||
shm_name: str # this servers as a "key" value
|
shm_name: str # this servers as a "key" value
|
||||||
shm_first_index_name: str
|
shm_first_index_name: str
|
||||||
shm_last_index_name: str
|
shm_last_index_name: str
|
||||||
|
@ -126,17 +124,22 @@ class _Token(BaseModel):
|
||||||
return np.dtype(list(map(tuple, self.dtype_descr))).descr
|
return np.dtype(list(map(tuple, self.dtype_descr))).descr
|
||||||
|
|
||||||
def as_msg(self):
|
def as_msg(self):
|
||||||
return self.dict()
|
return self.to_dict()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_msg(cls, msg: dict) -> _Token:
|
def from_msg(cls, msg: dict) -> _Token:
|
||||||
if isinstance(msg, _Token):
|
if isinstance(msg, _Token):
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
|
# TODO: native struct decoding
|
||||||
|
# return _token_dec.decode(msg)
|
||||||
|
|
||||||
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
|
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
|
||||||
return _Token(**msg)
|
return _Token(**msg)
|
||||||
|
|
||||||
|
|
||||||
|
# _token_dec = msgspec.msgpack.Decoder(_Token)
|
||||||
|
|
||||||
# TODO: this api?
|
# TODO: this api?
|
||||||
# _known_tokens = tractor.ActorVar('_shm_tokens', {})
|
# _known_tokens = tractor.ActorVar('_shm_tokens', {})
|
||||||
# _known_tokens = tractor.ContextStack('_known_tokens', )
|
# _known_tokens = tractor.ContextStack('_known_tokens', )
|
||||||
|
@ -167,7 +170,7 @@ def _make_token(
|
||||||
shm_name=key,
|
shm_name=key,
|
||||||
shm_first_index_name=key + "_first",
|
shm_first_index_name=key + "_first",
|
||||||
shm_last_index_name=key + "_last",
|
shm_last_index_name=key + "_last",
|
||||||
dtype_descr=np.dtype(dtype).descr
|
dtype_descr=tuple(np.dtype(dtype).descr)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue