diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 3f75c994..c9fc5d16 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -24,14 +24,14 @@ - todo: docker container management automation ''' from contextlib import asynccontextmanager -from typing import Dict, Any, List, Callable, Tuple, Optional +from typing import Any, Optional import time -from math import isnan +# from math import isnan -import msgpack +# import msgpack import numpy as np import pandas as pd -import tractor +# import tractor from trio_websocket import open_websocket_url from anyio_marketstore import open_marketstore_client, MarketstoreClient @@ -41,7 +41,7 @@ from ..data import open_feed log = get_logger(__name__) -_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK') +_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK') _tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids) _quote_dt = [ @@ -56,16 +56,18 @@ _quote_dt = [ ] -def mk_tbk(keys: Tuple[str, str, str]) -> str: - """Generate a marketstore table key from a tuple. +def mk_tbk(keys: tuple[str, str, str]) -> str: + ''' + Generate a marketstore table key from a tuple. Converts, ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"``` - """ + + ''' return '{}/' + '/'.join(keys) def quote_to_marketstore_structarray( - quote: Dict[str, Any], + quote: dict[str, Any], last_fill: Optional[float] ) -> np.array: ''' @@ -83,7 +85,7 @@ def quote_to_marketstore_structarray( secs, ns = now / 10**9, now % 10**9 - # pack into List[Tuple[str, Any]] + # pack into list[tuple[str, Any]] array_input = [] # insert 'Epoch' entry first and then 'Nanoseconds'. @@ -123,17 +125,19 @@ async def get_client( async def ingest_quote_stream( - symbols: List[str], + symbols: list[str], brokername: str, tries: int = 1, - actorloglevel: str = None, + loglevel: str = None, + ) -> None: ''' - Ingest a broker quote stream into marketstore. + Ingest a broker quote stream into a ``marketstore`` tsdb. + ''' async with ( - open_feed(brokername, symbols, loglevel=actorloglevel) as feed, - get_client() as ms_client + open_feed(brokername, symbols, loglevel=loglevel) as feed, + get_client() as ms_client, ): async for quotes in feed.stream: log.info(quotes) @@ -152,30 +156,30 @@ async def ingest_quote_stream( 'Size': tick.get('size') }, last_fill=quote.get('broker_ts', None)) - await ms_client.write( - array, _tick_tbk) - + await ms_client.write(array, _tick_tbk) + async def stream_quotes( - symbols: List[str], + symbols: list[str], timeframe: str = '1Min', attr_group: str = 'TICK', host: str = 'localhost', port: int = 5993, loglevel: str = None + ) -> None: ''' Open a symbol stream from a running instance of marketstore and log to console. ''' - tbks: Dict[str, str] = { + tbks: dict[str, str] = { sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols} # async def stream_quotes( -# symbols: List[str], +# symbols: list[str], # host: str = 'localhost', # port: int = 5993, # diff_cached: bool = True, @@ -187,7 +191,7 @@ async def stream_quotes( # # XXX: required to propagate ``tractor`` loglevel to piker logging # get_console_log(loglevel or tractor.current_actor().loglevel) # -# tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols} +# tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols} # # async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: # # send subs topics to server @@ -196,7 +200,7 @@ async def stream_quotes( # ) # log.info(resp) # -# async def recv() -> Dict[str, Any]: +# async def recv() -> dict[str, Any]: # return msgpack.loads((await ws.get_message()), encoding='utf-8') # # streams = (await recv())['streams']