Always expand FQMEs with .venue and .expiry values

Since there are indeed multiple futures (perp swaps) contracts including
a set with expiry, we need a way to distinguish through search and
`FutesPair` lookup which contract we're requesting. To solve this extend
the `FutesPair` and `SpotPair` to include a `.bs_fqme` field similar to
`MktPair` and key the `Client._pairs: ChainMap`'s backing tables with
these expanded fqmes. For example the perp swap now expands to
`btcusdt.usdtm.perp` which fills in the venue as `'usdtm'` (the
usd-margined fututes market) and the expiry as `'perp'` (as before).
This allows distinguishing explicitly from, for ex., coin-margined
contracts which could instead (since we haven't added the support yet)
fqmes of the sort `btcusdt.<coin>m.perp.binance` thus making it explicit
and obvious which contract is which B)

Further we interpolate the venue token to `spot` for spot markets going
forward, which again makes cex spot markets explicit in symbology; we'll
need to add this as well to other cex backends ;)

Other misc detalles:

- change USD-M futes `MarketType` key to `'usdtm_futes'`.

- add `Pair.bs_fqme: str` for all pair subtypes with particular
  special contract handling for futes including quarterlies, perps and
  the weird "DEFI" ones..

- drop `OHLC.bar_wap` since it's no longer in the default time-series
  schema and we weren't filling it in here anyway..

- `Client._pairs: ChainMap` is now a read-only fqme-re-keyed view into
  the underlying pairs tables (which themselves are ideally keyed
  identically cross-venue) which we populate inside `Client.exch_info()`
  which itself now does concurrent pairs info fetching via a new
  `._cache_pairs()` using a `trio` task per API-venue.

- support klines history query across all venues using same
  `Client.mkt_mode_req[Client.mkt_mode]` style as we're doing for
  `.exch_info()` B)
  - use the venue specific klines history query limits where documented.

- handle new FQME venue / expiry fields inside `get_mkt_info()` ep such
  that again the correct `Client.mkt_mode` is selected based on parsing
  the desired spot vs. derivative contract.

- do venue-specific-WSS-addr lookup based on output from
  `get_mkt_info()`; use usdtm venue WSS addr if a `FutesPair` is loaded.

- set `topic: str` to the `.bs_fqme` value in live feed quotes!

- use `Pair.bs_fqme: str` values for fuzzy-search input set.
basic_buy_bot
Tyler Goodlet 2023-06-14 13:16:13 -04:00
parent 4c4787ce58
commit 8e03212e40
5 changed files with 229 additions and 102 deletions

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) # Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by

View File

@ -22,7 +22,10 @@ Binance clients for http and ws APIs.
""" """
from __future__ import annotations from __future__ import annotations
from collections import OrderedDict from collections import (
OrderedDict,
ChainMap,
)
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
@ -30,6 +33,7 @@ from datetime import datetime
from typing import ( from typing import (
Any, Any,
Callable, Callable,
Type,
) )
import hmac import hmac
import hashlib import hashlib
@ -138,10 +142,6 @@ class OHLC(Struct):
buy_quote_vol: float buy_quote_vol: float
ignore: int ignore: int
# null the place holder for `bar_wap` until we
# figure out what to extract for this.
bar_wap: float = 0.0
# convert datetime obj timestamp to unixtime in milliseconds # convert datetime obj timestamp to unixtime in milliseconds
def binance_timestamp( def binance_timestamp(
@ -160,10 +160,24 @@ class Client:
''' '''
def __init__( def __init__(
self, self,
# TODO: change this to `Client.[mkt_]venue: MarketType`?
mkt_mode: MarketType = 'spot', mkt_mode: MarketType = 'spot',
) -> None: ) -> None:
self._pairs: dict[str, Pair] = {} # mkt info table # build out pair info tables for each market type
# and wrap in a chain-map view for search / query.
self._spot_pairs: dict[str, Pair] = {} # spot info table
self._ufutes_pairs: dict[str, Pair] = {} # usd-futures table
self._mkt2pairs: dict[str, dict] = {
'spot': self._spot_pairs,
'usdtm_futes': self._ufutes_pairs,
}
# NOTE: only stick in the spot table for now until exchange info
# is loaded, since at that point we'll suffix all the futes
# market symbols for use by search. See `.exch_info()`.
self._pairs: ChainMap[str, Pair] = ChainMap()
# spot EPs sesh # spot EPs sesh
self._sesh = asks.Session(connections=4) self._sesh = asks.Session(connections=4)
@ -192,10 +206,10 @@ class Client:
self._fapi_sesh.headers.update(api_key_header) self._fapi_sesh.headers.update(api_key_header)
self.mkt_mode: MarketType = mkt_mode self.mkt_mode: MarketType = mkt_mode
self.mkt_req: dict[str, Callable] = { self.mkt_mode_req: dict[str, Callable] = {
'spot': self._api, 'spot': self._api,
'margin': self._sapi, 'margin': self._sapi,
'usd_futes': self._fapi, 'usdtm_futes': self._fapi,
# 'futes_coin': self._dapi, # TODO # 'futes_coin': self._dapi, # TODO
} }
@ -307,6 +321,37 @@ class Client:
return resproc(resp, log) return resproc(resp, log)
async def _cache_pairs(
self,
mkt_type: str,
) -> None:
# lookup internal mkt-specific pair table to update
pair_table: dict[str, Pair] = self._mkt2pairs[mkt_type]
# make API request(s)
resp = await self.mkt_mode_req[mkt_type](
'exchangeInfo',
params={}, # NOTE: retrieve all symbols by default
)
entries = resp['symbols']
if not entries:
raise SymbolNotFound(f'No market pairs found!?:\n{resp}')
for item in entries:
filters_ls: list = item.pop('filters', False)
if filters_ls:
filters = {}
for entry in filters_ls:
ftype = entry['filterType']
filters[ftype] = entry
item['filters'] = filters
pair_type: Type = PAIRTYPES[mkt_type]
pair: Pair = pair_type(**item)
pair_table[pair.symbol.upper()] = pair
async def exch_info( async def exch_info(
self, self,
sym: str | None = None, sym: str | None = None,
@ -326,66 +371,51 @@ class Client:
https://binance-docs.github.io/apidocs/delivery/en/#exchange-information https://binance-docs.github.io/apidocs/delivery/en/#exchange-information
''' '''
mkt_type: MarketType = mkt_type or self.mkt_mode pair_table: dict[str, Pair] = self._mkt2pairs[
cached_pair = self._pairs.get( mkt_type or self.mkt_mode
(sym, mkt_type) ]
) if cached_pair := pair_table.get(sym):
if cached_pair:
return cached_pair return cached_pair
# retrieve all symbols by default # params = {}
params = {} # if sym is not None:
if sym is not None: # params = {'symbol': sym}
sym = sym.lower()
params = {'symbol': sym}
resp = await self.mkt_req[mkt_type]('exchangeInfo', params=params) mkts: list[str] = ['spot', 'usdtm_futes']
entries = resp['symbols'] if mkt_type:
if not entries: mkts: list[str] = [mkt_type]
raise SymbolNotFound(f'{sym} not found:\n{resp}')
# import tractor async with trio.open_nursery() as rn:
# await tractor.breakpoint() for mkt_type in mkts:
pairs: dict[str, Pair] = {} rn.start_soon(
for item in entries: self._cache_pairs,
mkt_type,
)
# for spot mkts, pre-process .filters field into # make merged view of all market-type pairs but
# a table.. # use market specific `Pair.bs_fqme` for keys!
filters_ls: list = item.pop('filters', False) for venue, venue_pairs_table in self._mkt2pairs.items():
if filters_ls: self._pairs.maps.append(
filters = {} {pair.bs_fqme: pair
for entry in filters_ls: for pair in venue_pairs_table.values()}
ftype = entry['filterType']
filters[ftype] = entry
item['filters'] = filters
symbol = item['symbol']
pair_type: Pair = PAIRTYPES[mkt_type or self.mkt_mode]
pairs[(symbol, mkt_type)] = pair_type(
**item,
) )
self._pairs.update(pairs) return pair_table[sym] if sym else self._pairs
if sym is not None:
return pairs[sym]
else:
return self._pairs
async def search_symbols( async def search_symbols(
self, self,
pattern: str, pattern: str,
limit: int = None, limit: int = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
if self._pairs is not None:
data = self._pairs # if self._spot_pairs is not None:
else: # data = self._spot_pairs
data = await self.exch_info() # else:
fq_pairs: dict = await self.exch_info()
matches = fuzzy.extractBests( matches = fuzzy.extractBests(
pattern, pattern,
data, fq_pairs,
score_cutoff=50, score_cutoff=50,
) )
# repack in dict form # repack in dict form
@ -395,12 +425,24 @@ class Client:
async def bars( async def bars(
self, self,
symbol: str, symbol: str,
start_dt: datetime | None = None, start_dt: datetime | None = None,
end_dt: datetime | None = None, end_dt: datetime | None = None,
limit: int = 1000, # <- max allowed per query
as_np: bool = True, as_np: bool = True,
) -> dict: ) -> list[tuple] | np.ndarray:
# NOTE: diff market-venues have diff datums limits:
# - spot max is 1k
# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
# - usdm futes max is 1500
# https://binance-docs.github.io/apidocs/futures/en/#kline-candlestick-data
limits: dict[str, int] = {
'spot': 1000,
'usdtm_futes': 1500,
}
limit = limits[self.mkt_mode]
if end_dt is None: if end_dt is None:
end_dt = now('UTC').add(minutes=1) end_dt = now('UTC').add(minutes=1)
@ -413,7 +455,8 @@ class Client:
end_time = binance_timestamp(end_dt) end_time = binance_timestamp(end_dt)
# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
bars = await self._api( bars = await self.mkt_mode_req[self.mkt_mode](
# bars = await self._api(
'klines', 'klines',
params={ params={
'symbol': symbol.upper(), 'symbol': symbol.upper(),
@ -423,13 +466,7 @@ class Client:
'limit': limit 'limit': limit
} }
) )
new_bars: list[tuple] = []
# TODO: pack this bars scheme into a ``pydantic`` validator type:
# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
# TODO: we should port this to ``pydantic`` to avoid doing
# manual validation ourselves..
new_bars = []
for i, bar in enumerate(bars): for i, bar in enumerate(bars):
bar = OHLC(*bar) bar = OHLC(*bar)
@ -449,11 +486,13 @@ class Client:
new_bars.append((i,) + tuple(row)) new_bars.append((i,) + tuple(row))
array = np.array( if not as_np:
return bars
return np.array(
new_bars, new_bars,
dtype=def_iohlcv_fields, dtype=def_iohlcv_fields,
) if as_np else bars )
return array
async def get_positions( async def get_positions(
self, self,
@ -476,7 +515,6 @@ class Client:
signed=True signed=True
) )
log.info(f'done. len {len(resp)}') log.info(f'done. len {len(resp)}')
# await trio.sleep(3) # await trio.sleep(3)
return positions, volumes return positions, volumes
@ -530,7 +568,7 @@ class Client:
await self.cache_symbols() await self.cache_symbols()
# asset_precision = self._pairs[symbol]['baseAssetPrecision'] # asset_precision = self._spot_pairs[symbol]['baseAssetPrecision']
# quote_precision = self._pairs[symbol]['quoteAssetPrecision'] # quote_precision = self._pairs[symbol]['quoteAssetPrecision']
params = OrderedDict([ params = OrderedDict([
@ -624,12 +662,16 @@ class Client:
@acm @acm
async def get_client( async def get_client() -> Client:
mkt_mode: str = 'spot',
) -> Client:
client = Client(mkt_mode=mkt_mode)
log.info(f'{client} in {mkt_mode} mode: caching exchange infos..') client = Client()
await client.exch_info() await client.exch_info()
log.info(
f'{client} in {client.mkt_mode} mode: caching exchange infos..\n'
'Cached multi-market pairs:\n'
f'spot: {len(client._spot_pairs)}\n'
f'usdtm_futes: {len(client._ufutes_pairs)}\n'
f'Total: {len(client._pairs)}\n'
)
yield client yield client

View File

@ -38,7 +38,7 @@ from piker.data._web_bs import (
open_autorecon_ws, open_autorecon_ws,
NoBsWs, NoBsWs,
) )
from piker._cacheables import ( from piker.brokers import (
open_cached_client, open_cached_client,
) )
from piker.clearing._messages import ( from piker.clearing._messages import (
@ -104,7 +104,6 @@ async def trades_dialogue(
) -> AsyncIterator[dict[str, Any]]: ) -> AsyncIterator[dict[str, Any]]:
async with open_cached_client('binance') as client: async with open_cached_client('binance') as client:
await tractor.breakpoint()
if not client.api_key: if not client.api_key:
await ctx.started('paper') await ctx.started('paper')
return return

View File

@ -43,12 +43,15 @@ from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from piker._cacheables import ( from piker.brokers import (
async_lifo_cache,
open_cached_client, open_cached_client,
) )
from piker.accounting._mktinfo import ( from piker._cacheables import (
async_lifo_cache,
)
from piker.accounting import (
Asset, Asset,
DerivTypes,
MktPair, MktPair,
unpack_fqme, unpack_fqme,
digits_to_dec, digits_to_dec,
@ -69,6 +72,7 @@ from .api import (
) )
from .schemas import ( from .schemas import (
Pair, Pair,
FutesPair,
) )
log = get_logger('piker.brokers.binance') log = get_logger('piker.brokers.binance')
@ -219,8 +223,6 @@ async def open_history_client(
) -> tuple[Callable, int]: ) -> tuple[Callable, int]:
symbol: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('binance') as client: async with open_cached_client('binance') as client:
@ -237,8 +239,20 @@ async def open_history_client(
if timeframe != 60: if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported') raise DataUnavailable('Only 1m bars are supported')
# TODO: better wrapping for venue / mode?
# - eventually logic for usd vs. coin settled futes
# based on `MktPair.src` type/value?
# - maybe something like `async with
# Client.use_venue('usdtm_futes')`
if mkt.type_key in DerivTypes:
client.mkt_mode = 'usdtm_futes'
else:
client.mkt_mode = 'spot'
# NOTE: always query using their native symbology!
mktid: str = mkt.bs_mktid
array = await client.bars( array = await client.bars(
symbol, mktid,
start_dt=start_dt, start_dt=start_dt,
end_dt=end_dt, end_dt=end_dt,
) )
@ -269,22 +283,42 @@ async def get_mkt_info(
fqme += '.binance' fqme += '.binance'
bs_fqme, _, broker = fqme.rpartition('.') bs_fqme, _, broker = fqme.rpartition('.')
broker, mkt_ep, venue, suffix = unpack_fqme(fqme) broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
# bs_fqme, _, broker = fqme.partition('.')
mkt_mode: str = 'spot' # NOTE: see the `FutesPair.bs_fqme: str` implementation
if 'perp' in bs_fqme: # to understand the reverse market info lookup below.
mkt_mode = 'usd_futes' venue: str = venue or 'spot'
mkt_mode: str = venue or 'spot'
_atype: str = ''
if (
venue
and 'spot' not in venue.lower()
# XXX: catch all in case user doesn't know which
# venue they want (usdtm vs. coinm) and we can choose
# a default (via config?) once we support coin-m APIs.
or 'perp' in bs_fqme.lower()
):
mkt_mode: str = f'{venue.lower()}_futes'
if 'perp' in expiry:
_atype = 'perpetual_future'
else:
_atype = 'future'
async with open_cached_client( async with open_cached_client(
'binance', 'binance',
mkt_mode=mkt_mode,
) as client: ) as client:
# switch mode depending on input pattern parsing
client.mkt_mode = mkt_mode
pair_str: str = mkt_ep.upper() pair_str: str = mkt_ep.upper()
pair: Pair = await client.exch_info(pair_str) pair: Pair = await client.exch_info(pair_str)
await tractor.breakpoint() if 'futes' in mkt_mode:
assert isinstance(pair, FutesPair)
mkt = MktPair( mkt = MktPair(
dst=Asset( dst=Asset(
name=pair.baseAsset, name=pair.baseAsset,
@ -299,7 +333,10 @@ async def get_mkt_info(
price_tick=pair.price_tick, price_tick=pair.price_tick,
size_tick=pair.size_tick, size_tick=pair.size_tick,
bs_mktid=pair.symbol, bs_mktid=pair.symbol,
expiry=expiry,
venue=venue,
broker='binance', broker='binance',
_atype=_atype,
) )
both = mkt, pair both = mkt, pair
return both return both
@ -379,26 +416,33 @@ async def stream_quotes(
FeedInit(mkt_info=mkt) FeedInit(mkt_info=mkt)
) )
# TODO: detect whether futes or spot contact was requested # TODO: detect whether futes or spot contact was requested
from .api import ( from .api import (
_futes_ws, _futes_ws,
# _spot_ws, _spot_ws,
) )
wsep: str = _futes_ws
async with open_cached_client(
'binance',
) as client:
wsep: str = {
'usdtm_futes': _futes_ws,
'spot': _spot_ws,
}[client.mkt_mode]
async with ( async with (
open_autorecon_ws( open_autorecon_ws(
wsep, wsep,
fixture=partial( fixture=partial(
subscribe, subscribe,
symbols=symbols, symbols=[mkt.bs_mktid],
), ),
) as ws, ) as ws,
# avoid stream-gen closure from breaking trio.. # avoid stream-gen closure from breaking trio..
aclosing(stream_messages(ws)) as msg_gen, aclosing(stream_messages(ws)) as msg_gen,
): ):
# log.info('WAITING ON FIRST LIVE QUOTE..')
typ, quote = await anext(msg_gen) typ, quote = await anext(msg_gen)
# pull a first quote and deliver # pull a first quote and deliver
@ -413,15 +457,20 @@ async def stream_quotes(
# import time # import time
# last = time.time() # last = time.time()
# XXX NOTE: can't include the `.binance` suffix
# or the sampling loop will not broadcast correctly
# since `bus._subscribers.setdefault(bs_fqme, set())`
# is used inside `.data.open_feed_bus()` !!!
topic: str = mkt.bs_fqme
# start streaming # start streaming
async for typ, msg in msg_gen: async for typ, quote in msg_gen:
# period = time.time() - last # period = time.time() - last
# hz = 1/period if period else float('inf') # hz = 1/period if period else float('inf')
# if hz > 60: # if hz > 60:
# log.info(f'Binance quotez : {hz}') # log.info(f'Binance quotez : {hz}')
topic = msg['symbol'].lower() await send_chan.send({topic: quote})
await send_chan.send({topic: msg})
# last = time.time() # last = time.time()
@ -429,10 +478,11 @@ async def stream_quotes(
async def open_symbol_search( async def open_symbol_search(
ctx: tractor.Context, ctx: tractor.Context,
) -> Client: ) -> Client:
async with open_cached_client('binance') as client: async with open_cached_client('binance') as client:
# load all symbols locally for fast search # load all symbols locally for fast search
cache = await client.exch_info() fqpairs_cache = await client.exch_info()
await ctx.started() await ctx.started()
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
@ -442,11 +492,11 @@ async def open_symbol_search(
matches = fuzzy.extractBests( matches = fuzzy.extractBests(
pattern, pattern,
cache, fqpairs_cache,
score_cutoff=50, score_cutoff=50,
) )
# repack in dict form # repack in dict form
await stream.send({ await stream.send({
item[0].symbol: item[0] item[0].bs_fqme: item[0]
for item in matches for item in matches
}) })

View File

@ -60,6 +60,10 @@ class Pair(Struct, frozen=True):
step_size: str = self.filters['LOT_SIZE']['stepSize'].rstrip('0') step_size: str = self.filters['LOT_SIZE']['stepSize'].rstrip('0')
return Decimal(step_size) return Decimal(step_size)
@property
def bs_fqme(self) -> str:
return self.symbol
class SpotPair(Pair, frozen=True): class SpotPair(Pair, frozen=True):
@ -80,6 +84,10 @@ class SpotPair(Pair, frozen=True):
allowedSelfTradePreventionModes: list[str] allowedSelfTradePreventionModes: list[str]
permissions: list[str] permissions: list[str]
@property
def bs_fqme(self) -> str:
return f'{self.symbol}.SPOT'
class FutesPair(Pair): class FutesPair(Pair):
@ -111,16 +119,44 @@ class FutesPair(Pair):
def quoteAssetPrecision(self) -> int: def quoteAssetPrecision(self) -> int:
return self.quotePrecision return self.quotePrecision
@property
def bs_fqme(self) -> str:
symbol: str = self.symbol
ctype: str = self.contractType
margin: str = self.marginAsset
match ctype:
case 'PERPETUAL':
return f'{symbol}.{margin}M.PERP'
case 'CURRENT_QUARTER':
pair, _, expiry = symbol.partition('_')
return f'{pair}.{margin}M.{expiry}'
case '':
subtype: str = self.underlyingSubType[0]
match subtype:
case 'DEFI':
return f'{symbol}.{subtype}.PERP'
breakpoint()
return f'{symbol}.WTFPWNEDBBQ'
MarketType = Literal[ MarketType = Literal[
'spot', 'spot',
'margin', # 'margin',
'usd_futes', 'usdtm_futes',
'coin_futes', # 'coin_futes',
] ]
PAIRTYPES: dict[MarketType, Pair] = { PAIRTYPES: dict[MarketType, Pair] = {
'spot': SpotPair, 'spot': SpotPair,
'usd_futes': FutesPair, 'usdtm_futes': FutesPair,
# TODO: support coin-margined venue:
# https://binance-docs.github.io/apidocs/delivery/en/#change-log
# 'coinm_futes': CoinFutesPair,
} }