diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index 06328d4f..eb4e735a 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -25,6 +25,7 @@ import asks asks.init('trio') __brokers__ = [ + 'binance', 'questrade', 'robinhood', 'ib', diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index d6a647ac..f7f15ca0 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -36,6 +36,7 @@ from trio_websocket._impl import ( import arrow import asks +from fuzzywuzzy import process as fuzzy import numpy as np import trio import tractor @@ -81,6 +82,7 @@ _ohlc_dtype = [ ohlc_dtype = np.dtype(_ohlc_dtype) _show_wap_in_history = False +_search_conf = {'pause_period': 0.375} # https://binance-docs.github.io/apidocs/spot/en/#exchange-information @@ -148,38 +150,66 @@ class Client: def __init__(self) -> None: self._sesh = asks.Session(connections=4) self._sesh.base_location = _url + self._pairs: dict[str, Any] = {} async def _api( self, method: str, - data: dict, + params: dict, ) -> Dict[str, Any]: resp = await self._sesh.get( path=f'/api/v3/{method}', - params=data, + params=params, timeout=float('inf') ) return resproc(resp, log) async def symbol_info( + self, - sym: Optional[str] = None + sym: Optional[str] = None, - ) -> dict: + ) -> dict[str, Any]: + '''Get symbol info for the exchange. + + ''' + # TODO: we can load from our self._pairs cache + # on repeat calls... + + # will retrieve all symbols by default + params = {} - resp = await self._api('exchangeInfo', {}) if sym is not None: - for sym_info in resp['symbols']: - if sym_info['symbol'] == sym: - return sym_info - else: - raise SymbolNotFound(f'{sym} not found') + sym = sym.upper() + params = {'symbol': sym} + + resp = await self._api( + 'exchangeInfo', + params=params, + ) + + entries = resp['symbols'] + if not entries: + raise SymbolNotFound(f'{sym} not found') + + syms = {item['symbol']: item for item in entries} + + if sym is not None: + return syms[sym] else: - return resp['symbols'] + return syms + + async def cache_symbols( + self, + ) -> dict: + if not self._pairs: + self._pairs = await self.symbol_info() + + return self._pairs async def bars( self, - symbol: str = 'BTCUSDT', + symbol: str, start_time: int = None, end_time: int = None, limit: int = 1000, # <- max allowed per query @@ -198,8 +228,8 @@ class Client: # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data bars = await self._api( 'klines', - { - 'symbol': symbol, + params={ + 'symbol': symbol.upper(), 'interval': '1m', 'startTime': start_time, 'endTime': end_time, @@ -237,7 +267,9 @@ class Client: @asynccontextmanager async def get_client() -> Client: - yield Client() + client = Client() + await client.cache_symbols() + yield client # validation type @@ -436,11 +468,15 @@ async def stream_quotes( sym_infos = {} uid = 0 - async with open_cached_client('binance') as client, send_chan as send_chan: + async with ( + open_cached_client('binance') as client, + send_chan as send_chan, + ): # keep client cached for real-time section + cache = await client.cache_symbols() for sym in symbols: - d = await client.symbol_info(sym) + d = cache[sym.upper()] syminfo = Pair(**d) # validation sym_infos[sym] = syminfo.dict() @@ -483,7 +519,7 @@ async def stream_quotes( # TODO: use ``anext()`` when it lands in 3.10! typ, quote = await msg_gen.__anext__() - first_quote = {quote['symbol']: quote} + first_quote = {quote['symbol'].lower(): quote} task_status.started((init_msgs, first_quote)) # signal to caller feed is ready for consumption @@ -492,5 +528,32 @@ async def stream_quotes( # start streaming async for typ, msg in msg_gen: - topic = msg['symbol'] + topic = msg['symbol'].lower() await send_chan.send({topic: msg}) + + +@tractor.context +async def open_symbol_search( + ctx: tractor.Context, +) -> Client: + async with open_cached_client('binance') as client: + + # load all symbols locally for fast search + cache = await client.cache_symbols() + await ctx.started() + + async with ctx.open_stream() as stream: + + async for pattern in stream: + # results = await client.symbol_info(sym=pattern.upper()) + + matches = fuzzy.extractBests( + pattern, + cache, + score_cutoff=50, + ) + # repack in dict form + await stream.send( + {item[0]['symbol']: item[0] + for item in matches} + )