diff --git a/piker/data/__init__.py b/piker/data/__init__.py index e46db9b7..153c1c8f 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -26,6 +26,7 @@ from ._sharedmem import ( ShmArray, get_shm_token, ) +from ._source import base_ohlc_dtype from ._buffer import ( increment_ohlc_buffer, subscribe_ohlc_for_increment @@ -163,10 +164,11 @@ async def open_feed( if loglevel is None: loglevel = tractor.current_actor().loglevel - # attempt to allocate (or attach to) shm array for this - # broker/symbol + # Attempt to allocate (or attach to) shm array for this broker/symbol shm, opened = maybe_open_shm_array( key=sym_to_shm_key(name, symbols[0]), + # use any broker defined ohlc dtype: + dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype), # we expect the sub-actor to write readonly=True, @@ -185,7 +187,12 @@ async def open_feed( # compat with eventual ``tractor.msg.pub`` topics=symbols, ) + + # TODO: we can't do this **and** be compate with + # ``tractor.msg.pub``, should we maybe just drop this after + # tests are in? shm_token, is_writer = await stream.receive() + shm_token['dtype_descr'] = list(shm_token['dtype_descr']) assert shm_token == shm.token # sanity diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 6c410423..96526206 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -12,8 +12,8 @@ from _posixshmem import shm_unlink import tractor import numpy as np - from ..log import get_logger +from ._source import base_ohlc_dtype log = get_logger(__name__) @@ -41,19 +41,6 @@ mantracker.unregister = mantracker._resource_tracker.unregister mantracker.getfd = mantracker._resource_tracker.getfd -base_ohlc_dtype = np.dtype( - [ - ('index', int), - ('time', float), - ('open', float), - ('high', float), - ('low', float), - ('close', float), - ('volume', int), - ] -) - - class SharedInt: def __init__( self, @@ -122,15 +109,16 @@ def get_shm_token(key: str) -> _Token: def _make_token( key: str, - dtype: np.dtype = base_ohlc_dtype, + dtype: Optional[np.dtype] = None, ) -> _Token: """Create a serializable token that can be used to access a shared array. """ + dtype = base_ohlc_dtype if dtype is None else dtype return _Token( key, key + "_counter", - dtype.descr + np.dtype(dtype).descr ) @@ -214,7 +202,7 @@ def open_shm_array( key: Optional[str] = None, # approx number of 5s bars in a "day" x2 size: int = int(2*60*60*10/5), - dtype: np.dtype = base_ohlc_dtype, + dtype: Optional[np.dtype] = None, readonly: bool = False, ) -> ShmArray: """Open a memory shared ``numpy`` using the standard library. @@ -266,7 +254,6 @@ def open_shm_array( def attach_shm_array( token: Tuple[str, str, Tuple[str, str]], size: int = int(60*60*10/5), - # dtype: np.dtype = base_ohlc_dtype, readonly: bool = True, ) -> ShmArray: """Load and attach to an existing shared memory array previously @@ -312,7 +299,7 @@ def attach_shm_array( def maybe_open_shm_array( key: str, - dtype: np.dtype = base_ohlc_dtype, + dtype: Optional[np.dtype] = None, **kwargs, ) -> Tuple[ShmArray, bool]: """Attempt to attach to a shared memory block by a