diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index f4732e54..5034aca6 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -18,8 +18,11 @@ Binance backend """ -from contextlib import asynccontextmanager -from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator +from contextlib import asynccontextmanager as acm +from typing import ( + Any, Union, Optional, + AsyncGenerator, Callable, +) import time import trio @@ -88,7 +91,7 @@ class Pair(BaseModel): baseCommissionPrecision: int quoteCommissionPrecision: int - orderTypes: List[str] + orderTypes: list[str] icebergAllowed: bool ocoAllowed: bool @@ -96,8 +99,8 @@ class Pair(BaseModel): isSpotTradingAllowed: bool isMarginTradingAllowed: bool - filters: List[Dict[str, Union[str, int, float]]] - permissions: List[str] + filters: list[dict[str, Union[str, int, float]]] + permissions: list[str] @dataclass @@ -145,7 +148,7 @@ class Client: self, method: str, params: dict, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: resp = await self._sesh.get( path=f'/api/v3/{method}', params=params, @@ -200,7 +203,7 @@ class Client: self, pattern: str, limit: int = None, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: if self._pairs is not None: data = self._pairs else: @@ -273,7 +276,7 @@ class Client: return array -@asynccontextmanager +@acm async def get_client() -> Client: client = Client() await client.cache_symbols() @@ -353,7 +356,7 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: } -def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: +def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]: """Create a request subscription packet dict. https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams @@ -368,6 +371,17 @@ def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: } +@acm +async def open_history_client( + symbol: str, + +) -> tuple[Callable, int]: + + # TODO implement history getter for the new storage layer. + async with open_cached_client('binance') as client: + yield client + + async def backfill_bars( sym: str, shm: ShmArray, # type: ignore # noqa @@ -385,12 +399,12 @@ async def backfill_bars( async def stream_quotes( send_chan: trio.abc.SendChannel, - symbols: List[str], + symbols: list[str], feed_is_live: trio.Event, loglevel: str = None, # startup sync - task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: # XXX: required to propagate ``tractor`` loglevel to piker logging @@ -427,10 +441,11 @@ async def stream_quotes( symbol: { 'symbol_info': sym_infos[sym], 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, }, } - @asynccontextmanager + @acm async def subscribe(ws: wsproto.WSConnection): # setup subs @@ -480,8 +495,7 @@ async def stream_quotes( # TODO: use ``anext()`` when it lands in 3.10! typ, quote = await msg_gen.__anext__() - first_quote = {quote['symbol'].lower(): quote} - task_status.started((init_msgs, first_quote)) + task_status.started((init_msgs, quote)) # signal to caller feed is ready for consumption feed_is_live.set() diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index f64ef7aa..4f5166db 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -18,9 +18,9 @@ Kraken backend. ''' -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from dataclasses import asdict, field -from typing import Dict, List, Tuple, Any, Optional, AsyncIterator +from typing import Any, Optional, AsyncIterator, Callable import time from trio_typing import TaskStatus @@ -80,7 +80,7 @@ ohlc_dtype = np.dtype(_ohlc_dtype) _show_wap_in_history = True -_symbol_info_translation: Dict[str, str] = { +_symbol_info_translation: dict[str, str] = { 'tick_decimals': 'pair_decimals', } @@ -102,16 +102,16 @@ class Pair(BaseModel): lot_multiplier: float # array of leverage amounts available when buying - leverage_buy: List[int] + leverage_buy: list[int] # array of leverage amounts available when selling - leverage_sell: List[int] + leverage_sell: list[int] # fee schedule array in [volume, percent fee] tuples - fees: List[Tuple[int, float]] + fees: list[tuple[int, float]] # maker fee schedule array in [volume, percent fee] tuples (if on # maker/taker) - fees_maker: List[Tuple[int, float]] + fees_maker: list[tuple[int, float]] fee_volume_currency: str # volume discount currency margin_call: str # margin call level @@ -153,7 +153,7 @@ class OHLC: volume: float # Accumulated volume **within interval** count: int # Number of trades within interval # (sampled) generated tick data - ticks: List[Any] = field(default_factory=list) + ticks: list[Any] = field(default_factory=list) def get_config() -> dict[str, Any]: @@ -177,7 +177,7 @@ def get_config() -> dict[str, Any]: def get_kraken_signature( urlpath: str, - data: Dict[str, Any], + data: dict[str, Any], secret: str ) -> str: postdata = urllib.parse.urlencode(data) @@ -220,7 +220,7 @@ class Client: self._secret = secret @property - def pairs(self) -> Dict[str, Any]: + def pairs(self) -> dict[str, Any]: if self._pairs is None: raise RuntimeError( "Make sure to run `cache_symbols()` on startup!" @@ -233,7 +233,7 @@ class Client: self, method: str, data: dict, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: resp = await self._sesh.post( path=f'/public/{method}', json=data, @@ -246,7 +246,7 @@ class Client: method: str, data: dict, uri_path: str - ) -> Dict[str, Any]: + ) -> dict[str, Any]: headers = { 'Content-Type': 'application/x-www-form-urlencoded', @@ -266,16 +266,16 @@ class Client: async def endpoint( self, method: str, - data: Dict[str, Any] - ) -> Dict[str, Any]: + data: dict[str, Any] + ) -> dict[str, Any]: uri_path = f'/0/private/{method}' data['nonce'] = str(int(1000*time.time())) return await self._private(method, data, uri_path) async def get_trades( self, - data: Dict[str, Any] = {} - ) -> Dict[str, Any]: + data: dict[str, Any] = {} + ) -> dict[str, Any]: data['ofs'] = 0 # Grab all trade history # https://docs.kraken.com/rest/#operation/getTradeHistory @@ -378,7 +378,7 @@ class Client: self, pattern: str, limit: int = None, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: if self._pairs is not None: data = self._pairs else: @@ -452,7 +452,7 @@ class Client: raise SymbolNotFound(json['error'][0] + f': {symbol}') -@asynccontextmanager +@acm async def get_client() -> Client: section = get_config() @@ -521,7 +521,7 @@ def normalize_symbol( return ticker.lower() -def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]: +def make_auth_sub(data: dict[str, Any]) -> dict[str, str]: ''' Create a request subscription packet dict. @@ -696,12 +696,12 @@ async def handle_order_requests( async def trades_dialogue( ctx: tractor.Context, loglevel: str = None, -) -> AsyncIterator[Dict[str, Any]]: +) -> AsyncIterator[dict[str, Any]]: # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - @asynccontextmanager + @acm async def subscribe(ws: wsproto.WSConnection, token: str): # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe @@ -980,7 +980,7 @@ def normalize( return topic, quote -def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: +def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]: ''' Create a request subscription packet dict. @@ -996,6 +996,17 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: } +@acm +async def open_history_client( + symbol: str, + +) -> tuple[Callable, int]: + + # TODO implement history getter for the new storage layer. + async with open_cached_client('kraken') as client: + yield client + + async def backfill_bars( sym: str, @@ -1017,7 +1028,7 @@ async def backfill_bars( async def stream_quotes( send_chan: trio.abc.SendChannel, - symbols: List[str], + symbols: list[str], feed_is_live: trio.Event, loglevel: str = None, @@ -1025,7 +1036,7 @@ async def stream_quotes( sub_type: str = 'ohlc', # startup sync - task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: ''' @@ -1064,10 +1075,11 @@ async def stream_quotes( symbol: { 'symbol_info': sym_infos[sym], 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, }, } - @asynccontextmanager + @acm async def subscribe(ws: wsproto.WSConnection): # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe @@ -1121,8 +1133,7 @@ async def stream_quotes( topic, quote = normalize(ohlc_last) - first_quote = {topic: quote} - task_status.started((init_msgs, first_quote)) + task_status.started((init_msgs, quote)) # lol, only "closes" when they're margin squeezing clients ;P feed_is_live.set()