Py3.9+ type updates
							parent
							
								
									86337430d8
								
							
						
					
					
						commit
						9c5f7a6bb9
					
				| 
						 | 
					@ -24,14 +24,14 @@
 | 
				
			||||||
- todo: docker container management automation
 | 
					- todo: docker container management automation
 | 
				
			||||||
'''
 | 
					'''
 | 
				
			||||||
from contextlib import asynccontextmanager
 | 
					from contextlib import asynccontextmanager
 | 
				
			||||||
from typing import Dict, Any, List, Callable, Tuple, Optional
 | 
					from typing import Any, Optional
 | 
				
			||||||
import time
 | 
					import time
 | 
				
			||||||
from math import isnan
 | 
					# from math import isnan
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import msgpack
 | 
					# import msgpack
 | 
				
			||||||
import numpy as np
 | 
					import numpy as np
 | 
				
			||||||
import pandas as pd
 | 
					import pandas as pd
 | 
				
			||||||
import tractor
 | 
					# import tractor
 | 
				
			||||||
from trio_websocket import open_websocket_url
 | 
					from trio_websocket import open_websocket_url
 | 
				
			||||||
from anyio_marketstore import open_marketstore_client, MarketstoreClient
 | 
					from anyio_marketstore import open_marketstore_client, MarketstoreClient
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -41,7 +41,7 @@ from ..data import open_feed
 | 
				
			||||||
 | 
					
 | 
				
			||||||
log = get_logger(__name__)
 | 
					log = get_logger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK')
 | 
					_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
 | 
				
			||||||
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
 | 
					_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
_quote_dt = [
 | 
					_quote_dt = [
 | 
				
			||||||
| 
						 | 
					@ -56,16 +56,18 @@ _quote_dt = [
 | 
				
			||||||
]
 | 
					]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def mk_tbk(keys: Tuple[str, str, str]) -> str:
 | 
					def mk_tbk(keys: tuple[str, str, str]) -> str:
 | 
				
			||||||
    """Generate a marketstore table key from a tuple.
 | 
					    '''
 | 
				
			||||||
 | 
					    Generate a marketstore table key from a tuple.
 | 
				
			||||||
    Converts,
 | 
					    Converts,
 | 
				
			||||||
        ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
 | 
					        ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
 | 
				
			||||||
    """
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
    return '{}/' + '/'.join(keys)
 | 
					    return '{}/' + '/'.join(keys)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def quote_to_marketstore_structarray(
 | 
					def quote_to_marketstore_structarray(
 | 
				
			||||||
    quote: Dict[str, Any],
 | 
					    quote: dict[str, Any],
 | 
				
			||||||
    last_fill: Optional[float]
 | 
					    last_fill: Optional[float]
 | 
				
			||||||
) -> np.array:
 | 
					) -> np.array:
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
| 
						 | 
					@ -83,7 +85,7 @@ def quote_to_marketstore_structarray(
 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
    secs, ns = now / 10**9, now % 10**9
 | 
					    secs, ns = now / 10**9, now % 10**9
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # pack into List[Tuple[str, Any]]
 | 
					    # pack into list[tuple[str, Any]]
 | 
				
			||||||
    array_input = []
 | 
					    array_input = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # insert 'Epoch' entry first and then 'Nanoseconds'.
 | 
					    # insert 'Epoch' entry first and then 'Nanoseconds'.
 | 
				
			||||||
| 
						 | 
					@ -123,17 +125,19 @@ async def get_client(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def ingest_quote_stream(
 | 
					async def ingest_quote_stream(
 | 
				
			||||||
    symbols: List[str],
 | 
					    symbols: list[str],
 | 
				
			||||||
    brokername: str,
 | 
					    brokername: str,
 | 
				
			||||||
    tries: int = 1,
 | 
					    tries: int = 1,
 | 
				
			||||||
    actorloglevel: str = None,
 | 
					    loglevel: str = None,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> None:
 | 
					) -> None:
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    Ingest a broker quote stream into marketstore.
 | 
					    Ingest a broker quote stream into a ``marketstore`` tsdb.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    async with (
 | 
					    async with (
 | 
				
			||||||
        open_feed(brokername, symbols, loglevel=actorloglevel) as feed,
 | 
					        open_feed(brokername, symbols, loglevel=loglevel) as feed,
 | 
				
			||||||
        get_client() as ms_client
 | 
					        get_client() as ms_client,
 | 
				
			||||||
    ):
 | 
					    ):
 | 
				
			||||||
        async for quotes in feed.stream:
 | 
					        async for quotes in feed.stream:
 | 
				
			||||||
            log.info(quotes)
 | 
					            log.info(quotes)
 | 
				
			||||||
| 
						 | 
					@ -152,30 +156,30 @@ async def ingest_quote_stream(
 | 
				
			||||||
                        'Size': tick.get('size')
 | 
					                        'Size': tick.get('size')
 | 
				
			||||||
                    }, last_fill=quote.get('broker_ts', None))
 | 
					                    }, last_fill=quote.get('broker_ts', None))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    await ms_client.write(
 | 
					                    await ms_client.write(array, _tick_tbk)
 | 
				
			||||||
                        array, _tick_tbk)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def stream_quotes(
 | 
					async def stream_quotes(
 | 
				
			||||||
    symbols: List[str],
 | 
					    symbols: list[str],
 | 
				
			||||||
    timeframe: str = '1Min',
 | 
					    timeframe: str = '1Min',
 | 
				
			||||||
    attr_group: str = 'TICK',
 | 
					    attr_group: str = 'TICK',
 | 
				
			||||||
    host: str = 'localhost',
 | 
					    host: str = 'localhost',
 | 
				
			||||||
    port: int = 5993,
 | 
					    port: int = 5993,
 | 
				
			||||||
    loglevel: str = None
 | 
					    loglevel: str = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> None:
 | 
					) -> None:
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    Open a symbol stream from a running instance of marketstore and
 | 
					    Open a symbol stream from a running instance of marketstore and
 | 
				
			||||||
    log to console.
 | 
					    log to console.
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    tbks: Dict[str, str] = {
 | 
					    tbks: dict[str, str] = {
 | 
				
			||||||
        sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
 | 
					        sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# async def stream_quotes(
 | 
					# async def stream_quotes(
 | 
				
			||||||
#     symbols: List[str],
 | 
					#     symbols: list[str],
 | 
				
			||||||
#     host: str = 'localhost',
 | 
					#     host: str = 'localhost',
 | 
				
			||||||
#     port: int = 5993,
 | 
					#     port: int = 5993,
 | 
				
			||||||
#     diff_cached: bool = True,
 | 
					#     diff_cached: bool = True,
 | 
				
			||||||
| 
						 | 
					@ -187,7 +191,7 @@ async def stream_quotes(
 | 
				
			||||||
#     # XXX: required to propagate ``tractor`` loglevel to piker logging
 | 
					#     # XXX: required to propagate ``tractor`` loglevel to piker logging
 | 
				
			||||||
#     get_console_log(loglevel or tractor.current_actor().loglevel)
 | 
					#     get_console_log(loglevel or tractor.current_actor().loglevel)
 | 
				
			||||||
# 
 | 
					# 
 | 
				
			||||||
#     tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
 | 
					#     tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
 | 
				
			||||||
# 
 | 
					# 
 | 
				
			||||||
#     async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
 | 
					#     async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
 | 
				
			||||||
#         # send subs topics to server
 | 
					#         # send subs topics to server
 | 
				
			||||||
| 
						 | 
					@ -196,7 +200,7 @@ async def stream_quotes(
 | 
				
			||||||
#         )
 | 
					#         )
 | 
				
			||||||
#         log.info(resp)
 | 
					#         log.info(resp)
 | 
				
			||||||
# 
 | 
					# 
 | 
				
			||||||
#         async def recv() -> Dict[str, Any]:
 | 
					#         async def recv() -> dict[str, Any]:
 | 
				
			||||||
#             return msgpack.loads((await ws.get_message()), encoding='utf-8')
 | 
					#             return msgpack.loads((await ws.get_message()), encoding='utf-8')
 | 
				
			||||||
# 
 | 
					# 
 | 
				
			||||||
#         streams = (await recv())['streams']
 | 
					#         streams = (await recv())['streams']
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue