Port data apis to not touch primary index

to_qpainterpath_and_beyond
Tyler Goodlet 2020-12-12 17:14:03 -05:00
parent fda9fcbc55
commit 599b5276b4
4 changed files with 42 additions and 22 deletions

View File

@ -42,7 +42,7 @@ from ._sharedmem import (
ShmArray,
get_shm_token,
)
from ._source import base_ohlc_dtype
from ._source import base_iohlc_dtype
from ._buffer import (
increment_ohlc_buffer,
subscribe_ohlc_for_increment
@ -139,6 +139,7 @@ class Feed:
name: str
stream: AsyncIterator[Dict[str, Any]]
shm: ShmArray
# ticks: ShmArray
_broker_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
@ -188,7 +189,7 @@ async def open_feed(
key=sym_to_shm_key(name, symbols[0]),
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype),
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=True,

View File

@ -91,19 +91,20 @@ async def increment_ohlc_buffer(
# append new entry to buffer thus "incrementing" the bar
array = shm.array
last = array[-1:].copy()
(index, t, close) = last[0][['index', 'time', 'close']]
last = array[-1:][shm._write_fields].copy()
# (index, t, close) = last[0][['index', 'time', 'close']]
(t, close) = last[0][['time', 'close']]
# this copies non-std fields (eg. vwap) from the last datum
last[
['index', 'time', 'volume', 'open', 'high', 'low', 'close']
][0] = (index + 1, t + delay_s, 0, close, close, close, close)
['time', 'volume', 'open', 'high', 'low', 'close']
][0] = (t + delay_s, 0, close, close, close, close)
# write to the buffer
shm.push(last)
# broadcast the buffer index step
yield {'index': shm._i.value}
yield {'index': shm._last.value}
def subscribe_ohlc_for_increment(

View File

@ -33,6 +33,6 @@ def iterticks(
ticks = quote.get('ticks', ())
if ticks:
for tick in ticks:
print(f"{quote['symbol']}: {tick}")
# print(f"{quote['symbol']}: {tick}")
if tick.get('type') in types:
yield tick

View File

@ -15,27 +15,36 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Numpy data source machinery.
numpy data source coversion helpers.
"""
import decimal
from dataclasses import dataclass
import numpy as np
import pandas as pd
# from numba import from_dtype
ohlc_fields = [
('time', float),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', int),
('bar_wap', float),
]
ohlc_with_index = ohlc_fields.copy()
ohlc_with_index.insert(0, ('index', int))
# our minimum structured array layout for ohlc data
base_ohlc_dtype = np.dtype(
[
('index', int),
('time', float),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', int),
]
)
base_iohlc_dtype = np.dtype(ohlc_with_index)
base_ohlc_dtype = np.dtype(ohlc_fields)
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
# numba_ohlc_dtype = from_dtype(base_ohlc_dtype)
# map time frame "keys" to minutes values
tf_in_1m = {
@ -110,18 +119,27 @@ def from_df(
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
# most feeds are providing this over sesssion anchored
'vwap': 'bar_wap',
# XXX: ib_insync calls this the "wap of the bar"
# but no clue what is actually is...
# https://github.com/pikers/piker/issues/119#issuecomment-729120988
'average': 'bar_wap',
}
df = df.rename(columns=columns)
for name in df.columns:
if name not in base_ohlc_dtype.names[1:]:
# if name not in base_ohlc_dtype.names[1:]:
if name not in base_ohlc_dtype.names:
del df[name]
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
array = df.to_records()
array = df.to_records(index=False)
_nan_to_closest_num(array)
return array