Add WIP backfiller from data feed helper

marketstore_backup
Tyler Goodlet 2022-02-18 12:17:12 -05:00
parent 6d38f3d0cb
commit a6613161c3
1 changed files with 36 additions and 3 deletions

View File

@ -36,7 +36,7 @@ from trio_websocket import open_websocket_url
from anyio_marketstore import open_marketstore_client, MarketstoreClient from anyio_marketstore import open_marketstore_client, MarketstoreClient
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import open_feed from ..data.feed import maybe_open_feed
log = get_logger(__name__) log = get_logger(__name__)
@ -71,6 +71,7 @@ _quote_dt = [
# ('brokerd_ts', 'i64'), # ('brokerd_ts', 'i64'),
# ('VWAP', 'f4') # ('VWAP', 'f4')
] ]
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) _quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
_tick_map = { _tick_map = {
'Up': 1, 'Up': 1,
@ -79,6 +80,19 @@ _tick_map = {
None: np.nan, None: np.nan,
} }
_ohlcv_dt = [
# these two are required for as a "primary key"
('Epoch', 'i8'),
('Nanoseconds', 'i4'),
# ohlcv sampling
('Open', 'f4'),
('High', 'f4'),
('Low', 'i8'),
('Close', 'i8'),
('Volume', 'f4'),
]
def mk_tbk(keys: tuple[str, str, str]) -> str: def mk_tbk(keys: tuple[str, str, str]) -> str:
''' '''
@ -143,13 +157,32 @@ def timestamp(date, **kwargs) -> int:
@asynccontextmanager @asynccontextmanager
async def get_client( async def get_client(
host: str = 'localhost', host: str = 'localhost',
port: int = 5995 port: int = 5993
) -> MarketstoreClient: ) -> MarketstoreClient:
async with open_marketstore_client(host, port) as client: async with open_marketstore_client(host, port) as client:
yield client yield client
async def backfill_history():
async with (
get_client() as msclient,
maybe_open_feed(
'ib',
['mnq.globex'],
loglevel='info',
# backpressure=False,
start_stream=False,
) as (feed, stream),
):
await tractor.breakpoint()
await msclient.write(
feed.shm.array,
tbk='mnq.globex.ib/1Sec/OHLCV',
)
async def ingest_quote_stream( async def ingest_quote_stream(
symbols: list[str], symbols: list[str],
brokername: str, brokername: str,
@ -162,7 +195,7 @@ async def ingest_quote_stream(
''' '''
async with ( async with (
open_feed(brokername, symbols, loglevel=loglevel) as feed, maybe_open_feed(brokername, symbols, loglevel=loglevel) as feed,
get_client() as ms_client, get_client() as ms_client,
): ):
async for quotes in feed.stream: async for quotes in feed.stream: