From 481618cc514acee4929900cba7cfce3c34edd58d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 30 Aug 2023 16:09:45 -0400 Subject: [PATCH] kraken: handle ws live trading API symbology Of course I missed this first try but, we need to use the ws market pair symbology set (since apparently kraken loves redundancy at least 3 times XD) when processing transactions that arrive from live clears since it's an entirely different `LTC/EUR` style key then the `XLTCEUR` style delivered from the ReST eps.. As part of this: - add `Client._altnames`, `._wsnames` as `dict[str, Pair]` tables, leaving the `._AssetPairs` table as is keyed by the "xname"s. - Change `Pair.respname: str` -> `.xname` since these keys all just seem to have a weird 'X' prefix. - do the appropriately keyed pair table lookup via a new `api_name_set: str` to `norm_trade_records()` and set is correctly in the ws live txn handler task. --- piker/brokers/kraken/api.py | 79 ++++++++++++++++++--------------- piker/brokers/kraken/broker.py | 3 ++ piker/brokers/kraken/ledger.py | 22 +++++++-- piker/brokers/kraken/symbols.py | 4 +- piker/data/_web_bs.py | 20 ++++++--- 5 files changed, 81 insertions(+), 47 deletions(-) diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index b9fb6540..a4dcdec9 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -106,16 +106,19 @@ class InvalidKey(ValueError): class Client: - # symbol mapping from all names to the altname - _altnames: dict[str, str] = {} - - # key-ed by kraken's own bs_mktids (like fricking "XXMRZEUR") - # with said keys used directly from EP responses so that ledger - # parsing can be easily accomplished from both trade-event-msgs - # and offline toml files + # assets and mkt pairs are key-ed by kraken's ReST response + # symbol-bs_mktids (we call them "X-keys" like fricking + # "XXMRZEUR"). these keys used directly since ledger endpoints + # return transaction sets keyed with the same set! _Assets: dict[str, Asset] = {} _AssetPairs: dict[str, Pair] = {} + # offer lookup tables for all .altname and .wsname + # to the equivalent .xname so that various symbol-schemas + # can be mapped to `Pair`s in the tables above. + _altnames: dict[str, str] = {} + _wsnames: dict[str, str] = {} + # key-ed by `Pair.bs_fqme: str`, and thus used for search # allowing for lookup using piker's own FQME symbology sys. _pairs: dict[str, Pair] = {} @@ -209,8 +212,8 @@ class Client: by_bsmktid: dict[str, dict] = resp['result'] balances: dict = {} - for respname, bal in by_bsmktid.items(): - asset: Asset = self._Assets[respname] + for xname, bal in by_bsmktid.items(): + asset: Asset = self._Assets[xname] # TODO: which KEY should we use? it's used to index # the `Account.pps: dict` .. @@ -367,7 +370,6 @@ class Client: asset_key: str = entry['asset'] asset: Asset = self._Assets[asset_key] asset_key: str = asset.name.lower() - # asset_key: str = self._altnames[asset_key].lower() # XXX: this is in the asset units (likely) so it isn't # quite the same as a commisions cost necessarily..) @@ -473,25 +475,31 @@ class Client: if err: raise SymbolNotFound(pair_patt) - # NOTE: we key pairs by our custom defined `.bs_fqme` - # field since we want to offer search over this key - # set, callers should fill out lookup tables for - # kraken's bs_mktid keys to map to these keys! - for key, data in resp['result'].items(): - pair = Pair(respname=key, **data) + # NOTE: we try to key pairs by our custom defined + # `.bs_fqme` field since we want to offer search over + # this pattern set, callers should fill out lookup + # tables for kraken's bs_mktid keys to map to these + # keys! + # XXX: FURTHER kraken's data eng team decided to offer + # 3 frickin market-pair-symbol key sets depending on + # which frickin API is being used. + # Example for the trading pair 'LTC tuple[str, Pair]: + ) -> str: ''' Normalize symbol names to to a 3x3 pair from the global definition map which we build out from the data retreived from @@ -636,7 +645,7 @@ class Client: ''' try: - return cls._altnames[pair_str.upper()] + return cls._altnames[pair_str.upper()].bs_fqme except KeyError as ke: raise SymbolNotFound(f'kraken has no {ke.args[0]}') diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 25a30346..53168c03 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -513,6 +513,7 @@ async def open_trade_dialog( ledger_trans: dict[str, Transaction] = await norm_trade_records( ledger, client, + api_name_set='xname', ) if not acnt.pps: @@ -534,6 +535,7 @@ async def open_trade_dialog( api_trans: dict[str, Transaction] = await norm_trade_records( tids2trades, client, + api_name_set='xname', ) # retrieve kraken reported balances @@ -743,6 +745,7 @@ async def handle_order_updates( new_trans = await norm_trade_records( trades, client, + api_name_set='wsname', ) ppmsgs = trades2pps( acnt, diff --git a/piker/brokers/kraken/ledger.py b/piker/brokers/kraken/ledger.py index 6dcd327e..f250d246 100644 --- a/piker/brokers/kraken/ledger.py +++ b/piker/brokers/kraken/ledger.py @@ -64,9 +64,19 @@ def norm_trade( 'sell': -1, }[record['type']] - rest_pair_key: str = record['pair'] - pair: Pair = pairs[rest_pair_key] + # NOTE: this value may be either the websocket OR the rest schema + # so we need to detect the key format and then choose the + # correct symbol lookup table to evetually get a ``Pair``.. + # See internals of `Client.asset_pairs()` for deats! + src_pair_key: str = record['pair'] + # XXX: kraken's data engineering is soo bad they require THREE + # different pair schemas (more or less seemingly tied to + # transport-APIs)..LITERALLY they return different market id + # pairs in the ledger endpoints vs. the websocket event subs.. + # lookup pair using appropriately provided tabled depending + # on API-key-schema.. + pair: Pair = pairs[src_pair_key] fqme: str = pair.bs_fqme.lower() + '.kraken' return Transaction( @@ -83,6 +93,7 @@ def norm_trade( async def norm_trade_records( ledger: dict[str, Any], client: Client, + api_name_set: str = 'xname', ) -> dict[str, Transaction]: ''' @@ -97,11 +108,16 @@ async def norm_trade_records( # mkt: MktPair = (await get_mkt_info(manual_fqme))[0] # fqme: str = mkt.fqme # assert fqme == manual_fqme + pairs: dict[str, Pair] = { + 'xname': client._AssetPairs, + 'wsname': client._wsnames, + 'altname': client._altnames, + }[api_name_set] records[tid] = norm_trade( tid, record, - pairs=client._AssetPairs, + pairs=pairs, ) return records diff --git a/piker/brokers/kraken/symbols.py b/piker/brokers/kraken/symbols.py index d71828b2..168b0508 100644 --- a/piker/brokers/kraken/symbols.py +++ b/piker/brokers/kraken/symbols.py @@ -43,7 +43,7 @@ from piker.accounting._mktinfo import ( # https://www.kraken.com/features/api#get-tradable-pairs class Pair(Struct): - respname: str # idiotic bs_mktid equiv i guess? + xname: str # idiotic bs_mktid equiv i guess? altname: str # alternate pair name wsname: str # WebSocket pair name (if available) aclass_base: str # asset class of base component @@ -94,7 +94,7 @@ class Pair(Struct): make up their minds on a better key set XD ''' - return self.respname + return self.xname @property def price_tick(self) -> Decimal: diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index e60f871b..256b35af 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -234,10 +234,13 @@ async def _reconnect_forever( f'{url} trying (RE)CONNECT' ) - async with trio.open_nursery() as n: - cs = nobsws._cs = n.cancel_scope - ws: WebSocketConnection - async with open_websocket_url(url) as ws: + ws: WebSocketConnection + try: + async with ( + trio.open_nursery() as n, + open_websocket_url(url) as ws, + ): + cs = nobsws._cs = n.cancel_scope nobsws._ws = ws log.info( f'{src_mod}\n' @@ -269,9 +272,11 @@ async def _reconnect_forever( # to let tasks run **inside** the ws open block above. nobsws._connected.set() await trio.sleep_forever() + except HandshakeError: + log.exception(f'Retrying connection') + + # ws & nursery block ends - # ws open block end - # nursery block end nobsws._connected = trio.Event() if cs.cancelled_caught: log.cancel( @@ -284,7 +289,8 @@ async def _reconnect_forever( and not nobsws._connected.is_set() ) - # -> from here, move to next reconnect attempt + # -> from here, move to next reconnect attempt iteration + # in the while loop above Bp else: log.exception(