Py3.9+ type updates

marketstore
Tyler Goodlet 2022-02-17 16:31:37 -05:00
parent 973f0e6180
commit 218449a9ef
1 changed files with 27 additions and 23 deletions

View File

@ -24,14 +24,14 @@
- todo: docker container management automation
'''
from contextlib import asynccontextmanager
from typing import Dict, Any, List, Callable, Tuple, Optional
from typing import Any, Optional
import time
from math import isnan
# from math import isnan
import msgpack
# import msgpack
import numpy as np
import pandas as pd
import tractor
# import tractor
from trio_websocket import open_websocket_url
from anyio_marketstore import open_marketstore_client, MarketstoreClient
@ -41,7 +41,7 @@ from ..data import open_feed
log = get_logger(__name__)
_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK')
_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
_quote_dt = [
@ -56,16 +56,18 @@ _quote_dt = [
]
def mk_tbk(keys: Tuple[str, str, str]) -> str:
"""Generate a marketstore table key from a tuple.
def mk_tbk(keys: tuple[str, str, str]) -> str:
'''
Generate a marketstore table key from a tuple.
Converts,
``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
"""
'''
return '{}/' + '/'.join(keys)
def quote_to_marketstore_structarray(
quote: Dict[str, Any],
quote: dict[str, Any],
last_fill: Optional[float]
) -> np.array:
'''
@ -83,7 +85,7 @@ def quote_to_marketstore_structarray(
secs, ns = now / 10**9, now % 10**9
# pack into List[Tuple[str, Any]]
# pack into list[tuple[str, Any]]
array_input = []
# insert 'Epoch' entry first and then 'Nanoseconds'.
@ -123,17 +125,19 @@ async def get_client(
async def ingest_quote_stream(
symbols: List[str],
symbols: list[str],
brokername: str,
tries: int = 1,
actorloglevel: str = None,
loglevel: str = None,
) -> None:
'''
Ingest a broker quote stream into marketstore.
Ingest a broker quote stream into a ``marketstore`` tsdb.
'''
async with (
open_feed(brokername, symbols, loglevel=actorloglevel) as feed,
get_client() as ms_client
open_feed(brokername, symbols, loglevel=loglevel) as feed,
get_client() as ms_client,
):
async for quotes in feed.stream:
log.info(quotes)
@ -152,30 +156,30 @@ async def ingest_quote_stream(
'Size': tick.get('size')
}, last_fill=quote.get('broker_ts', None))
await ms_client.write(
array, _tick_tbk)
await ms_client.write(array, _tick_tbk)
async def stream_quotes(
symbols: List[str],
symbols: list[str],
timeframe: str = '1Min',
attr_group: str = 'TICK',
host: str = 'localhost',
port: int = 5993,
loglevel: str = None
) -> None:
'''
Open a symbol stream from a running instance of marketstore and
log to console.
'''
tbks: Dict[str, str] = {
tbks: dict[str, str] = {
sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
# async def stream_quotes(
# symbols: List[str],
# symbols: list[str],
# host: str = 'localhost',
# port: int = 5993,
# diff_cached: bool = True,
@ -187,7 +191,7 @@ async def stream_quotes(
# # XXX: required to propagate ``tractor`` loglevel to piker logging
# get_console_log(loglevel or tractor.current_actor().loglevel)
#
# tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
# tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
#
# async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
# # send subs topics to server
@ -196,7 +200,7 @@ async def stream_quotes(
# )
# log.info(resp)
#
# async def recv() -> Dict[str, Any]:
# async def recv() -> dict[str, Any]:
# return msgpack.loads((await ws.get_message()), encoding='utf-8')
#
# streams = (await recv())['streams']