Slight rework: shm API

Add an internal `_Token` to do interchange (un)packing for passing
"references" to shm blocks between actors.  Part of the token involves
providing the `numpy.dtype` in a cross-actor format.  Add a module
variable for caching "known tokens" per actor.  Drop the context
manager APIs since they tear down shm blocks too soon when running in
debug mode, and there seems to be no reason to unlink/close shm before
the process has terminated; code that needs a block torn down
explicitly can still do so.
branch: bar_select
Tyler Goodlet 2020-09-22 08:31:47 -04:00
parent cd540fd07e
commit 38469bd6ef
1 changed file with 160 additions and 49 deletions
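For orientation, the reworked API is intended to be used roughly as in the
sketch below. This is not code from the diff: the import path, the
'btcusd.ohlc' key and the way the token message travels between actors are
illustrative assumptions; only open_shm_array(), the .token property,
attach_shm_array() and get_shm_token() come from the change itself.

    # writer actor: allocate the block and grab its interchange token
    from piker.data._sharedmem import (       # import path is an assumption
        open_shm_array, attach_shm_array, get_shm_token,
    )

    ohlc = open_shm_array(key='btcusd.ohlc')  # key name is made up
    msg = ohlc.token  # a plain dict built by ``_Token.as_msg()``
    # ``msg`` contains only shm names and a dtype description, so it can be
    # shipped over any tractor IPC channel (wiring not shown here)

    # reader actor: rebuild a ``_Token`` from the msg and attach
    bars = attach_shm_array(token=msg)
    # the key is now cached in this process, so later lookups can use:
    assert get_shm_token('btcusd.ohlc') is not None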


@@ -1,16 +1,24 @@
 """
 NumPy compatible shared memory buffers for real-time FSP.
 """
+from typing import List
+from dataclasses import dataclass, asdict
 from sys import byteorder
-from contextlib import contextmanager
 from typing import Tuple, Optional
 from multiprocessing import shared_memory
 from multiprocessing import resource_tracker as mantracker
 from _posixshmem import shm_unlink
+
+import tractor
 import numpy as np
+
+from ..log import get_logger
+
+
+log = get_logger(__name__)
 
 
 # Tell the "resource tracker" thing to fuck off.
 class ManTracker(mantracker.ResourceTracker):
     def register(self, name, rtype):
@@ -74,6 +82,58 @@ class SharedInt:
         shm_unlink(self._shm.name)
 
 
+@dataclass
+class _Token:
+    """Internal representation of a shared memory "token"
+    which can be used to key a system wide posix shm entry.
+    """
+    shm_name: str  # this serves as a "key" value
+    shm_counter_name: str
+    dtype_descr: List[Tuple[str]]
+
+    def __post_init__(self):
+        # np.array requires a list for dtype
+        self.dtype_descr = np.dtype(
+            list(self.dtype_descr)).descr
+
+    def as_msg(self):
+        return asdict(self)
+
+    @classmethod
+    def from_msg(self, msg: dict) -> '_Token':
+        return msg if isinstance(msg, _Token) else _Token(**msg)
+
+
+# TODO: this api?
+# _known_tokens = tractor.ActorVar('_shm_tokens', {})
+# _known_tokens = tractor.ContextStack('_known_tokens', )
+# _known_tokens = trio.RunVar('shms', {})
+
+# process-local store of keys to tokens
+_known_tokens = {}
+
+
+def get_shm_token(key: str) -> _Token:
+    """Convenience func to check if a token
+    for the provided key is known by this process.
+    """
+    return _known_tokens.get(key)
+
+
+def _make_token(
+    key: str,
+    dtype: np.dtype = base_ohlc_dtype,
+) -> _Token:
+    """Create a serializable token that can be used
+    to access a shared array.
+    """
+    return _Token(
+        key,
+        key + "_counter",
+        dtype.descr
+    )
+
+
 class SharedArray:
     def __init__(
         self,
@@ -91,12 +151,19 @@ class SharedArray:
     # TODO: ringbuf api?
 
     @property
-    def token(self) -> Tuple[str, str]:
-        return (self._shm.name, self._i._shm.name)
+    def _token(self) -> _Token:
+        return _Token(
+            self._shm.name,
+            self._i._shm.name,
+            self._array.dtype.descr,
+        )
 
     @property
-    def name(self) -> str:
-        return self._shm.name
+    def token(self) -> dict:
+        """Shared memory token that can be serialized
+        and used by another process to attach to this array.
+        """
+        return self._token.as_msg()
 
     @property
     def index(self) -> int:
@@ -143,9 +210,8 @@ class SharedArray:
         ...
 
 
-@contextmanager
-def open_shared_array(
-    name: Optional[str] = None,
+def open_shm_array(
+    key: Optional[str] = None,
     # approx number of 5s bars in a "day" x2
     size: int = int(2*60*60*10/5),
     dtype: np.dtype = base_ohlc_dtype,
@@ -160,51 +226,66 @@ def open_shared_array(
     # have write permission
     a = np.zeros(size, dtype=dtype)
 
     shm = shared_memory.SharedMemory(
-        name=name,
+        name=key,
         create=True,
         size=a.nbytes
     )
-    shmarr = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
-    shmarr[:] = a[:]
-    shmarr.setflags(write=int(not readonly))
+    array = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
+    array[:] = a[:]
+    array.setflags(write=int(not readonly))
+
+    token = _make_token(
+        key=key,
+        dtype=dtype
+    )
 
     counter = SharedInt(
-        token=shm.name + "_counter",
+        token=token.shm_counter_name,
         create=True,
     )
     counter.value = 0
 
-    sha = SharedArray(
-        shmarr,
+    shmarr = SharedArray(
+        array,
         counter,
         shm,
         readonly=readonly,
     )
-    try:
-        yield sha
-    finally:
-        sha.close()
-        print(f"UNLINKING {sha.token}")
-        sha.destroy()
+
+    assert shmarr._token == token
+    _known_tokens[key] = shmarr.token
+
+    # "unlink" created shm on process teardown by
+    # pushing teardown calls onto actor context stack
+    actor = tractor.current_actor()
+    actor._lifetime_stack.callback(shmarr.close)
+    actor._lifetime_stack.callback(shmarr.destroy)
+
+    return shmarr
 
 
-@contextmanager
-def attach_shared_array(
-    token: Tuple[str, str],
+def attach_shm_array(
+    token: Tuple[str, str, Tuple[str, str]],
     size: int = int(60*60*10/5),
-    dtype: np.dtype = base_ohlc_dtype,
+    # dtype: np.dtype = base_ohlc_dtype,
     readonly: bool = True,
 ) -> SharedArray:
     """Load and attach to an existing shared memory array previously
     created by another process using ``open_shared_array``.
     """
-    array_name, counter_name = token
+    token = _Token.from_msg(token)
+    key = token.shm_name
+    if key in _known_tokens:
+        assert _known_tokens[key] == token, "WTF"
 
-    shm = shared_memory.SharedMemory(name=array_name)
-    shmarr = np.ndarray((size,), dtype=dtype, buffer=shm.buf)
+    shm = shared_memory.SharedMemory(name=key)
+    shmarr = np.ndarray(
+        (size,),
+        dtype=token.dtype_descr,
+        buffer=shm.buf
+    )
     shmarr.setflags(write=int(not readonly))
 
-    counter = SharedInt(token=counter_name)
+    counter = SharedInt(token=token.shm_counter_name)
+
     # make sure we can read
     counter.value
@@ -214,28 +295,58 @@ def attach_shared_array(
         shm,
         readonly=readonly,
     )
+
+    # read test
     sha.array
 
-    try:
-        yield sha
-    finally:
-        pass
-        sha.close()
+    # Stash key -> token knowledge for future queries
+    # via `maybe_open_shm_array()` but only after we know
+    # we can attach.
+    if key not in _known_tokens:
+        _known_tokens[key] = token
+
+    # "close" attached shm on process teardown
+    actor = tractor.current_actor()
+    actor._lifetime_stack.callback(sha.close)
+
+    return sha
 
 
-@contextmanager
-def maybe_open_shared_array(
-    name: str,
+def maybe_open_shm_array(
+    key: str,
+    dtype: np.dtype = base_ohlc_dtype,
     **kwargs,
-) -> SharedArray:
+) -> Tuple[SharedArray, bool]:
+    """Attempt to attach to a shared memory block by a
+    "key" determined by the user's overall "system"
+    (presumes you don't have the block's explicit token).
+
+    This function is meant to solve the problem of
+    discovering whether a shared array token has been
+    allocated or discovered by the actor running in
+    **this** process. Systems where multiple actors
+    may seek to access a common block can use this
+    function to attempt to acquire a token as discovered
+    by the actors who have previously stored a
+    "key" -> ``_Token`` map in an actor local variable.
+
+    If you know the explicit ``_Token`` for your memory
+    instead use ``attach_shm_array``.
+    """
     try:
-        with open_shared_array(
-            name=name,
-            **kwargs,
-        ) as shm:
-            yield shm
-    except FileExistsError:
-        with attach_shared_array(
-            token=(name, name + '_counter'),
-            **kwargs,
-        ) as shm:
-            yield shm
+        # see if we already know this key
+        token = _known_tokens[key]
+        return attach_shm_array(token=token, **kwargs), False
+    except KeyError:
+        log.debug(f"Could not find {key} in shms cache")
+        if dtype:
+            token = _make_token(key, dtype)
+            try:
+                return attach_shm_array(token=token, **kwargs), False
+            except FileNotFoundError:
+                log.debug(f"Could not attach to shm with token {token}")
+
+    # This actor does not know about memory
+    # associated with the provided "key".
+    # Attempt to open a block and expect
+    # to fail if a block has been allocated
+    # on the OS by someone else.
+    return open_shm_array(key=key, dtype=dtype, **kwargs), True
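
To round out the picture, here is a hedged sketch of how the new
(SharedArray, bool) return value from maybe_open_shm_array() might be
consumed. The key name and the fill_history() helper are hypothetical;
everything else follows the function as added above.

    shmarr, opened = maybe_open_shm_array(key='mkt.ohlc')  # hypothetical key

    if opened:
        # this process allocated the block, so it is responsible for
        # writing the initial data into ``shmarr.array``
        fill_history(shmarr)  # hypothetical writer routine
    else:
        # another actor (or a prior call in this process) already allocated
        # or cached the block; just read from the attached view
        last_bar = shmarr.array[-1]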