Merge pull request #184 from pikers/binance_syminfo_and_mintick

Binance syminfo and mintick
ems_hotfixes
goodboy 2021-05-26 12:11:02 -04:00 committed by GitHub
commit 36530d9cf6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 106 additions and 21 deletions

View File

@ -25,6 +25,7 @@ import asks
asks.init('trio') asks.init('trio')
__brokers__ = [ __brokers__ = [
'binance',
'questrade', 'questrade',
'robinhood', 'robinhood',
'ib', 'ib',

View File

@ -36,6 +36,7 @@ from trio_websocket._impl import (
import arrow import arrow
import asks import asks
from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import trio import trio
import tractor import tractor
@ -81,6 +82,7 @@ _ohlc_dtype = [
ohlc_dtype = np.dtype(_ohlc_dtype) ohlc_dtype = np.dtype(_ohlc_dtype)
_show_wap_in_history = False _show_wap_in_history = False
_search_conf = {'pause_period': 0.375}
# https://binance-docs.github.io/apidocs/spot/en/#exchange-information # https://binance-docs.github.io/apidocs/spot/en/#exchange-information
@ -148,38 +150,66 @@ class Client:
def __init__(self) -> None: def __init__(self) -> None:
self._sesh = asks.Session(connections=4) self._sesh = asks.Session(connections=4)
self._sesh.base_location = _url self._sesh.base_location = _url
self._pairs: dict[str, Any] = {}
async def _api( async def _api(
self, self,
method: str, method: str,
data: dict, params: dict,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
resp = await self._sesh.get( resp = await self._sesh.get(
path=f'/api/v3/{method}', path=f'/api/v3/{method}',
params=data, params=params,
timeout=float('inf') timeout=float('inf')
) )
return resproc(resp, log) return resproc(resp, log)
async def symbol_info( async def symbol_info(
self, self,
sym: Optional[str] = None sym: Optional[str] = None,
) -> dict: ) -> dict[str, Any]:
'''Get symbol info for the exchange.
'''
# TODO: we can load from our self._pairs cache
# on repeat calls...
# will retrieve all symbols by default
params = {}
resp = await self._api('exchangeInfo', {})
if sym is not None: if sym is not None:
for sym_info in resp['symbols']: sym = sym.upper()
if sym_info['symbol'] == sym: params = {'symbol': sym}
return sym_info
else: resp = await self._api(
raise SymbolNotFound(f'{sym} not found') 'exchangeInfo',
params=params,
)
entries = resp['symbols']
if not entries:
raise SymbolNotFound(f'{sym} not found')
syms = {item['symbol']: item for item in entries}
if sym is not None:
return syms[sym]
else: else:
return resp['symbols'] return syms
async def cache_symbols(
self,
) -> dict:
if not self._pairs:
self._pairs = await self.symbol_info()
return self._pairs
async def bars( async def bars(
self, self,
symbol: str = 'BTCUSDT', symbol: str,
start_time: int = None, start_time: int = None,
end_time: int = None, end_time: int = None,
limit: int = 1000, # <- max allowed per query limit: int = 1000, # <- max allowed per query
@ -198,8 +228,8 @@ class Client:
# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
bars = await self._api( bars = await self._api(
'klines', 'klines',
{ params={
'symbol': symbol, 'symbol': symbol.upper(),
'interval': '1m', 'interval': '1m',
'startTime': start_time, 'startTime': start_time,
'endTime': end_time, 'endTime': end_time,
@ -237,7 +267,9 @@ class Client:
@asynccontextmanager @asynccontextmanager
async def get_client() -> Client: async def get_client() -> Client:
yield Client() client = Client()
await client.cache_symbols()
yield client
# validation type # validation type
@ -256,11 +288,21 @@ class AggTrade(BaseModel):
async def stream_messages(ws): async def stream_messages(ws):
timeouts = 0
while True: while True:
with trio.move_on_after(5): with trio.move_on_after(5) as cs:
msg = await ws.recv_msg() msg = await ws.recv_msg()
if cs.cancelled_caught:
timeouts += 1
if timeouts > 2:
raise trio.TooSlowError("binance feed seems down?")
continue
# for l1 streams binance doesn't add an event type field so # for l1 streams binance doesn't add an event type field so
# identify those messages by matching keys # identify those messages by matching keys
# https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
@ -436,13 +478,24 @@ async def stream_quotes(
sym_infos = {} sym_infos = {}
uid = 0 uid = 0
async with open_cached_client('binance') as client, send_chan as send_chan: async with (
open_cached_client('binance') as client,
send_chan as send_chan,
):
# keep client cached for real-time section # keep client cached for real-time section
cache = await client.cache_symbols()
for sym in symbols: for sym in symbols:
d = await client.symbol_info(sym) d = cache[sym.upper()]
syminfo = Pair(**d) # validation syminfo = Pair(**d) # validation
sym_infos[sym] = syminfo.dict()
si = sym_infos[sym] = syminfo.dict()
# XXX: after manually inspecting the response format we
# just directly pick out the info we need
si['price_tick_size'] = syminfo.filters[0]['tickSize']
si['lot_tick_size'] = syminfo.filters[2]['stepSize']
symbol = symbols[0] symbol = symbols[0]
@ -483,7 +536,7 @@ async def stream_quotes(
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__() typ, quote = await msg_gen.__anext__()
first_quote = {quote['symbol']: quote} first_quote = {quote['symbol'].lower(): quote}
task_status.started((init_msgs, first_quote)) task_status.started((init_msgs, first_quote))
# signal to caller feed is ready for consumption # signal to caller feed is ready for consumption
@ -492,5 +545,32 @@ async def stream_quotes(
# start streaming # start streaming
async for typ, msg in msg_gen: async for typ, msg in msg_gen:
topic = msg['symbol'] topic = msg['symbol'].lower()
await send_chan.send({topic: msg}) await send_chan.send({topic: msg})
@tractor.context
async def open_symbol_search(
ctx: tractor.Context,
) -> Client:
async with open_cached_client('binance') as client:
# load all symbols locally for fast search
cache = await client.cache_symbols()
await ctx.started()
async with ctx.open_stream() as stream:
async for pattern in stream:
# results = await client.symbol_info(sym=pattern.upper())
matches = fuzzy.extractBests(
pattern,
cache,
score_cutoff=50,
)
# repack in dict form
await stream.send(
{item[0]['symbol']: item[0]
for item in matches}
)

View File

@ -75,6 +75,10 @@ setup(
# tsdbs # tsdbs
'pymarketstore', 'pymarketstore',
#'kivy', see requirement.txt; using a custom branch atm #'kivy', see requirement.txt; using a custom branch atm
# fuzzy search
'fuzzywuzzy[speedup]',
], ],
tests_require=['pytest'], tests_require=['pytest'],
python_requires=">=3.9", # literally for ``datetime.datetime.fromisoformat``... python_requires=">=3.9", # literally for ``datetime.datetime.fromisoformat``...