kraken: handle ws live trading API symbology

Of course I missed this first try but, we need to use the ws market pair
symbology set (since apparently kraken loves redundancy at least 3 times
XD) when processing transactions that arrive from live clears since it's
an entirely different `LTC/EUR` style key then the `XLTCEUR` style
delivered from the ReST eps..

As part of this:
- add `Client._altnames`, `._wsnames` as `dict[str, Pair]` tables,
  leaving the `._AssetPairs` table as is keyed by the "xname"s.
- Change `Pair.respname: str` -> `.xname` since these keys all just seem
  to have a weird 'X' prefix.
- do the appropriately keyed pair table lookup via a new `api_name_set:
  str` to `norm_trade_records()` and set is correctly in the ws live txn
  handler task.
ib_py311_fixes
Tyler Goodlet 2023-08-30 16:09:45 -04:00
parent 778d26067d
commit 481618cc51
5 changed files with 81 additions and 47 deletions

View File

@ -106,16 +106,19 @@ class InvalidKey(ValueError):
class Client: class Client:
# symbol mapping from all names to the altname # assets and mkt pairs are key-ed by kraken's ReST response
_altnames: dict[str, str] = {} # symbol-bs_mktids (we call them "X-keys" like fricking
# "XXMRZEUR"). these keys used directly since ledger endpoints
# key-ed by kraken's own bs_mktids (like fricking "XXMRZEUR") # return transaction sets keyed with the same set!
# with said keys used directly from EP responses so that ledger
# parsing can be easily accomplished from both trade-event-msgs
# and offline toml files
_Assets: dict[str, Asset] = {} _Assets: dict[str, Asset] = {}
_AssetPairs: dict[str, Pair] = {} _AssetPairs: dict[str, Pair] = {}
# offer lookup tables for all .altname and .wsname
# to the equivalent .xname so that various symbol-schemas
# can be mapped to `Pair`s in the tables above.
_altnames: dict[str, str] = {}
_wsnames: dict[str, str] = {}
# key-ed by `Pair.bs_fqme: str`, and thus used for search # key-ed by `Pair.bs_fqme: str`, and thus used for search
# allowing for lookup using piker's own FQME symbology sys. # allowing for lookup using piker's own FQME symbology sys.
_pairs: dict[str, Pair] = {} _pairs: dict[str, Pair] = {}
@ -209,8 +212,8 @@ class Client:
by_bsmktid: dict[str, dict] = resp['result'] by_bsmktid: dict[str, dict] = resp['result']
balances: dict = {} balances: dict = {}
for respname, bal in by_bsmktid.items(): for xname, bal in by_bsmktid.items():
asset: Asset = self._Assets[respname] asset: Asset = self._Assets[xname]
# TODO: which KEY should we use? it's used to index # TODO: which KEY should we use? it's used to index
# the `Account.pps: dict` .. # the `Account.pps: dict` ..
@ -367,7 +370,6 @@ class Client:
asset_key: str = entry['asset'] asset_key: str = entry['asset']
asset: Asset = self._Assets[asset_key] asset: Asset = self._Assets[asset_key]
asset_key: str = asset.name.lower() asset_key: str = asset.name.lower()
# asset_key: str = self._altnames[asset_key].lower()
# XXX: this is in the asset units (likely) so it isn't # XXX: this is in the asset units (likely) so it isn't
# quite the same as a commisions cost necessarily..) # quite the same as a commisions cost necessarily..)
@ -473,25 +475,31 @@ class Client:
if err: if err:
raise SymbolNotFound(pair_patt) raise SymbolNotFound(pair_patt)
# NOTE: we key pairs by our custom defined `.bs_fqme` # NOTE: we try to key pairs by our custom defined
# field since we want to offer search over this key # `.bs_fqme` field since we want to offer search over
# set, callers should fill out lookup tables for # this pattern set, callers should fill out lookup
# kraken's bs_mktid keys to map to these keys! # tables for kraken's bs_mktid keys to map to these
for key, data in resp['result'].items(): # keys!
pair = Pair(respname=key, **data) # XXX: FURTHER kraken's data eng team decided to offer
# 3 frickin market-pair-symbol key sets depending on
# which frickin API is being used.
# Example for the trading pair 'LTC<EUR'
# - the "X-key" from rest eps 'XLTCZEUR'
# - the "websocket key" from ws msgs is 'LTC/EUR'
# - the "altname key" also delivered in pair info is 'LTCEUR'
for xkey, data in resp['result'].items():
# always cache so we can possibly do faster lookup # NOTE: always cache in pairs tables for faster lookup
self._AssetPairs[key] = pair pair = Pair(xname=xkey, **data)
bs_fqme: str = pair.bs_fqme # register the above `Pair` structs for all
# key-sets/monikers: a set of 4 (frickin) tables
self._pairs[bs_fqme] = pair # acting as a combined surjection of all possible
# (and stupid) kraken names to their `Pair` obj.
# register the piker pair under all monikers, a giant flat self._AssetPairs[xkey] = pair
# surjection of all possible (and stupid) kraken names to self._pairs[pair.bs_fqme] = pair
# the FMQE style piker key. self._altnames[pair.altname] = pair
self._altnames[pair.altname] = bs_fqme self._wsnames[pair.wsname] = pair
self._altnames[pair.wsname] = bs_fqme
if pair_patt is not None: if pair_patt is not None:
return next(iter(self._pairs.items()))[1] return next(iter(self._pairs.items()))[1]
@ -506,12 +514,13 @@ class Client:
Load all market pair info build and cache it for downstream Load all market pair info build and cache it for downstream
use. use.
An ``._altnames: dict[str, str]`` is available for looking Multiple pair info lookup tables (like ``._altnames:
up the piker-native FQME style `Pair.bs_fqme: str` for any dict[str, str]``) are created for looking up the
input of the three (yes, it's that idiotic) available piker-native `Pair`-struct from any input of the three
key-sets that kraken frickin offers depending on the API (yes, it's that idiotic..) available symbol/pair-key-sets
including the .altname, .wsname and the weird ass default that kraken frickin offers depending on the API including
set they return in rest responses.. the .altname, .wsname and the weird ass default set they
return in ReST responses .xname..
''' '''
if ( if (
@ -628,7 +637,7 @@ class Client:
def to_bs_fqme( def to_bs_fqme(
cls, cls,
pair_str: str pair_str: str
) -> tuple[str, Pair]: ) -> str:
''' '''
Normalize symbol names to to a 3x3 pair from the global Normalize symbol names to to a 3x3 pair from the global
definition map which we build out from the data retreived from definition map which we build out from the data retreived from
@ -636,7 +645,7 @@ class Client:
''' '''
try: try:
return cls._altnames[pair_str.upper()] return cls._altnames[pair_str.upper()].bs_fqme
except KeyError as ke: except KeyError as ke:
raise SymbolNotFound(f'kraken has no {ke.args[0]}') raise SymbolNotFound(f'kraken has no {ke.args[0]}')

View File

@ -513,6 +513,7 @@ async def open_trade_dialog(
ledger_trans: dict[str, Transaction] = await norm_trade_records( ledger_trans: dict[str, Transaction] = await norm_trade_records(
ledger, ledger,
client, client,
api_name_set='xname',
) )
if not acnt.pps: if not acnt.pps:
@ -534,6 +535,7 @@ async def open_trade_dialog(
api_trans: dict[str, Transaction] = await norm_trade_records( api_trans: dict[str, Transaction] = await norm_trade_records(
tids2trades, tids2trades,
client, client,
api_name_set='xname',
) )
# retrieve kraken reported balances # retrieve kraken reported balances
@ -743,6 +745,7 @@ async def handle_order_updates(
new_trans = await norm_trade_records( new_trans = await norm_trade_records(
trades, trades,
client, client,
api_name_set='wsname',
) )
ppmsgs = trades2pps( ppmsgs = trades2pps(
acnt, acnt,

View File

@ -64,9 +64,19 @@ def norm_trade(
'sell': -1, 'sell': -1,
}[record['type']] }[record['type']]
rest_pair_key: str = record['pair'] # NOTE: this value may be either the websocket OR the rest schema
pair: Pair = pairs[rest_pair_key] # so we need to detect the key format and then choose the
# correct symbol lookup table to evetually get a ``Pair``..
# See internals of `Client.asset_pairs()` for deats!
src_pair_key: str = record['pair']
# XXX: kraken's data engineering is soo bad they require THREE
# different pair schemas (more or less seemingly tied to
# transport-APIs)..LITERALLY they return different market id
# pairs in the ledger endpoints vs. the websocket event subs..
# lookup pair using appropriately provided tabled depending
# on API-key-schema..
pair: Pair = pairs[src_pair_key]
fqme: str = pair.bs_fqme.lower() + '.kraken' fqme: str = pair.bs_fqme.lower() + '.kraken'
return Transaction( return Transaction(
@ -83,6 +93,7 @@ def norm_trade(
async def norm_trade_records( async def norm_trade_records(
ledger: dict[str, Any], ledger: dict[str, Any],
client: Client, client: Client,
api_name_set: str = 'xname',
) -> dict[str, Transaction]: ) -> dict[str, Transaction]:
''' '''
@ -97,11 +108,16 @@ async def norm_trade_records(
# mkt: MktPair = (await get_mkt_info(manual_fqme))[0] # mkt: MktPair = (await get_mkt_info(manual_fqme))[0]
# fqme: str = mkt.fqme # fqme: str = mkt.fqme
# assert fqme == manual_fqme # assert fqme == manual_fqme
pairs: dict[str, Pair] = {
'xname': client._AssetPairs,
'wsname': client._wsnames,
'altname': client._altnames,
}[api_name_set]
records[tid] = norm_trade( records[tid] = norm_trade(
tid, tid,
record, record,
pairs=client._AssetPairs, pairs=pairs,
) )
return records return records

View File

@ -43,7 +43,7 @@ from piker.accounting._mktinfo import (
# https://www.kraken.com/features/api#get-tradable-pairs # https://www.kraken.com/features/api#get-tradable-pairs
class Pair(Struct): class Pair(Struct):
respname: str # idiotic bs_mktid equiv i guess? xname: str # idiotic bs_mktid equiv i guess?
altname: str # alternate pair name altname: str # alternate pair name
wsname: str # WebSocket pair name (if available) wsname: str # WebSocket pair name (if available)
aclass_base: str # asset class of base component aclass_base: str # asset class of base component
@ -94,7 +94,7 @@ class Pair(Struct):
make up their minds on a better key set XD make up their minds on a better key set XD
''' '''
return self.respname return self.xname
@property @property
def price_tick(self) -> Decimal: def price_tick(self) -> Decimal:

View File

@ -234,10 +234,13 @@ async def _reconnect_forever(
f'{url} trying (RE)CONNECT' f'{url} trying (RE)CONNECT'
) )
async with trio.open_nursery() as n: ws: WebSocketConnection
cs = nobsws._cs = n.cancel_scope try:
ws: WebSocketConnection async with (
async with open_websocket_url(url) as ws: trio.open_nursery() as n,
open_websocket_url(url) as ws,
):
cs = nobsws._cs = n.cancel_scope
nobsws._ws = ws nobsws._ws = ws
log.info( log.info(
f'{src_mod}\n' f'{src_mod}\n'
@ -269,9 +272,11 @@ async def _reconnect_forever(
# to let tasks run **inside** the ws open block above. # to let tasks run **inside** the ws open block above.
nobsws._connected.set() nobsws._connected.set()
await trio.sleep_forever() await trio.sleep_forever()
except HandshakeError:
log.exception(f'Retrying connection')
# ws & nursery block ends
# ws open block end
# nursery block end
nobsws._connected = trio.Event() nobsws._connected = trio.Event()
if cs.cancelled_caught: if cs.cancelled_caught:
log.cancel( log.cancel(
@ -284,7 +289,8 @@ async def _reconnect_forever(
and not nobsws._connected.is_set() and not nobsws._connected.is_set()
) )
# -> from here, move to next reconnect attempt # -> from here, move to next reconnect attempt iteration
# in the while loop above Bp
else: else:
log.exception( log.exception(