Crypto$ backend updates

- move to 3.9+ type annots
- add initial draft `open_history_client()` endpoints
- deliver `'fqsn'` keys in quote-stream init msgs
mkts_backup
Tyler Goodlet 2022-03-19 14:28:11 -04:00
parent 197cad17a2
commit 01b594e828
2 changed files with 45 additions and 20 deletions

View File

@ -18,8 +18,11 @@
Binance backend Binance backend
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager as acm
from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator from typing import (
Any, Union, Optional,
AsyncGenerator, Callable,
)
import time import time
import trio import trio
@ -88,7 +91,7 @@ class Pair(BaseModel):
baseCommissionPrecision: int baseCommissionPrecision: int
quoteCommissionPrecision: int quoteCommissionPrecision: int
orderTypes: List[str] orderTypes: list[str]
icebergAllowed: bool icebergAllowed: bool
ocoAllowed: bool ocoAllowed: bool
@ -96,8 +99,8 @@ class Pair(BaseModel):
isSpotTradingAllowed: bool isSpotTradingAllowed: bool
isMarginTradingAllowed: bool isMarginTradingAllowed: bool
filters: List[Dict[str, Union[str, int, float]]] filters: list[dict[str, Union[str, int, float]]]
permissions: List[str] permissions: list[str]
@dataclass @dataclass
@ -145,7 +148,7 @@ class Client:
self, self,
method: str, method: str,
params: dict, params: dict,
) -> Dict[str, Any]: ) -> dict[str, Any]:
resp = await self._sesh.get( resp = await self._sesh.get(
path=f'/api/v3/{method}', path=f'/api/v3/{method}',
params=params, params=params,
@ -200,7 +203,7 @@ class Client:
self, self,
pattern: str, pattern: str,
limit: int = None, limit: int = None,
) -> Dict[str, Any]: ) -> dict[str, Any]:
if self._pairs is not None: if self._pairs is not None:
data = self._pairs data = self._pairs
else: else:
@ -273,7 +276,7 @@ class Client:
return array return array
@asynccontextmanager @acm
async def get_client() -> Client: async def get_client() -> Client:
client = Client() client = Client()
await client.cache_symbols() await client.cache_symbols()
@ -353,7 +356,7 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
} }
def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
"""Create a request subscription packet dict. """Create a request subscription packet dict.
https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams
@ -368,6 +371,17 @@ def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]:
} }
@acm
async def open_history_client(
symbol: str,
) -> tuple[Callable, int]:
# TODO implement history getter for the new storage layer.
async with open_cached_client('binance') as client:
yield client
async def backfill_bars( async def backfill_bars(
sym: str, sym: str,
shm: ShmArray, # type: ignore # noqa shm: ShmArray, # type: ignore # noqa
@ -385,12 +399,12 @@ async def backfill_bars(
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: List[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = None, loglevel: str = None,
# startup sync # startup sync
task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
@ -427,10 +441,11 @@ async def stream_quotes(
symbol: { symbol: {
'symbol_info': sym_infos[sym], 'symbol_info': sym_infos[sym],
'shm_write_opts': {'sum_tick_vml': False}, 'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
}, },
} }
@asynccontextmanager @acm
async def subscribe(ws: wsproto.WSConnection): async def subscribe(ws: wsproto.WSConnection):
# setup subs # setup subs
@ -480,8 +495,7 @@ async def stream_quotes(
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__() typ, quote = await msg_gen.__anext__()
first_quote = {quote['symbol'].lower(): quote} task_status.started((init_msgs, quote))
task_status.started((init_msgs, first_quote))
# signal to caller feed is ready for consumption # signal to caller feed is ready for consumption
feed_is_live.set() feed_is_live.set()

View File

@ -18,9 +18,9 @@
Kraken backend. Kraken backend.
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager as acm
from dataclasses import asdict, field from dataclasses import asdict, field
from typing import List, Dict, Any, Tuple, Optional from typing import List, Dict, Any, Tuple, Optional, Callable
import time import time
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -271,7 +271,7 @@ class Client:
raise SymbolNotFound(json['error'][0] + f': {symbol}') raise SymbolNotFound(json['error'][0] + f': {symbol}')
@asynccontextmanager @acm
async def get_client() -> Client: async def get_client() -> Client:
client = Client() client = Client()
@ -385,6 +385,17 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
} }
@acm
async def open_history_client(
symbol: str,
) -> tuple[Callable, int]:
# TODO implement history getter for the new storage layer.
async with open_cached_client('kraken') as client:
yield client
async def backfill_bars( async def backfill_bars(
sym: str, sym: str,
@ -450,10 +461,11 @@ async def stream_quotes(
symbol: { symbol: {
'symbol_info': sym_infos[sym], 'symbol_info': sym_infos[sym],
'shm_write_opts': {'sum_tick_vml': False}, 'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
}, },
} }
@asynccontextmanager @acm
async def subscribe(ws: wsproto.WSConnection): async def subscribe(ws: wsproto.WSConnection):
# XXX: setup subs # XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe # https://docs.kraken.com/websockets/#message-subscribe
@ -506,8 +518,7 @@ async def stream_quotes(
topic, quote = normalize(ohlc_last) topic, quote = normalize(ohlc_last)
first_quote = {topic: quote} task_status.started((init_msgs, quote))
task_status.started((init_msgs, first_quote))
# lol, only "closes" when they're margin squeezing clients ;P # lol, only "closes" when they're margin squeezing clients ;P
feed_is_live.set() feed_is_live.set()