Always ask backend for ohlc dtype

bar_select
Tyler Goodlet 2020-09-26 14:11:55 -04:00
parent 47d4ec5985
commit 8a4528c006
2 changed files with 15 additions and 21 deletions

View File

@ -26,6 +26,7 @@ from ._sharedmem import (
ShmArray,
get_shm_token,
)
from ._source import base_ohlc_dtype
from ._buffer import (
increment_ohlc_buffer,
subscribe_ohlc_for_increment
@ -163,10 +164,11 @@ async def open_feed(
if loglevel is None:
loglevel = tractor.current_actor().loglevel
# attempt to allocate (or attach to) shm array for this
# broker/symbol
# Attempt to allocate (or attach to) shm array for this broker/symbol
shm, opened = maybe_open_shm_array(
key=sym_to_shm_key(name, symbols[0]),
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype),
# we expect the sub-actor to write
readonly=True,
@ -185,7 +187,12 @@ async def open_feed(
# compat with eventual ``tractor.msg.pub``
topics=symbols,
)
# TODO: we can't do this **and** be compate with
# ``tractor.msg.pub``, should we maybe just drop this after
# tests are in?
shm_token, is_writer = await stream.receive()
shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
assert shm_token == shm.token # sanity

View File

@ -12,8 +12,8 @@ from _posixshmem import shm_unlink
import tractor
import numpy as np
from ..log import get_logger
from ._source import base_ohlc_dtype
log = get_logger(__name__)
@ -41,19 +41,6 @@ mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
base_ohlc_dtype = np.dtype(
[
('index', int),
('time', float),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', int),
]
)
class SharedInt:
def __init__(
self,
@ -122,15 +109,16 @@ def get_shm_token(key: str) -> _Token:
def _make_token(
key: str,
dtype: np.dtype = base_ohlc_dtype,
dtype: Optional[np.dtype] = None,
) -> _Token:
"""Create a serializable token that can be used
to access a shared array.
"""
dtype = base_ohlc_dtype if dtype is None else dtype
return _Token(
key,
key + "_counter",
dtype.descr
np.dtype(dtype).descr
)
@ -214,7 +202,7 @@ def open_shm_array(
key: Optional[str] = None,
# approx number of 5s bars in a "day" x2
size: int = int(2*60*60*10/5),
dtype: np.dtype = base_ohlc_dtype,
dtype: Optional[np.dtype] = None,
readonly: bool = False,
) -> ShmArray:
"""Open a memory shared ``numpy`` using the standard library.
@ -266,7 +254,6 @@ def open_shm_array(
def attach_shm_array(
token: Tuple[str, str, Tuple[str, str]],
size: int = int(60*60*10/5),
# dtype: np.dtype = base_ohlc_dtype,
readonly: bool = True,
) -> ShmArray:
"""Load and attach to an existing shared memory array previously
@ -312,7 +299,7 @@ def attach_shm_array(
def maybe_open_shm_array(
key: str,
dtype: np.dtype = base_ohlc_dtype,
dtype: Optional[np.dtype] = None,
**kwargs,
) -> Tuple[ShmArray, bool]:
"""Attempt to attach to a shared memory block by a