diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 6747ed9d..d6c53a95 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -1,16 +1,24 @@ """ NumPy compatible shared memory buffers for real-time FSP. """ +from typing import List +from dataclasses import dataclass, asdict from sys import byteorder -from contextlib import contextmanager from typing import Tuple, Optional from multiprocessing import shared_memory from multiprocessing import resource_tracker as mantracker from _posixshmem import shm_unlink +import tractor import numpy as np +from ..log import get_logger + + +log = get_logger(__name__) + + # Tell the "resource tracker" thing to fuck off. class ManTracker(mantracker.ResourceTracker): def register(self, name, rtype): @@ -74,6 +82,58 @@ class SharedInt: shm_unlink(self._shm.name) +@dataclass +class _Token: + """Internal representation of a shared memory "token" + which can be used to key a system wide post shm entry. + """ + shm_name: str # this serves as a "key" value + shm_counter_name: str + dtype_descr: List[Tuple[str]] + + def __post_init__(self): + # np.array requires a list for dtype + self.dtype_descr = np.dtype( + list(self.dtype_descr)).descr + + def as_msg(self): + return asdict(self) + + @classmethod + def from_msg(self, msg: dict) -> '_Token': + return msg if isinstance(msg, _Token) else _Token(**msg) + + +# TODO: this api? +# _known_tokens = tractor.ActorVar('_shm_tokens', {}) +# _known_tokens = tractor.ContextStack('_known_tokens', ) +# _known_tokens = trio.RunVar('shms', {}) + +# process-local store of keys to tokens +_known_tokens = {} + + +def get_shm_token(key: str) -> _Token: + """Convenience func to check if a token + for the provided key is known by this process. + """ + return _known_tokens.get(key) + + +def _make_token( + key: str, + dtype: np.dtype = base_ohlc_dtype, +) -> _Token: + """Create a serializable token that can be used + to access a shared array. 
+ """ + return _Token( + key, + key + "_counter", + dtype.descr + ) + + class SharedArray: def __init__( self, @@ -91,12 +151,19 @@ class SharedArray: # TODO: ringbuf api? @property - def token(self) -> Tuple[str, str]: - return (self._shm.name, self._i._shm.name) + def _token(self) -> _Token: + return _Token( + self._shm.name, + self._i._shm.name, + self._array.dtype.descr, + ) @property - def name(self) -> str: - return self._shm.name + def token(self) -> dict: + """Shared memory token that can be serialized + and used by another process to attach to this array. + """ + return self._token.as_msg() @property def index(self) -> int: @@ -143,9 +210,8 @@ class SharedArray: ... -@contextmanager -def open_shared_array( - name: Optional[str] = None, +def open_shm_array( + key: Optional[str] = None, # approx number of 5s bars in a "day" x2 size: int = int(2*60*60*10/5), dtype: np.dtype = base_ohlc_dtype, @@ -160,51 +226,66 @@ def open_shared_array( # have write permission a = np.zeros(size, dtype=dtype) shm = shared_memory.SharedMemory( - name=name, + name=key, create=True, size=a.nbytes ) - shmarr = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) - shmarr[:] = a[:] - shmarr.setflags(write=int(not readonly)) + array = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) + array[:] = a[:] + array.setflags(write=int(not readonly)) + + token = _make_token( + key=key, + dtype=dtype + ) counter = SharedInt( - token=shm.name + "_counter", + token=token.shm_counter_name, create=True, ) counter.value = 0 - sha = SharedArray( - shmarr, + shmarr = SharedArray( + array, counter, shm, readonly=readonly, ) - try: - yield sha - finally: - sha.close() - print(f"UNLINKING {sha.token}") - sha.destroy() + + assert shmarr._token == token + _known_tokens[key] = shmarr.token + + # "unlink" created shm on process teardown by + # pushing teardown calls onto actor context stack + actor = tractor.current_actor() + actor._lifetime_stack.callback(shmarr.close) + 
actor._lifetime_stack.callback(shmarr.destroy) + return shmarr -@contextmanager -def attach_shared_array( - token: Tuple[str, str], +def attach_shm_array( + token: Tuple[str, str, Tuple[str, str]], size: int = int(60*60*10/5), - dtype: np.dtype = base_ohlc_dtype, + # dtype: np.dtype = base_ohlc_dtype, readonly: bool = True, ) -> SharedArray: """Load and attach to an existing shared memory array previously created by another process using ``open_shared_array``. """ - array_name, counter_name = token + token = _Token.from_msg(token) + key = token.shm_name + if key in _known_tokens: + assert _known_tokens[key] == token, "WTF" - shm = shared_memory.SharedMemory(name=array_name) - shmarr = np.ndarray((size,), dtype=dtype, buffer=shm.buf) + shm = shared_memory.SharedMemory(name=key) + shmarr = np.ndarray( + (size,), + dtype=token.dtype_descr, + buffer=shm.buf + ) shmarr.setflags(write=int(not readonly)) - counter = SharedInt(token=counter_name) + counter = SharedInt(token=token.shm_counter_name) # make sure we can read counter.value @@ -214,28 +295,58 @@ def attach_shared_array( shm, readonly=readonly, ) + # read test sha.array - try: - yield sha - finally: - pass - sha.close() + + # Stash key -> token knowledge for future queries + # via `maybe_open_shm_array()` but only after we know + # we can attach. + if key not in _known_tokens: + _known_tokens[key] = token + + # "close" attached shm on process teardown + actor = tractor.current_actor() + actor._lifetime_stack.callback(sha.close) + return sha -@contextmanager -def maybe_open_shared_array( - name: str, +def maybe_open_shm_array( + key: str, + dtype: np.dtype = base_ohlc_dtype, **kwargs, -) -> SharedArray: +) -> Tuple[SharedArray, bool]: + """Attempt to attach to a shared memory block by a + "key" determined by the user's overall "system" + (presumes you don't have the block's explicit token). 
+ + This function is meant to solve the problem of + discovering whether a shared array token has been + allocated or discovered by the actor running in + **this** process. Systems where multiple actors + may seek to access a common block can use this + function to attempt to acquire a token as discovered + by the actors who have previously stored a + "key" -> ``_Token`` map in an actor local variable. + + If you know the explicit ``_Token`` for your memory + instead use ``attach_shm_array``. + """ try: - with open_shared_array( - name=name, - **kwargs, - ) as shm: - yield shm - except FileExistsError: - with attach_shared_array( - token=(name, name + '_counter'), - **kwargs, - ) as shm: - yield shm + # see if we already know this key + token = _known_tokens[key] + return attach_shm_array(token=token, **kwargs), False + except KeyError: + log.debug(f"Could not find {key} in shms cache") + if dtype: + token = _make_token(key, dtype) + try: + return attach_shm_array(token=token, **kwargs), False + except FileNotFoundError: + log.debug(f"Could not attach to shm with token {token}") + + # This actor does not know about memory + # associated with the provided "key". + # Attempt to open a block and expect + # to fail if a block has been allocated + # on the OS by someone else. + return open_shm_array(key=key, dtype=dtype, **kwargs), True