Py3.9+ type updates
parent
facc86f76e
commit
ed5bae0e11
|
@ -24,14 +24,14 @@
|
||||||
- todo: docker container management automation
|
- todo: docker container management automation
|
||||||
'''
|
'''
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from typing import Dict, Any, List, Callable, Tuple, Optional
|
from typing import Any, Optional
|
||||||
import time
|
import time
|
||||||
from math import isnan
|
# from math import isnan
|
||||||
|
|
||||||
import msgpack
|
# import msgpack
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import tractor
|
# import tractor
|
||||||
from trio_websocket import open_websocket_url
|
from trio_websocket import open_websocket_url
|
||||||
from anyio_marketstore import open_marketstore_client, MarketstoreClient
|
from anyio_marketstore import open_marketstore_client, MarketstoreClient
|
||||||
|
|
||||||
|
@ -41,7 +41,7 @@ from ..data import open_feed
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK')
|
_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
|
||||||
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
|
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
|
||||||
|
|
||||||
_quote_dt = [
|
_quote_dt = [
|
||||||
|
@ -56,16 +56,18 @@ _quote_dt = [
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def mk_tbk(keys: Tuple[str, str, str]) -> str:
|
def mk_tbk(keys: tuple[str, str, str]) -> str:
|
||||||
"""Generate a marketstore table key from a tuple.
|
'''
|
||||||
|
Generate a marketstore table key from a tuple.
|
||||||
Converts,
|
Converts,
|
||||||
``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
|
``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
|
||||||
"""
|
|
||||||
|
'''
|
||||||
return '{}/' + '/'.join(keys)
|
return '{}/' + '/'.join(keys)
|
||||||
|
|
||||||
|
|
||||||
def quote_to_marketstore_structarray(
|
def quote_to_marketstore_structarray(
|
||||||
quote: Dict[str, Any],
|
quote: dict[str, Any],
|
||||||
last_fill: Optional[float]
|
last_fill: Optional[float]
|
||||||
) -> np.array:
|
) -> np.array:
|
||||||
'''
|
'''
|
||||||
|
@ -83,7 +85,7 @@ def quote_to_marketstore_structarray(
|
||||||
|
|
||||||
secs, ns = now / 10**9, now % 10**9
|
secs, ns = now / 10**9, now % 10**9
|
||||||
|
|
||||||
# pack into List[Tuple[str, Any]]
|
# pack into list[tuple[str, Any]]
|
||||||
array_input = []
|
array_input = []
|
||||||
|
|
||||||
# insert 'Epoch' entry first and then 'Nanoseconds'.
|
# insert 'Epoch' entry first and then 'Nanoseconds'.
|
||||||
|
@ -123,17 +125,19 @@ async def get_client(
|
||||||
|
|
||||||
|
|
||||||
async def ingest_quote_stream(
|
async def ingest_quote_stream(
|
||||||
symbols: List[str],
|
symbols: list[str],
|
||||||
brokername: str,
|
brokername: str,
|
||||||
tries: int = 1,
|
tries: int = 1,
|
||||||
actorloglevel: str = None,
|
loglevel: str = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Ingest a broker quote stream into marketstore.
|
Ingest a broker quote stream into a ``marketstore`` tsdb.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async with (
|
async with (
|
||||||
open_feed(brokername, symbols, loglevel=actorloglevel) as feed,
|
open_feed(brokername, symbols, loglevel=loglevel) as feed,
|
||||||
get_client() as ms_client
|
get_client() as ms_client,
|
||||||
):
|
):
|
||||||
async for quotes in feed.stream:
|
async for quotes in feed.stream:
|
||||||
log.info(quotes)
|
log.info(quotes)
|
||||||
|
@ -152,30 +156,30 @@ async def ingest_quote_stream(
|
||||||
'Size': tick.get('size')
|
'Size': tick.get('size')
|
||||||
}, last_fill=quote.get('broker_ts', None))
|
}, last_fill=quote.get('broker_ts', None))
|
||||||
|
|
||||||
await ms_client.write(
|
await ms_client.write(array, _tick_tbk)
|
||||||
array, _tick_tbk)
|
|
||||||
|
|
||||||
|
|
||||||
async def stream_quotes(
|
async def stream_quotes(
|
||||||
symbols: List[str],
|
symbols: list[str],
|
||||||
timeframe: str = '1Min',
|
timeframe: str = '1Min',
|
||||||
attr_group: str = 'TICK',
|
attr_group: str = 'TICK',
|
||||||
host: str = 'localhost',
|
host: str = 'localhost',
|
||||||
port: int = 5993,
|
port: int = 5993,
|
||||||
loglevel: str = None
|
loglevel: str = None
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Open a symbol stream from a running instance of marketstore and
|
Open a symbol stream from a running instance of marketstore and
|
||||||
log to console.
|
log to console.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
tbks: Dict[str, str] = {
|
tbks: dict[str, str] = {
|
||||||
sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
|
sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# async def stream_quotes(
|
# async def stream_quotes(
|
||||||
# symbols: List[str],
|
# symbols: list[str],
|
||||||
# host: str = 'localhost',
|
# host: str = 'localhost',
|
||||||
# port: int = 5993,
|
# port: int = 5993,
|
||||||
# diff_cached: bool = True,
|
# diff_cached: bool = True,
|
||||||
|
@ -187,7 +191,7 @@ async def stream_quotes(
|
||||||
# # XXX: required to propagate ``tractor`` loglevel to piker logging
|
# # XXX: required to propagate ``tractor`` loglevel to piker logging
|
||||||
# get_console_log(loglevel or tractor.current_actor().loglevel)
|
# get_console_log(loglevel or tractor.current_actor().loglevel)
|
||||||
#
|
#
|
||||||
# tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
|
# tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
|
||||||
#
|
#
|
||||||
# async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
|
# async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
|
||||||
# # send subs topics to server
|
# # send subs topics to server
|
||||||
|
@ -196,7 +200,7 @@ async def stream_quotes(
|
||||||
# )
|
# )
|
||||||
# log.info(resp)
|
# log.info(resp)
|
||||||
#
|
#
|
||||||
# async def recv() -> Dict[str, Any]:
|
# async def recv() -> dict[str, Any]:
|
||||||
# return msgpack.loads((await ws.get_message()), encoding='utf-8')
|
# return msgpack.loads((await ws.get_message()), encoding='utf-8')
|
||||||
#
|
#
|
||||||
# streams = (await recv())['streams']
|
# streams = (await recv())['streams']
|
||||||
|
|
Loading…
Reference in New Issue