Port data apis to not touch primary index

vwap_backup
Tyler Goodlet 2020-12-12 17:14:03 -05:00
parent 6bae50ba2e
commit 098db15b2d
4 changed files with 42 additions and 22 deletions

View File

@ -42,7 +42,7 @@ from ._sharedmem import (
ShmArray, ShmArray,
get_shm_token, get_shm_token,
) )
from ._source import base_ohlc_dtype from ._source import base_iohlc_dtype
from ._buffer import ( from ._buffer import (
increment_ohlc_buffer, increment_ohlc_buffer,
subscribe_ohlc_for_increment subscribe_ohlc_for_increment
@ -139,6 +139,7 @@ class Feed:
name: str name: str
stream: AsyncIterator[Dict[str, Any]] stream: AsyncIterator[Dict[str, Any]]
shm: ShmArray shm: ShmArray
# ticks: ShmArray
_broker_portal: tractor._portal.Portal _broker_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None _index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
@ -188,7 +189,7 @@ async def open_feed(
key=sym_to_shm_key(name, symbols[0]), key=sym_to_shm_key(name, symbols[0]),
# use any broker defined ohlc dtype: # use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype), dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write # we expect the sub-actor to write
readonly=True, readonly=True,

View File

@ -91,19 +91,20 @@ async def increment_ohlc_buffer(
# append new entry to buffer thus "incrementing" the bar # append new entry to buffer thus "incrementing" the bar
array = shm.array array = shm.array
last = array[-1:].copy() last = array[-1:][shm._write_fields].copy()
(index, t, close) = last[0][['index', 'time', 'close']] # (index, t, close) = last[0][['index', 'time', 'close']]
(t, close) = last[0][['time', 'close']]
# this copies non-std fields (eg. vwap) from the last datum # this copies non-std fields (eg. vwap) from the last datum
last[ last[
['index', 'time', 'volume', 'open', 'high', 'low', 'close'] ['time', 'volume', 'open', 'high', 'low', 'close']
][0] = (index + 1, t + delay_s, 0, close, close, close, close) ][0] = (t + delay_s, 0, close, close, close, close)
# write to the buffer # write to the buffer
shm.push(last) shm.push(last)
# broadcast the buffer index step # broadcast the buffer index step
yield {'index': shm._i.value} yield {'index': shm._last.value}
def subscribe_ohlc_for_increment( def subscribe_ohlc_for_increment(

View File

@ -33,6 +33,6 @@ def iterticks(
ticks = quote.get('ticks', ()) ticks = quote.get('ticks', ())
if ticks: if ticks:
for tick in ticks: for tick in ticks:
print(f"{quote['symbol']}: {tick}") # print(f"{quote['symbol']}: {tick}")
if tick.get('type') in types: if tick.get('type') in types:
yield tick yield tick

View File

@ -15,27 +15,36 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
Numpy data source machinery. numpy data source coversion helpers.
""" """
import decimal import decimal
from dataclasses import dataclass from dataclasses import dataclass
import numpy as np import numpy as np
import pandas as pd import pandas as pd
# from numba import from_dtype
# our minimum structured array layout for ohlc data ohlc_fields = [
base_ohlc_dtype = np.dtype(
[
('index', int),
('time', float), ('time', float),
('open', float), ('open', float),
('high', float), ('high', float),
('low', float), ('low', float),
('close', float), ('close', float),
('volume', int), ('volume', int),
('bar_wap', float),
] ]
)
ohlc_with_index = ohlc_fields.copy()
ohlc_with_index.insert(0, ('index', int))
# our minimum structured array layout for ohlc data
base_iohlc_dtype = np.dtype(ohlc_with_index)
base_ohlc_dtype = np.dtype(ohlc_fields)
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
# numba_ohlc_dtype = from_dtype(base_ohlc_dtype)
# map time frame "keys" to minutes values # map time frame "keys" to minutes values
tf_in_1m = { tf_in_1m = {
@ -110,18 +119,27 @@ def from_df(
'Low': 'low', 'Low': 'low',
'Close': 'close', 'Close': 'close',
'Volume': 'volume', 'Volume': 'volume',
# most feeds are providing this over sesssion anchored
'vwap': 'bar_wap',
# XXX: ib_insync calls this the "wap of the bar"
# but no clue what is actually is...
# https://github.com/pikers/piker/issues/119#issuecomment-729120988
'average': 'bar_wap',
} }
df = df.rename(columns=columns) df = df.rename(columns=columns)
for name in df.columns: for name in df.columns:
if name not in base_ohlc_dtype.names[1:]: # if name not in base_ohlc_dtype.names[1:]:
if name not in base_ohlc_dtype.names:
del df[name] del df[name]
# TODO: it turns out column access on recarrays is actually slower: # TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays? # it might make sense to make these structured arrays?
array = df.to_records() array = df.to_records(index=False)
_nan_to_closest_num(array) _nan_to_closest_num(array)
return array return array