From 8e03212e40c22b5d78088e941e720be95248388a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 14 Jun 2023 13:16:13 -0400 Subject: [PATCH] Always expand FQMEs with .venue and .expiry values Since there are indeed multiple futures (perp swaps) contracts including a set with expiry, we need a way to distinguish through search and `FutesPair` lookup which contract we're requesting. To solve this extend the `FutesPair` and `SpotPair` to include a `.bs_fqme` field similar to `MktPair` and key the `Client._pairs: ChainMap`'s backing tables with these expanded fqmes. For example the perp swap now expands to `btcusdt.usdtm.perp` which fills in the venue as `'usdtm'` (the usd-margined fututes market) and the expiry as `'perp'` (as before). This allows distinguishing explicitly from, for ex., coin-margined contracts which could instead (since we haven't added the support yet) fqmes of the sort `btcusdt.m.perp.binance` thus making it explicit and obvious which contract is which B) Further we interpolate the venue token to `spot` for spot markets going forward, which again makes cex spot markets explicit in symbology; we'll need to add this as well to other cex backends ;) Other misc detalles: - change USD-M futes `MarketType` key to `'usdtm_futes'`. - add `Pair.bs_fqme: str` for all pair subtypes with particular special contract handling for futes including quarterlies, perps and the weird "DEFI" ones.. - drop `OHLC.bar_wap` since it's no longer in the default time-series schema and we weren't filling it in here anyway.. - `Client._pairs: ChainMap` is now a read-only fqme-re-keyed view into the underlying pairs tables (which themselves are ideally keyed identically cross-venue) which we populate inside `Client.exch_info()` which itself now does concurrent pairs info fetching via a new `._cache_pairs()` using a `trio` task per API-venue. - support klines history query across all venues using same `Client.mkt_mode_req[Client.mkt_mode]` style as we're doing for `.exch_info()` B) - use the venue specific klines history query limits where documented. - handle new FQME venue / expiry fields inside `get_mkt_info()` ep such that again the correct `Client.mkt_mode` is selected based on parsing the desired spot vs. derivative contract. - do venue-specific-WSS-addr lookup based on output from `get_mkt_info()`; use usdtm venue WSS addr if a `FutesPair` is loaded. - set `topic: str` to the `.bs_fqme` value in live feed quotes! - use `Pair.bs_fqme: str` values for fuzzy-search input set. --- piker/brokers/_util.py | 2 +- piker/brokers/binance/api.py | 186 +++++++++++++++++++------------ piker/brokers/binance/broker.py | 3 +- piker/brokers/binance/feed.py | 96 ++++++++++++---- piker/brokers/binance/schemas.py | 44 +++++++- 5 files changed, 229 insertions(+), 102 deletions(-) diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py index 7e7a3ec7..baf2c7b2 100644 --- a/piker/brokers/_util.py +++ b/piker/brokers/_util.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) +# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by diff --git a/piker/brokers/binance/api.py b/piker/brokers/binance/api.py index 8caaa760..52c510a6 100644 --- a/piker/brokers/binance/api.py +++ b/piker/brokers/binance/api.py @@ -22,7 +22,10 @@ Binance clients for http and ws APIs. """ from __future__ import annotations -from collections import OrderedDict +from collections import ( + OrderedDict, + ChainMap, +) from contextlib import ( asynccontextmanager as acm, ) @@ -30,6 +33,7 @@ from datetime import datetime from typing import ( Any, Callable, + Type, ) import hmac import hashlib @@ -138,10 +142,6 @@ class OHLC(Struct): buy_quote_vol: float ignore: int - # null the place holder for `bar_wap` until we - # figure out what to extract for this. - bar_wap: float = 0.0 - # convert datetime obj timestamp to unixtime in milliseconds def binance_timestamp( @@ -160,10 +160,24 @@ class Client: ''' def __init__( self, + + # TODO: change this to `Client.[mkt_]venue: MarketType`? mkt_mode: MarketType = 'spot', + ) -> None: - self._pairs: dict[str, Pair] = {} # mkt info table + # build out pair info tables for each market type + # and wrap in a chain-map view for search / query. + self._spot_pairs: dict[str, Pair] = {} # spot info table + self._ufutes_pairs: dict[str, Pair] = {} # usd-futures table + self._mkt2pairs: dict[str, dict] = { + 'spot': self._spot_pairs, + 'usdtm_futes': self._ufutes_pairs, + } + # NOTE: only stick in the spot table for now until exchange info + # is loaded, since at that point we'll suffix all the futes + # market symbols for use by search. See `.exch_info()`. + self._pairs: ChainMap[str, Pair] = ChainMap() # spot EPs sesh self._sesh = asks.Session(connections=4) @@ -192,10 +206,10 @@ class Client: self._fapi_sesh.headers.update(api_key_header) self.mkt_mode: MarketType = mkt_mode - self.mkt_req: dict[str, Callable] = { + self.mkt_mode_req: dict[str, Callable] = { 'spot': self._api, 'margin': self._sapi, - 'usd_futes': self._fapi, + 'usdtm_futes': self._fapi, # 'futes_coin': self._dapi, # TODO } @@ -307,6 +321,37 @@ class Client: return resproc(resp, log) + async def _cache_pairs( + self, + mkt_type: str, + + ) -> None: + # lookup internal mkt-specific pair table to update + pair_table: dict[str, Pair] = self._mkt2pairs[mkt_type] + + # make API request(s) + resp = await self.mkt_mode_req[mkt_type]( + 'exchangeInfo', + params={}, # NOTE: retrieve all symbols by default + ) + entries = resp['symbols'] + if not entries: + raise SymbolNotFound(f'No market pairs found!?:\n{resp}') + + for item in entries: + filters_ls: list = item.pop('filters', False) + if filters_ls: + filters = {} + for entry in filters_ls: + ftype = entry['filterType'] + filters[ftype] = entry + + item['filters'] = filters + + pair_type: Type = PAIRTYPES[mkt_type] + pair: Pair = pair_type(**item) + pair_table[pair.symbol.upper()] = pair + async def exch_info( self, sym: str | None = None, @@ -326,66 +371,51 @@ class Client: https://binance-docs.github.io/apidocs/delivery/en/#exchange-information ''' - mkt_type: MarketType = mkt_type or self.mkt_mode - cached_pair = self._pairs.get( - (sym, mkt_type) - ) - if cached_pair: + pair_table: dict[str, Pair] = self._mkt2pairs[ + mkt_type or self.mkt_mode + ] + if cached_pair := pair_table.get(sym): return cached_pair - # retrieve all symbols by default - params = {} - if sym is not None: - sym = sym.lower() - params = {'symbol': sym} + # params = {} + # if sym is not None: + # params = {'symbol': sym} - resp = await self.mkt_req[mkt_type]('exchangeInfo', params=params) - entries = resp['symbols'] - if not entries: - raise SymbolNotFound(f'{sym} not found:\n{resp}') + mkts: list[str] = ['spot', 'usdtm_futes'] + if mkt_type: + mkts: list[str] = [mkt_type] - # import tractor - # await tractor.breakpoint() - pairs: dict[str, Pair] = {} - for item in entries: + async with trio.open_nursery() as rn: + for mkt_type in mkts: + rn.start_soon( + self._cache_pairs, + mkt_type, + ) - # for spot mkts, pre-process .filters field into - # a table.. - filters_ls: list = item.pop('filters', False) - if filters_ls: - filters = {} - for entry in filters_ls: - ftype = entry['filterType'] - filters[ftype] = entry - - item['filters'] = filters - - symbol = item['symbol'] - pair_type: Pair = PAIRTYPES[mkt_type or self.mkt_mode] - pairs[(symbol, mkt_type)] = pair_type( - **item, + # make merged view of all market-type pairs but + # use market specific `Pair.bs_fqme` for keys! + for venue, venue_pairs_table in self._mkt2pairs.items(): + self._pairs.maps.append( + {pair.bs_fqme: pair + for pair in venue_pairs_table.values()} ) - self._pairs.update(pairs) - - if sym is not None: - return pairs[sym] - else: - return self._pairs + return pair_table[sym] if sym else self._pairs async def search_symbols( self, pattern: str, limit: int = None, ) -> dict[str, Any]: - if self._pairs is not None: - data = self._pairs - else: - data = await self.exch_info() + + # if self._spot_pairs is not None: + # data = self._spot_pairs + # else: + fq_pairs: dict = await self.exch_info() matches = fuzzy.extractBests( pattern, - data, + fq_pairs, score_cutoff=50, ) # repack in dict form @@ -395,12 +425,24 @@ class Client: async def bars( self, symbol: str, + start_dt: datetime | None = None, end_dt: datetime | None = None, - limit: int = 1000, # <- max allowed per query + as_np: bool = True, - ) -> dict: + ) -> list[tuple] | np.ndarray: + + # NOTE: diff market-venues have diff datums limits: + # - spot max is 1k + # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data + # - usdm futes max is 1500 + # https://binance-docs.github.io/apidocs/futures/en/#kline-candlestick-data + limits: dict[str, int] = { + 'spot': 1000, + 'usdtm_futes': 1500, + } + limit = limits[self.mkt_mode] if end_dt is None: end_dt = now('UTC').add(minutes=1) @@ -413,7 +455,8 @@ class Client: end_time = binance_timestamp(end_dt) # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data - bars = await self._api( + bars = await self.mkt_mode_req[self.mkt_mode]( + # bars = await self._api( 'klines', params={ 'symbol': symbol.upper(), @@ -423,13 +466,7 @@ class Client: 'limit': limit } ) - - # TODO: pack this bars scheme into a ``pydantic`` validator type: - # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data - - # TODO: we should port this to ``pydantic`` to avoid doing - # manual validation ourselves.. - new_bars = [] + new_bars: list[tuple] = [] for i, bar in enumerate(bars): bar = OHLC(*bar) @@ -449,11 +486,13 @@ class Client: new_bars.append((i,) + tuple(row)) - array = np.array( + if not as_np: + return bars + + return np.array( new_bars, dtype=def_iohlcv_fields, - ) if as_np else bars - return array + ) async def get_positions( self, @@ -476,7 +515,6 @@ class Client: signed=True ) log.info(f'done. len {len(resp)}') - # await trio.sleep(3) return positions, volumes @@ -530,7 +568,7 @@ class Client: await self.cache_symbols() - # asset_precision = self._pairs[symbol]['baseAssetPrecision'] + # asset_precision = self._spot_pairs[symbol]['baseAssetPrecision'] # quote_precision = self._pairs[symbol]['quoteAssetPrecision'] params = OrderedDict([ @@ -624,12 +662,16 @@ class Client: @acm -async def get_client( - mkt_mode: str = 'spot', -) -> Client: - client = Client(mkt_mode=mkt_mode) +async def get_client() -> Client: - log.info(f'{client} in {mkt_mode} mode: caching exchange infos..') + client = Client() await client.exch_info() + log.info( + f'{client} in {client.mkt_mode} mode: caching exchange infos..\n' + 'Cached multi-market pairs:\n' + f'spot: {len(client._spot_pairs)}\n' + f'usdtm_futes: {len(client._ufutes_pairs)}\n' + f'Total: {len(client._pairs)}\n' + ) yield client diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index 5c17b194..c5c84549 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -38,7 +38,7 @@ from piker.data._web_bs import ( open_autorecon_ws, NoBsWs, ) -from piker._cacheables import ( +from piker.brokers import ( open_cached_client, ) from piker.clearing._messages import ( @@ -104,7 +104,6 @@ async def trades_dialogue( ) -> AsyncIterator[dict[str, Any]]: async with open_cached_client('binance') as client: - await tractor.breakpoint() if not client.api_key: await ctx.started('paper') return diff --git a/piker/brokers/binance/feed.py b/piker/brokers/binance/feed.py index 561b5fbc..fafff411 100644 --- a/piker/brokers/binance/feed.py +++ b/piker/brokers/binance/feed.py @@ -43,12 +43,15 @@ from fuzzywuzzy import process as fuzzy import numpy as np import tractor -from piker._cacheables import ( - async_lifo_cache, +from piker.brokers import ( open_cached_client, ) -from piker.accounting._mktinfo import ( +from piker._cacheables import ( + async_lifo_cache, +) +from piker.accounting import ( Asset, + DerivTypes, MktPair, unpack_fqme, digits_to_dec, @@ -69,6 +72,7 @@ from .api import ( ) from .schemas import ( Pair, + FutesPair, ) log = get_logger('piker.brokers.binance') @@ -219,8 +223,6 @@ async def open_history_client( ) -> tuple[Callable, int]: - symbol: str = mkt.bs_fqme - # TODO implement history getter for the new storage layer. async with open_cached_client('binance') as client: @@ -237,8 +239,20 @@ async def open_history_client( if timeframe != 60: raise DataUnavailable('Only 1m bars are supported') + # TODO: better wrapping for venue / mode? + # - eventually logic for usd vs. coin settled futes + # based on `MktPair.src` type/value? + # - maybe something like `async with + # Client.use_venue('usdtm_futes')` + if mkt.type_key in DerivTypes: + client.mkt_mode = 'usdtm_futes' + else: + client.mkt_mode = 'spot' + + # NOTE: always query using their native symbology! + mktid: str = mkt.bs_mktid array = await client.bars( - symbol, + mktid, start_dt=start_dt, end_dt=end_dt, ) @@ -269,22 +283,42 @@ async def get_mkt_info( fqme += '.binance' bs_fqme, _, broker = fqme.rpartition('.') - broker, mkt_ep, venue, suffix = unpack_fqme(fqme) - # bs_fqme, _, broker = fqme.partition('.') + broker, mkt_ep, venue, expiry = unpack_fqme(fqme) - mkt_mode: str = 'spot' - if 'perp' in bs_fqme: - mkt_mode = 'usd_futes' + # NOTE: see the `FutesPair.bs_fqme: str` implementation + # to understand the reverse market info lookup below. + venue: str = venue or 'spot' + mkt_mode: str = venue or 'spot' + _atype: str = '' + if ( + venue + and 'spot' not in venue.lower() + + # XXX: catch all in case user doesn't know which + # venue they want (usdtm vs. coinm) and we can choose + # a default (via config?) once we support coin-m APIs. + or 'perp' in bs_fqme.lower() + ): + mkt_mode: str = f'{venue.lower()}_futes' + if 'perp' in expiry: + _atype = 'perpetual_future' + + else: + _atype = 'future' async with open_cached_client( 'binance', - mkt_mode=mkt_mode, ) as client: + # switch mode depending on input pattern parsing + client.mkt_mode = mkt_mode + pair_str: str = mkt_ep.upper() pair: Pair = await client.exch_info(pair_str) - await tractor.breakpoint() + if 'futes' in mkt_mode: + assert isinstance(pair, FutesPair) + mkt = MktPair( dst=Asset( name=pair.baseAsset, @@ -299,7 +333,10 @@ async def get_mkt_info( price_tick=pair.price_tick, size_tick=pair.size_tick, bs_mktid=pair.symbol, + expiry=expiry, + venue=venue, broker='binance', + _atype=_atype, ) both = mkt, pair return both @@ -379,26 +416,33 @@ async def stream_quotes( FeedInit(mkt_info=mkt) ) - # TODO: detect whether futes or spot contact was requested from .api import ( _futes_ws, - # _spot_ws, + _spot_ws, ) - wsep: str = _futes_ws + + async with open_cached_client( + 'binance', + ) as client: + wsep: str = { + 'usdtm_futes': _futes_ws, + 'spot': _spot_ws, + }[client.mkt_mode] async with ( open_autorecon_ws( wsep, fixture=partial( subscribe, - symbols=symbols, + symbols=[mkt.bs_mktid], ), ) as ws, # avoid stream-gen closure from breaking trio.. aclosing(stream_messages(ws)) as msg_gen, ): + # log.info('WAITING ON FIRST LIVE QUOTE..') typ, quote = await anext(msg_gen) # pull a first quote and deliver @@ -413,15 +457,20 @@ async def stream_quotes( # import time # last = time.time() + # XXX NOTE: can't include the `.binance` suffix + # or the sampling loop will not broadcast correctly + # since `bus._subscribers.setdefault(bs_fqme, set())` + # is used inside `.data.open_feed_bus()` !!! + topic: str = mkt.bs_fqme + # start streaming - async for typ, msg in msg_gen: + async for typ, quote in msg_gen: # period = time.time() - last # hz = 1/period if period else float('inf') # if hz > 60: # log.info(f'Binance quotez : {hz}') - topic = msg['symbol'].lower() - await send_chan.send({topic: msg}) + await send_chan.send({topic: quote}) # last = time.time() @@ -429,10 +478,11 @@ async def stream_quotes( async def open_symbol_search( ctx: tractor.Context, ) -> Client: + async with open_cached_client('binance') as client: # load all symbols locally for fast search - cache = await client.exch_info() + fqpairs_cache = await client.exch_info() await ctx.started() async with ctx.open_stream() as stream: @@ -442,11 +492,11 @@ async def open_symbol_search( matches = fuzzy.extractBests( pattern, - cache, + fqpairs_cache, score_cutoff=50, ) # repack in dict form await stream.send({ - item[0].symbol: item[0] + item[0].bs_fqme: item[0] for item in matches }) diff --git a/piker/brokers/binance/schemas.py b/piker/brokers/binance/schemas.py index c909c39c..fbd6f944 100644 --- a/piker/brokers/binance/schemas.py +++ b/piker/brokers/binance/schemas.py @@ -60,6 +60,10 @@ class Pair(Struct, frozen=True): step_size: str = self.filters['LOT_SIZE']['stepSize'].rstrip('0') return Decimal(step_size) + @property + def bs_fqme(self) -> str: + return self.symbol + class SpotPair(Pair, frozen=True): @@ -80,6 +84,10 @@ class SpotPair(Pair, frozen=True): allowedSelfTradePreventionModes: list[str] permissions: list[str] + @property + def bs_fqme(self) -> str: + return f'{self.symbol}.SPOT' + class FutesPair(Pair): @@ -111,16 +119,44 @@ class FutesPair(Pair): def quoteAssetPrecision(self) -> int: return self.quotePrecision + @property + def bs_fqme(self) -> str: + symbol: str = self.symbol + ctype: str = self.contractType + margin: str = self.marginAsset + + match ctype: + case 'PERPETUAL': + return f'{symbol}.{margin}M.PERP' + + case 'CURRENT_QUARTER': + pair, _, expiry = symbol.partition('_') + return f'{pair}.{margin}M.{expiry}' + + case '': + subtype: str = self.underlyingSubType[0] + match subtype: + case 'DEFI': + return f'{symbol}.{subtype}.PERP' + + breakpoint() + return f'{symbol}.WTFPWNEDBBQ' + + MarketType = Literal[ 'spot', - 'margin', - 'usd_futes', - 'coin_futes', + # 'margin', + 'usdtm_futes', + # 'coin_futes', ] PAIRTYPES: dict[MarketType, Pair] = { 'spot': SpotPair, - 'usd_futes': FutesPair, + 'usdtm_futes': FutesPair, + + # TODO: support coin-margined venue: + # https://binance-docs.github.io/apidocs/delivery/en/#change-log + # 'coinm_futes': CoinFutesPair, }