Add WIP backfiller from data feed helper
parent
4bcc301c01
commit
56fa759452
|
@ -36,7 +36,7 @@ from trio_websocket import open_websocket_url
|
||||||
from anyio_marketstore import open_marketstore_client, MarketstoreClient
|
from anyio_marketstore import open_marketstore_client, MarketstoreClient
|
||||||
|
|
||||||
from ..log import get_logger, get_console_log
|
from ..log import get_logger, get_console_log
|
||||||
from ..data import open_feed
|
from ..data.feed import maybe_open_feed
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -71,6 +71,7 @@ _quote_dt = [
|
||||||
# ('brokerd_ts', 'i64'),
|
# ('brokerd_ts', 'i64'),
|
||||||
# ('VWAP', 'f4')
|
# ('VWAP', 'f4')
|
||||||
]
|
]
|
||||||
|
|
||||||
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
|
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
|
||||||
_tick_map = {
|
_tick_map = {
|
||||||
'Up': 1,
|
'Up': 1,
|
||||||
|
@ -79,6 +80,19 @@ _tick_map = {
|
||||||
None: np.nan,
|
None: np.nan,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_ohlcv_dt = [
|
||||||
|
# these two are required for as a "primary key"
|
||||||
|
('Epoch', 'i8'),
|
||||||
|
('Nanoseconds', 'i4'),
|
||||||
|
|
||||||
|
# ohlcv sampling
|
||||||
|
('Open', 'f4'),
|
||||||
|
('High', 'f4'),
|
||||||
|
('Low', 'i8'),
|
||||||
|
('Close', 'i8'),
|
||||||
|
('Volume', 'f4'),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def mk_tbk(keys: tuple[str, str, str]) -> str:
|
def mk_tbk(keys: tuple[str, str, str]) -> str:
|
||||||
'''
|
'''
|
||||||
|
@ -143,13 +157,32 @@ def timestamp(date, **kwargs) -> int:
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def get_client(
|
async def get_client(
|
||||||
host: str = 'localhost',
|
host: str = 'localhost',
|
||||||
port: int = 5995
|
port: int = 5993
|
||||||
|
|
||||||
) -> MarketstoreClient:
|
) -> MarketstoreClient:
|
||||||
async with open_marketstore_client(host, port) as client:
|
async with open_marketstore_client(host, port) as client:
|
||||||
yield client
|
yield client
|
||||||
|
|
||||||
|
|
||||||
|
async def backfill_history():
|
||||||
|
|
||||||
|
async with (
|
||||||
|
get_client() as msclient,
|
||||||
|
maybe_open_feed(
|
||||||
|
'ib',
|
||||||
|
['mnq.globex'],
|
||||||
|
loglevel='info',
|
||||||
|
# backpressure=False,
|
||||||
|
start_stream=False,
|
||||||
|
) as (feed, stream),
|
||||||
|
):
|
||||||
|
await tractor.breakpoint()
|
||||||
|
await msclient.write(
|
||||||
|
feed.shm.array,
|
||||||
|
tbk='mnq.globex.ib/1Sec/OHLCV',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def ingest_quote_stream(
|
async def ingest_quote_stream(
|
||||||
symbols: list[str],
|
symbols: list[str],
|
||||||
brokername: str,
|
brokername: str,
|
||||||
|
@ -162,7 +195,7 @@ async def ingest_quote_stream(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async with (
|
async with (
|
||||||
open_feed(brokername, symbols, loglevel=loglevel) as feed,
|
maybe_open_feed(brokername, symbols, loglevel=loglevel) as feed,
|
||||||
get_client() as ms_client,
|
get_client() as ms_client,
|
||||||
):
|
):
|
||||||
async for quotes in feed.stream:
|
async for quotes in feed.stream:
|
||||||
|
|
Loading…
Reference in New Issue