Crypto$ backend updates

- move to 3.9+ type annots
- add initial draft `open_history_client()` endpoints
- deliver `'fqsn'` keys in quote-stream init msgs
fqsns
Tyler Goodlet 2022-03-19 14:28:11 -04:00
parent 493e45e70a
commit 998a5acd92
2 changed files with 66 additions and 41 deletions
piker/brokers

View File

@ -18,8 +18,11 @@
Binance backend Binance backend
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager as acm
from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator from typing import (
Any, Union, Optional,
AsyncGenerator, Callable,
)
import time import time
import trio import trio
@ -88,7 +91,7 @@ class Pair(BaseModel):
baseCommissionPrecision: int baseCommissionPrecision: int
quoteCommissionPrecision: int quoteCommissionPrecision: int
orderTypes: List[str] orderTypes: list[str]
icebergAllowed: bool icebergAllowed: bool
ocoAllowed: bool ocoAllowed: bool
@ -96,8 +99,8 @@ class Pair(BaseModel):
isSpotTradingAllowed: bool isSpotTradingAllowed: bool
isMarginTradingAllowed: bool isMarginTradingAllowed: bool
filters: List[Dict[str, Union[str, int, float]]] filters: list[dict[str, Union[str, int, float]]]
permissions: List[str] permissions: list[str]
@dataclass @dataclass
@ -145,7 +148,7 @@ class Client:
self, self,
method: str, method: str,
params: dict, params: dict,
) -> Dict[str, Any]: ) -> dict[str, Any]:
resp = await self._sesh.get( resp = await self._sesh.get(
path=f'/api/v3/{method}', path=f'/api/v3/{method}',
params=params, params=params,
@ -200,7 +203,7 @@ class Client:
self, self,
pattern: str, pattern: str,
limit: int = None, limit: int = None,
) -> Dict[str, Any]: ) -> dict[str, Any]:
if self._pairs is not None: if self._pairs is not None:
data = self._pairs data = self._pairs
else: else:
@ -273,7 +276,7 @@ class Client:
return array return array
@asynccontextmanager @acm
async def get_client() -> Client: async def get_client() -> Client:
client = Client() client = Client()
await client.cache_symbols() await client.cache_symbols()
@ -353,7 +356,7 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
} }
def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
"""Create a request subscription packet dict. """Create a request subscription packet dict.
https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams
@ -368,6 +371,17 @@ def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]:
} }
@acm
async def open_history_client(
symbol: str,
) -> tuple[Callable, int]:
# TODO implement history getter for the new storage layer.
async with open_cached_client('binance') as client:
yield client
async def backfill_bars( async def backfill_bars(
sym: str, sym: str,
shm: ShmArray, # type: ignore # noqa shm: ShmArray, # type: ignore # noqa
@ -385,12 +399,12 @@ async def backfill_bars(
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: List[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = None, loglevel: str = None,
# startup sync # startup sync
task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
@ -427,10 +441,11 @@ async def stream_quotes(
symbol: { symbol: {
'symbol_info': sym_infos[sym], 'symbol_info': sym_infos[sym],
'shm_write_opts': {'sum_tick_vml': False}, 'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
}, },
} }
@asynccontextmanager @acm
async def subscribe(ws: wsproto.WSConnection): async def subscribe(ws: wsproto.WSConnection):
# setup subs # setup subs
@ -480,8 +495,7 @@ async def stream_quotes(
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__() typ, quote = await msg_gen.__anext__()
first_quote = {quote['symbol'].lower(): quote} task_status.started((init_msgs, quote))
task_status.started((init_msgs, first_quote))
# signal to caller feed is ready for consumption # signal to caller feed is ready for consumption
feed_is_live.set() feed_is_live.set()

View File

@ -18,9 +18,9 @@
Kraken backend. Kraken backend.
''' '''
from contextlib import asynccontextmanager from contextlib import asynccontextmanager as acm
from dataclasses import asdict, field from dataclasses import asdict, field
from typing import Dict, List, Tuple, Any, Optional, AsyncIterator from typing import Any, Optional, AsyncIterator, Callable
import time import time
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -80,7 +80,7 @@ ohlc_dtype = np.dtype(_ohlc_dtype)
_show_wap_in_history = True _show_wap_in_history = True
_symbol_info_translation: Dict[str, str] = { _symbol_info_translation: dict[str, str] = {
'tick_decimals': 'pair_decimals', 'tick_decimals': 'pair_decimals',
} }
@ -102,16 +102,16 @@ class Pair(BaseModel):
lot_multiplier: float lot_multiplier: float
# array of leverage amounts available when buying # array of leverage amounts available when buying
leverage_buy: List[int] leverage_buy: list[int]
# array of leverage amounts available when selling # array of leverage amounts available when selling
leverage_sell: List[int] leverage_sell: list[int]
# fee schedule array in [volume, percent fee] tuples # fee schedule array in [volume, percent fee] tuples
fees: List[Tuple[int, float]] fees: list[tuple[int, float]]
# maker fee schedule array in [volume, percent fee] tuples (if on # maker fee schedule array in [volume, percent fee] tuples (if on
# maker/taker) # maker/taker)
fees_maker: List[Tuple[int, float]] fees_maker: list[tuple[int, float]]
fee_volume_currency: str # volume discount currency fee_volume_currency: str # volume discount currency
margin_call: str # margin call level margin_call: str # margin call level
@ -153,7 +153,7 @@ class OHLC:
volume: float # Accumulated volume **within interval** volume: float # Accumulated volume **within interval**
count: int # Number of trades within interval count: int # Number of trades within interval
# (sampled) generated tick data # (sampled) generated tick data
ticks: List[Any] = field(default_factory=list) ticks: list[Any] = field(default_factory=list)
def get_config() -> dict[str, Any]: def get_config() -> dict[str, Any]:
@ -177,7 +177,7 @@ def get_config() -> dict[str, Any]:
def get_kraken_signature( def get_kraken_signature(
urlpath: str, urlpath: str,
data: Dict[str, Any], data: dict[str, Any],
secret: str secret: str
) -> str: ) -> str:
postdata = urllib.parse.urlencode(data) postdata = urllib.parse.urlencode(data)
@ -220,7 +220,7 @@ class Client:
self._secret = secret self._secret = secret
@property @property
def pairs(self) -> Dict[str, Any]: def pairs(self) -> dict[str, Any]:
if self._pairs is None: if self._pairs is None:
raise RuntimeError( raise RuntimeError(
"Make sure to run `cache_symbols()` on startup!" "Make sure to run `cache_symbols()` on startup!"
@ -233,7 +233,7 @@ class Client:
self, self,
method: str, method: str,
data: dict, data: dict,
) -> Dict[str, Any]: ) -> dict[str, Any]:
resp = await self._sesh.post( resp = await self._sesh.post(
path=f'/public/{method}', path=f'/public/{method}',
json=data, json=data,
@ -246,7 +246,7 @@ class Client:
method: str, method: str,
data: dict, data: dict,
uri_path: str uri_path: str
) -> Dict[str, Any]: ) -> dict[str, Any]:
headers = { headers = {
'Content-Type': 'Content-Type':
'application/x-www-form-urlencoded', 'application/x-www-form-urlencoded',
@ -266,16 +266,16 @@ class Client:
async def endpoint( async def endpoint(
self, self,
method: str, method: str,
data: Dict[str, Any] data: dict[str, Any]
) -> Dict[str, Any]: ) -> dict[str, Any]:
uri_path = f'/0/private/{method}' uri_path = f'/0/private/{method}'
data['nonce'] = str(int(1000*time.time())) data['nonce'] = str(int(1000*time.time()))
return await self._private(method, data, uri_path) return await self._private(method, data, uri_path)
async def get_trades( async def get_trades(
self, self,
data: Dict[str, Any] = {} data: dict[str, Any] = {}
) -> Dict[str, Any]: ) -> dict[str, Any]:
data['ofs'] = 0 data['ofs'] = 0
# Grab all trade history # Grab all trade history
# https://docs.kraken.com/rest/#operation/getTradeHistory # https://docs.kraken.com/rest/#operation/getTradeHistory
@ -378,7 +378,7 @@ class Client:
self, self,
pattern: str, pattern: str,
limit: int = None, limit: int = None,
) -> Dict[str, Any]: ) -> dict[str, Any]:
if self._pairs is not None: if self._pairs is not None:
data = self._pairs data = self._pairs
else: else:
@ -452,7 +452,7 @@ class Client:
raise SymbolNotFound(json['error'][0] + f': {symbol}') raise SymbolNotFound(json['error'][0] + f': {symbol}')
@asynccontextmanager @acm
async def get_client() -> Client: async def get_client() -> Client:
section = get_config() section = get_config()
@ -521,7 +521,7 @@ def normalize_symbol(
return ticker.lower() return ticker.lower()
def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]: def make_auth_sub(data: dict[str, Any]) -> dict[str, str]:
''' '''
Create a request subscription packet dict. Create a request subscription packet dict.
@ -696,12 +696,12 @@ async def handle_order_requests(
async def trades_dialogue( async def trades_dialogue(
ctx: tractor.Context, ctx: tractor.Context,
loglevel: str = None, loglevel: str = None,
) -> AsyncIterator[Dict[str, Any]]: ) -> AsyncIterator[dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)
@asynccontextmanager @acm
async def subscribe(ws: wsproto.WSConnection, token: str): async def subscribe(ws: wsproto.WSConnection, token: str):
# XXX: setup subs # XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe # https://docs.kraken.com/websockets/#message-subscribe
@ -980,7 +980,7 @@ def normalize(
return topic, quote return topic, quote
def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]:
''' '''
Create a request subscription packet dict. Create a request subscription packet dict.
@ -996,6 +996,17 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
} }
@acm
async def open_history_client(
symbol: str,
) -> tuple[Callable, int]:
# TODO implement history getter for the new storage layer.
async with open_cached_client('kraken') as client:
yield client
async def backfill_bars( async def backfill_bars(
sym: str, sym: str,
@ -1017,7 +1028,7 @@ async def backfill_bars(
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: List[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = None, loglevel: str = None,
@ -1025,7 +1036,7 @@ async def stream_quotes(
sub_type: str = 'ohlc', sub_type: str = 'ohlc',
# startup sync # startup sync
task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' '''
@ -1064,10 +1075,11 @@ async def stream_quotes(
symbol: { symbol: {
'symbol_info': sym_infos[sym], 'symbol_info': sym_infos[sym],
'shm_write_opts': {'sum_tick_vml': False}, 'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
}, },
} }
@asynccontextmanager @acm
async def subscribe(ws: wsproto.WSConnection): async def subscribe(ws: wsproto.WSConnection):
# XXX: setup subs # XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe # https://docs.kraken.com/websockets/#message-subscribe
@ -1121,8 +1133,7 @@ async def stream_quotes(
topic, quote = normalize(ohlc_last) topic, quote = normalize(ohlc_last)
first_quote = {topic: quote} task_status.started((init_msgs, quote))
task_status.started((init_msgs, first_quote))
# lol, only "closes" when they're margin squeezing clients ;P # lol, only "closes" when they're margin squeezing clients ;P
feed_is_live.set() feed_is_live.set()