Port data apis to not touch primary index
							parent
							
								
									2bcebe779a
								
							
						
					
					
						commit
						ffb405c74b
					
				| 
						 | 
					@ -42,7 +42,7 @@ from ._sharedmem import (
 | 
				
			||||||
    ShmArray,
 | 
					    ShmArray,
 | 
				
			||||||
    get_shm_token,
 | 
					    get_shm_token,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
from ._source import base_ohlc_dtype
 | 
					from ._source import base_iohlc_dtype
 | 
				
			||||||
from ._buffer import (
 | 
					from ._buffer import (
 | 
				
			||||||
    increment_ohlc_buffer,
 | 
					    increment_ohlc_buffer,
 | 
				
			||||||
    subscribe_ohlc_for_increment
 | 
					    subscribe_ohlc_for_increment
 | 
				
			||||||
| 
						 | 
					@ -139,6 +139,7 @@ class Feed:
 | 
				
			||||||
    name: str
 | 
					    name: str
 | 
				
			||||||
    stream: AsyncIterator[Dict[str, Any]]
 | 
					    stream: AsyncIterator[Dict[str, Any]]
 | 
				
			||||||
    shm: ShmArray
 | 
					    shm: ShmArray
 | 
				
			||||||
 | 
					    # ticks: ShmArray
 | 
				
			||||||
    _broker_portal: tractor._portal.Portal
 | 
					    _broker_portal: tractor._portal.Portal
 | 
				
			||||||
    _index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
 | 
					    _index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -188,7 +189,7 @@ async def open_feed(
 | 
				
			||||||
        key=sym_to_shm_key(name, symbols[0]),
 | 
					        key=sym_to_shm_key(name, symbols[0]),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # use any broker defined ohlc dtype:
 | 
					        # use any broker defined ohlc dtype:
 | 
				
			||||||
        dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype),
 | 
					        dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # we expect the sub-actor to write
 | 
					        # we expect the sub-actor to write
 | 
				
			||||||
        readonly=True,
 | 
					        readonly=True,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -91,19 +91,20 @@ async def increment_ohlc_buffer(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # append new entry to buffer thus "incrementing" the bar
 | 
					                # append new entry to buffer thus "incrementing" the bar
 | 
				
			||||||
                array = shm.array
 | 
					                array = shm.array
 | 
				
			||||||
                last = array[-1:].copy()
 | 
					                last = array[-1:][shm._write_fields].copy()
 | 
				
			||||||
                (index, t, close) = last[0][['index', 'time', 'close']]
 | 
					                # (index, t, close) = last[0][['index', 'time', 'close']]
 | 
				
			||||||
 | 
					                (t, close) = last[0][['time', 'close']]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # this copies non-std fields (eg. vwap) from the last datum
 | 
					                # this copies non-std fields (eg. vwap) from the last datum
 | 
				
			||||||
                last[
 | 
					                last[
 | 
				
			||||||
                    ['index', 'time', 'volume', 'open', 'high', 'low', 'close']
 | 
					                    ['time', 'volume', 'open', 'high', 'low', 'close']
 | 
				
			||||||
                ][0] = (index + 1, t + delay_s, 0, close, close, close, close)
 | 
					                ][0] = (t + delay_s, 0, close, close, close, close)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # write to the buffer
 | 
					                # write to the buffer
 | 
				
			||||||
                shm.push(last)
 | 
					                shm.push(last)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # broadcast the buffer index step
 | 
					        # broadcast the buffer index step
 | 
				
			||||||
        yield {'index': shm._i.value}
 | 
					        yield {'index': shm._last.value}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def subscribe_ohlc_for_increment(
 | 
					def subscribe_ohlc_for_increment(
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -33,6 +33,6 @@ def iterticks(
 | 
				
			||||||
    ticks = quote.get('ticks', ())
 | 
					    ticks = quote.get('ticks', ())
 | 
				
			||||||
    if ticks:
 | 
					    if ticks:
 | 
				
			||||||
        for tick in ticks:
 | 
					        for tick in ticks:
 | 
				
			||||||
            print(f"{quote['symbol']}: {tick}")
 | 
					            # print(f"{quote['symbol']}: {tick}")
 | 
				
			||||||
            if tick.get('type') in types:
 | 
					            if tick.get('type') in types:
 | 
				
			||||||
                yield tick
 | 
					                yield tick
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -15,27 +15,36 @@
 | 
				
			||||||
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 | 
					# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
Numpy data source machinery.
 | 
					numpy data source coversion helpers.
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
import decimal
 | 
					import decimal
 | 
				
			||||||
from dataclasses import dataclass
 | 
					from dataclasses import dataclass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import numpy as np
 | 
					import numpy as np
 | 
				
			||||||
import pandas as pd
 | 
					import pandas as pd
 | 
				
			||||||
 | 
					# from numba import from_dtype
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					ohlc_fields = [
 | 
				
			||||||
 | 
					    ('time', float),
 | 
				
			||||||
 | 
					    ('open', float),
 | 
				
			||||||
 | 
					    ('high', float),
 | 
				
			||||||
 | 
					    ('low', float),
 | 
				
			||||||
 | 
					    ('close', float),
 | 
				
			||||||
 | 
					    ('volume', int),
 | 
				
			||||||
 | 
					    ('bar_wap', float),
 | 
				
			||||||
 | 
					]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					ohlc_with_index = ohlc_fields.copy()
 | 
				
			||||||
 | 
					ohlc_with_index.insert(0, ('index', int))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# our minimum structured array layout for ohlc data
 | 
					# our minimum structured array layout for ohlc data
 | 
				
			||||||
base_ohlc_dtype = np.dtype(
 | 
					base_iohlc_dtype = np.dtype(ohlc_with_index)
 | 
				
			||||||
    [
 | 
					base_ohlc_dtype = np.dtype(ohlc_fields)
 | 
				
			||||||
        ('index', int),
 | 
					
 | 
				
			||||||
        ('time', float),
 | 
					# TODO: for now need to construct this manually for readonly arrays, see
 | 
				
			||||||
        ('open', float),
 | 
					# https://github.com/numba/numba/issues/4511
 | 
				
			||||||
        ('high', float),
 | 
					# numba_ohlc_dtype = from_dtype(base_ohlc_dtype)
 | 
				
			||||||
        ('low', float),
 | 
					 | 
				
			||||||
        ('close', float),
 | 
					 | 
				
			||||||
        ('volume', int),
 | 
					 | 
				
			||||||
    ]
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
# map time frame "keys" to minutes values
 | 
					# map time frame "keys" to minutes values
 | 
				
			||||||
tf_in_1m = {
 | 
					tf_in_1m = {
 | 
				
			||||||
| 
						 | 
					@ -110,18 +119,27 @@ def from_df(
 | 
				
			||||||
        'Low': 'low',
 | 
					        'Low': 'low',
 | 
				
			||||||
        'Close': 'close',
 | 
					        'Close': 'close',
 | 
				
			||||||
        'Volume': 'volume',
 | 
					        'Volume': 'volume',
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # most feeds are providing this over sesssion anchored
 | 
				
			||||||
 | 
					        'vwap': 'bar_wap',
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # XXX: ib_insync calls this the "wap of the bar"
 | 
				
			||||||
 | 
					        # but no clue what is actually is...
 | 
				
			||||||
 | 
					        # https://github.com/pikers/piker/issues/119#issuecomment-729120988
 | 
				
			||||||
 | 
					        'average': 'bar_wap',
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    df = df.rename(columns=columns)
 | 
					    df = df.rename(columns=columns)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    for name in df.columns:
 | 
					    for name in df.columns:
 | 
				
			||||||
        if name not in base_ohlc_dtype.names[1:]:
 | 
					        # if name not in base_ohlc_dtype.names[1:]:
 | 
				
			||||||
 | 
					        if name not in base_ohlc_dtype.names:
 | 
				
			||||||
            del df[name]
 | 
					            del df[name]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # TODO: it turns out column access on recarrays is actually slower:
 | 
					    # TODO: it turns out column access on recarrays is actually slower:
 | 
				
			||||||
    # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
 | 
					    # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
 | 
				
			||||||
    # it might make sense to make these structured arrays?
 | 
					    # it might make sense to make these structured arrays?
 | 
				
			||||||
    array = df.to_records()
 | 
					    array = df.to_records(index=False)
 | 
				
			||||||
    _nan_to_closest_num(array)
 | 
					    _nan_to_closest_num(array)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return array
 | 
					    return array
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue