Prototype out writing `1Sec` OHLCV data
							parent
							
								
									0af809849b
								
							
						
					
					
						commit
						852f9e3e64
					
				| 
						 | 
				
			
			@ -23,7 +23,8 @@
 | 
			
		|||
- todo: tick sequence stream-cloning for testing
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
from contextlib import asynccontextmanager
 | 
			
		||||
from contextlib import asynccontextmanager as acm
 | 
			
		||||
from pprint import pformat
 | 
			
		||||
from typing import (
 | 
			
		||||
    Any,
 | 
			
		||||
    Optional,
 | 
			
		||||
| 
						 | 
				
			
			@ -33,12 +34,13 @@ from typing import (
 | 
			
		|||
import time
 | 
			
		||||
from math import isnan
 | 
			
		||||
 | 
			
		||||
from bidict import bidict
 | 
			
		||||
import msgpack
 | 
			
		||||
import numpy as np
 | 
			
		||||
import pandas as pd
 | 
			
		||||
import tractor
 | 
			
		||||
from trio_websocket import open_websocket_url
 | 
			
		||||
from anyio_marketstore import open_marketstore_client, MarketstoreClient
 | 
			
		||||
from anyio_marketstore import open_marketstore_client, MarketstoreClient, Params
 | 
			
		||||
 | 
			
		||||
from ..log import get_logger, get_console_log
 | 
			
		||||
from .feed import maybe_open_feed
 | 
			
		||||
| 
						 | 
				
			
			@ -164,12 +166,17 @@ def timestamp(date, **kwargs) -> int:
 | 
			
		|||
    return int(pd.Timestamp(date, **kwargs).value)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@asynccontextmanager
 | 
			
		||||
@acm
 | 
			
		||||
async def get_client(
 | 
			
		||||
    host: str = 'localhost',
 | 
			
		||||
    port: int = 5995
 | 
			
		||||
 | 
			
		||||
) -> MarketstoreClient:
 | 
			
		||||
    '''
 | 
			
		||||
    Load a ``anyio_marketstore`` grpc client connected
 | 
			
		||||
    to an existing ``marketstore`` server.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    async with open_marketstore_client(
 | 
			
		||||
        host,
 | 
			
		||||
        port
 | 
			
		||||
| 
						 | 
				
			
			@ -192,6 +199,41 @@ async def get_client(
 | 
			
		|||
#                 raise MarketStoreError(err)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
tf_in_1s = bidict({
 | 
			
		||||
    1: '1Sec',
 | 
			
		||||
    60: '1Min',
 | 
			
		||||
    60*5: '5Min',
 | 
			
		||||
    60*15: '15Min',
 | 
			
		||||
    60*30: '30Min',
 | 
			
		||||
    60*60: '1H',
 | 
			
		||||
    60*60*24: '1D',
 | 
			
		||||
})
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# @acm
 | 
			
		||||
async def load_history(
 | 
			
		||||
    symbol: Symbol,
 | 
			
		||||
    period: int = 1,  # in seconds
 | 
			
		||||
 | 
			
		||||
) -> np.ndarray:
 | 
			
		||||
 | 
			
		||||
    async with get_client() as client:
 | 
			
		||||
 | 
			
		||||
        tfstr = tf_in_1s[period]
 | 
			
		||||
        result = await client.query(
 | 
			
		||||
            Params(fqsn, tf_in_1s, 'OHLCV',)
 | 
			
		||||
        )
 | 
			
		||||
        # Dig out `numpy` results map
 | 
			
		||||
        arrays = {}
 | 
			
		||||
        await tractor.breakpoint()
 | 
			
		||||
        # for qr in [onem, fivem]:
 | 
			
		||||
        #     for name, data_set in qr.by_symbols().items():
 | 
			
		||||
        #         arrays[(name, qr)] = data_set.array
 | 
			
		||||
 | 
			
		||||
        # # TODO: backfiller loop
 | 
			
		||||
        # array = arrays[(fqsn, qr)]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def backfill_history(
 | 
			
		||||
    # symbol: Symbol
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -201,28 +243,32 @@ async def backfill_history(
 | 
			
		|||
    # - take ``Symbol`` as input
 | 
			
		||||
    # - backtrack into history using backend helper endpoint
 | 
			
		||||
 | 
			
		||||
    # broker = 'ib'
 | 
			
		||||
    # symbol = 'mnq.globex'
 | 
			
		||||
    broker = 'ib'
 | 
			
		||||
    symbol = 'mnq.globex'
 | 
			
		||||
 | 
			
		||||
    broker = 'binance'
 | 
			
		||||
    symbol = 'btcusdt'
 | 
			
		||||
    # broker = 'binance'
 | 
			
		||||
    # symbol = 'btcusdt'
 | 
			
		||||
 | 
			
		||||
    fqsn = mk_fqsn(broker, symbol)
 | 
			
		||||
 | 
			
		||||
    print('yo')
 | 
			
		||||
    async with (
 | 
			
		||||
        get_client() as msclient,
 | 
			
		||||
        get_client() as client,
 | 
			
		||||
        maybe_open_feed(
 | 
			
		||||
            broker,
 | 
			
		||||
            [symbol],
 | 
			
		||||
            loglevel='info',
 | 
			
		||||
            # backpressure=False,
 | 
			
		||||
            start_stream=False,
 | 
			
		||||
 | 
			
		||||
        ) as (feed, stream),
 | 
			
		||||
    ):
 | 
			
		||||
        print('yo')
 | 
			
		||||
        ohlcv = feed.shm.array
 | 
			
		||||
        mkts_dt = np.dtype(_ohlcv_dt)
 | 
			
		||||
 | 
			
		||||
        syms = await msclient.list_symbols()
 | 
			
		||||
        print('yo')
 | 
			
		||||
        syms = await client.list_symbols()
 | 
			
		||||
        log.info(f'Existing symbol set:\n{pformat(syms)}')
 | 
			
		||||
 | 
			
		||||
        # build mkts schema compat array
 | 
			
		||||
| 
						 | 
				
			
			@ -240,14 +286,39 @@ async def backfill_history(
 | 
			
		|||
            'volume',
 | 
			
		||||
        ]]
 | 
			
		||||
 | 
			
		||||
        key = (fqsn, '1Sec', 'OHLCV')
 | 
			
		||||
        tbk = mk_tbk(key)
 | 
			
		||||
 | 
			
		||||
        # diff vs. existing array and append new history
 | 
			
		||||
        # TODO:
 | 
			
		||||
 | 
			
		||||
        # write to db
 | 
			
		||||
        resp = await msclient.write(
 | 
			
		||||
        resp = await client.write(
 | 
			
		||||
            mkts_array,
 | 
			
		||||
            tbk=f'{fqsn}/1Min/OHLCV',
 | 
			
		||||
            isvariablelength=True,
 | 
			
		||||
            tbk=tbk,
 | 
			
		||||
            # NOTE: will will append duplicates
 | 
			
		||||
            # for the same timestamp-index.
 | 
			
		||||
            # isvariablelength=True,
 | 
			
		||||
        )
 | 
			
		||||
        # TODO: should be no error?
 | 
			
		||||
        # assert not resp.responses
 | 
			
		||||
 | 
			
		||||
        # # Dig out `numpy` results map
 | 
			
		||||
        qr = await client.query(
 | 
			
		||||
            Params(fqsn, '1Min`', 'OHLCV',)
 | 
			
		||||
        )
 | 
			
		||||
        qr = await client.query(
 | 
			
		||||
            # Params(fqsn, '1Sec`', 'OHLCV',)
 | 
			
		||||
            Params(*key),
 | 
			
		||||
        )
 | 
			
		||||
        arrays = {}
 | 
			
		||||
        # for qr in [onem, fivem]:
 | 
			
		||||
        for name, data_set in qr.by_symbols().items():
 | 
			
		||||
            arrays[(name, qr)] = data_set.array
 | 
			
		||||
 | 
			
		||||
        # TODO: backfiller loop
 | 
			
		||||
        array = arrays[(fqsn, qr)]
 | 
			
		||||
        await tractor.breakpoint()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def ingest_quote_stream(
 | 
			
		||||
| 
						 | 
				
			
			@ -315,24 +386,6 @@ async def ingest_quote_stream(
 | 
			
		|||
            #         await ms_client.write(symbol, a)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# async def stream_quotes(
 | 
			
		||||
#     symbols: list[str],
 | 
			
		||||
#     timeframe: str = '1Min',
 | 
			
		||||
#     attr_group: str = 'TICK',
 | 
			
		||||
#     host: str = 'localhost',
 | 
			
		||||
#     port: int = 5993,
 | 
			
		||||
#     loglevel: str = None
 | 
			
		||||
 | 
			
		||||
# ) -> None:
 | 
			
		||||
#     '''
 | 
			
		||||
#     Open a symbol stream from a running instance of marketstore and
 | 
			
		||||
#     log to console.
 | 
			
		||||
 | 
			
		||||
#     '''
 | 
			
		||||
#     tbks: dict[str, str] = {
 | 
			
		||||
#         sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def stream_quotes(
 | 
			
		||||
    symbols: list[str],
 | 
			
		||||
    host: str = 'localhost',
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue