Py3.9+ type updates
							parent
							
								
									a2547a548f
								
							
						
					
					
						commit
						ff9208c15b
					
				| 
						 | 
				
			
			@ -24,14 +24,14 @@
 | 
			
		|||
- todo: docker container management automation
 | 
			
		||||
'''
 | 
			
		||||
from contextlib import asynccontextmanager
 | 
			
		||||
from typing import Dict, Any, List, Callable, Tuple, Optional
 | 
			
		||||
from typing import Any, Optional
 | 
			
		||||
import time
 | 
			
		||||
from math import isnan
 | 
			
		||||
# from math import isnan
 | 
			
		||||
 | 
			
		||||
import msgpack
 | 
			
		||||
# import msgpack
 | 
			
		||||
import numpy as np
 | 
			
		||||
import pandas as pd
 | 
			
		||||
import tractor
 | 
			
		||||
# import tractor
 | 
			
		||||
from trio_websocket import open_websocket_url
 | 
			
		||||
from anyio_marketstore import open_marketstore_client, MarketstoreClient
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -41,7 +41,7 @@ from ..data import open_feed
 | 
			
		|||
 | 
			
		||||
log = get_logger(__name__)
 | 
			
		||||
 | 
			
		||||
_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK')
 | 
			
		||||
_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
 | 
			
		||||
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
 | 
			
		||||
 | 
			
		||||
_quote_dt = [
 | 
			
		||||
| 
						 | 
				
			
			@ -56,16 +56,18 @@ _quote_dt = [
 | 
			
		|||
]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def mk_tbk(keys: Tuple[str, str, str]) -> str:
 | 
			
		||||
    """Generate a marketstore table key from a tuple.
 | 
			
		||||
def mk_tbk(keys: tuple[str, str, str]) -> str:
 | 
			
		||||
    '''
 | 
			
		||||
    Generate a marketstore table key from a tuple.
 | 
			
		||||
    Converts,
 | 
			
		||||
        ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    return '{}/' + '/'.join(keys)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def quote_to_marketstore_structarray(
 | 
			
		||||
    quote: Dict[str, Any],
 | 
			
		||||
    quote: dict[str, Any],
 | 
			
		||||
    last_fill: Optional[float]
 | 
			
		||||
) -> np.array:
 | 
			
		||||
    '''
 | 
			
		||||
| 
						 | 
				
			
			@ -83,7 +85,7 @@ def quote_to_marketstore_structarray(
 | 
			
		|||
        
 | 
			
		||||
    secs, ns = now / 10**9, now % 10**9
 | 
			
		||||
 | 
			
		||||
    # pack into List[Tuple[str, Any]]
 | 
			
		||||
    # pack into list[tuple[str, Any]]
 | 
			
		||||
    array_input = []
 | 
			
		||||
 | 
			
		||||
    # insert 'Epoch' entry first and then 'Nanoseconds'.
 | 
			
		||||
| 
						 | 
				
			
			@ -123,17 +125,19 @@ async def get_client(
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
async def ingest_quote_stream(
 | 
			
		||||
    symbols: List[str],
 | 
			
		||||
    symbols: list[str],
 | 
			
		||||
    brokername: str,
 | 
			
		||||
    tries: int = 1,
 | 
			
		||||
    actorloglevel: str = None,
 | 
			
		||||
    loglevel: str = None,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    '''
 | 
			
		||||
    Ingest a broker quote stream into marketstore.
 | 
			
		||||
    Ingest a broker quote stream into a ``marketstore`` tsdb.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    async with (
 | 
			
		||||
        open_feed(brokername, symbols, loglevel=actorloglevel) as feed,
 | 
			
		||||
        get_client() as ms_client
 | 
			
		||||
        open_feed(brokername, symbols, loglevel=loglevel) as feed,
 | 
			
		||||
        get_client() as ms_client,
 | 
			
		||||
    ):
 | 
			
		||||
        async for quotes in feed.stream:
 | 
			
		||||
            log.info(quotes)
 | 
			
		||||
| 
						 | 
				
			
			@ -152,30 +156,30 @@ async def ingest_quote_stream(
 | 
			
		|||
                        'Size': tick.get('size')
 | 
			
		||||
                    }, last_fill=quote.get('broker_ts', None))
 | 
			
		||||
 | 
			
		||||
                    await ms_client.write(
 | 
			
		||||
                        array, _tick_tbk)
 | 
			
		||||
                    
 | 
			
		||||
                    await ms_client.write(array, _tick_tbk)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def stream_quotes(
 | 
			
		||||
    symbols: List[str],
 | 
			
		||||
    symbols: list[str],
 | 
			
		||||
    timeframe: str = '1Min',
 | 
			
		||||
    attr_group: str = 'TICK',
 | 
			
		||||
    host: str = 'localhost',
 | 
			
		||||
    port: int = 5993,
 | 
			
		||||
    loglevel: str = None
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    '''
 | 
			
		||||
    Open a symbol stream from a running instance of marketstore and
 | 
			
		||||
    log to console.
 | 
			
		||||
    '''
 | 
			
		||||
 | 
			
		||||
    tbks: Dict[str, str] = {
 | 
			
		||||
    tbks: dict[str, str] = {
 | 
			
		||||
        sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# async def stream_quotes(
 | 
			
		||||
#     symbols: List[str],
 | 
			
		||||
#     symbols: list[str],
 | 
			
		||||
#     host: str = 'localhost',
 | 
			
		||||
#     port: int = 5993,
 | 
			
		||||
#     diff_cached: bool = True,
 | 
			
		||||
| 
						 | 
				
			
			@ -187,7 +191,7 @@ async def stream_quotes(
 | 
			
		|||
#     # XXX: required to propagate ``tractor`` loglevel to piker logging
 | 
			
		||||
#     get_console_log(loglevel or tractor.current_actor().loglevel)
 | 
			
		||||
# 
 | 
			
		||||
#     tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
 | 
			
		||||
#     tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
 | 
			
		||||
# 
 | 
			
		||||
#     async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
 | 
			
		||||
#         # send subs topics to server
 | 
			
		||||
| 
						 | 
				
			
			@ -196,7 +200,7 @@ async def stream_quotes(
 | 
			
		|||
#         )
 | 
			
		||||
#         log.info(resp)
 | 
			
		||||
# 
 | 
			
		||||
#         async def recv() -> Dict[str, Any]:
 | 
			
		||||
#         async def recv() -> dict[str, Any]:
 | 
			
		||||
#             return msgpack.loads((await ws.get_message()), encoding='utf-8')
 | 
			
		||||
# 
 | 
			
		||||
#         streams = (await recv())['streams']
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue