kraken: be symcache compatible!

This was more involved then expected but on the bright side, is going to
help drive a more general `Account` update/processing/loading API
providing for all the high-level txn update methods needed for any
backend to generically update the participant's account *state* via
an input ledger/txn set B)

Key changes to enable `SymbologyCache` compat:
- adjust `Client` pairs / assets lookup tables to include a duplicate
  keying of all assets and "asset pairs" using the (chitty) default key
  set that kraken ships which is NOT the `.altname` no `.wsname` keys;
  the "default ReST response keys" i guess?
  - `._AssetPairs` and `._Assets` are *these ^* rest-key sets delivered
    verbatim from the endpoint responses,
  - `._pairs` and `._assets` the equivalent value-sets keyed by piker
    style FQME-looking keys (now provided via the new
    `.kraken.symbols.Pair.bs_fqme: str` and the delivered `'altname'`
    field (for assets) respectively.
- re-implement `.get_assets()` and `.get_mkt_pairs()` to appropriately
  delegate to internal methods and these new (multi-keyed) tables to
  deliver the cacheable set of symbology info.
- adjust `.feed.get_mkt_info()` to handle parsing of both fqme-style and
  wtv(-the-shit-stupid) kraken key set a caller passes via
  a key-matches-first-table-style-scan after pre-processing the
  input `fqme: str`; also do the `Asset` lookups from the new
  `Pair.bs_dst/src_asset: str` fields which should always map correctly
  to an internal asset entry delivered by `Client.get_assets()`.

Dirty impl deatz:
- add new `.kraken.symbols` and move the newly refined `Pair` there.
- add `.kraken.ledger` and move in the factored out ledger processing
  routines.
- also move out what was the `has_pp()` and large chung of nested-ish
  looking acnt-position verification logic blocks into a new
  `verify_balances()` B)
account_tests
Tyler Goodlet 2023-07-16 18:20:15 -04:00
parent a5821ae9b1
commit 4c5507301e
6 changed files with 635 additions and 361 deletions

View File

@ -25,17 +25,27 @@ Sub-modules within break into the core functionalities:
wrapping around ``ib_insync``. wrapping around ``ib_insync``.
''' '''
from .symbols import Pair # for symcache
# required by `.brokers`
from .api import ( from .api import (
get_client, get_client,
) )
from .feed import ( from .feed import (
# required by `.accounting`, `.data`
get_mkt_info, get_mkt_info,
open_history_client,
# required by `.data`
open_symbol_search, open_symbol_search,
stream_quotes, stream_quotes,
open_history_client,
) )
from .broker import ( from .broker import (
# required by `.clearing`
open_trade_dialog, open_trade_dialog,
)
from .ledger import (
# required by `.accounting`
norm_trade,
norm_trade_records, norm_trade_records,
) )
@ -43,11 +53,13 @@ from .broker import (
__all__ = [ __all__ = [
'get_client', 'get_client',
'get_mkt_info', 'get_mkt_info',
'Pair',
'open_trade_dialog', 'open_trade_dialog',
'open_history_client', 'open_history_client',
'open_symbol_search', 'open_symbol_search',
'stream_quotes', 'stream_quotes',
'norm_trade_records', 'norm_trade_records',
'norm_trade',
] ]

View File

@ -15,12 +15,11 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
Kraken web API wrapping. Core (web) API client
''' '''
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from decimal import Decimal
import itertools import itertools
from typing import ( from typing import (
Any, Any,
@ -28,7 +27,6 @@ from typing import (
) )
import time import time
from bidict import bidict
import pendulum import pendulum
import asks import asks
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
@ -40,11 +38,11 @@ import base64
import trio import trio
from piker import config from piker import config
from piker.data.types import Struct
from piker.data import def_iohlcv_fields from piker.data import def_iohlcv_fields
from piker.accounting._mktinfo import ( from piker.accounting._mktinfo import (
Asset, Asset,
digits_to_dec, digits_to_dec,
dec_digits,
) )
from piker.brokers._util import ( from piker.brokers._util import (
resproc, resproc,
@ -54,6 +52,7 @@ from piker.brokers._util import (
) )
from piker.accounting import Transaction from piker.accounting import Transaction
from piker.log import get_logger from piker.log import get_logger
from .symbols import Pair
log = get_logger('piker.brokers.kraken') log = get_logger('piker.brokers.kraken')
@ -105,68 +104,22 @@ class InvalidKey(ValueError):
''' '''
# https://www.kraken.com/features/api#get-tradable-pairs
class Pair(Struct):
altname: str # alternate pair name
wsname: str # WebSocket pair name (if available)
aclass_base: str # asset class of base component
base: str # asset id of base component
aclass_quote: str # asset class of quote component
quote: str # asset id of quote component
lot: str # volume lot size
cost_decimals: int
costmin: float
pair_decimals: int # scaling decimal places for pair
lot_decimals: int # scaling decimal places for volume
# amount to multiply lot volume by to get currency volume
lot_multiplier: float
# array of leverage amounts available when buying
leverage_buy: list[int]
# array of leverage amounts available when selling
leverage_sell: list[int]
# fee schedule array in [volume, percent fee] tuples
fees: list[tuple[int, float]]
# maker fee schedule array in [volume, percent fee] tuples (if on
# maker/taker)
fees_maker: list[tuple[int, float]]
fee_volume_currency: str # volume discount currency
margin_call: str # margin call level
margin_stop: str # stop-out/liquidation margin level
ordermin: float # minimum order volume for pair
tick_size: float # min price step size
status: str
short_position_limit: float = 0
long_position_limit: float = float('inf')
@property
def price_tick(self) -> Decimal:
return digits_to_dec(self.pair_decimals)
@property
def size_tick(self) -> Decimal:
return digits_to_dec(self.lot_decimals)
@property
def bs_fqme(self) -> str:
return f'{self.symbol}.SPOT'
class Client: class Client:
# symbol mapping from all names to the altname # symbol mapping from all names to the altname
_ntable: dict[str, str] = {} _altnames: dict[str, str] = {}
# 2-way map of symbol names to their "alt names" ffs XD # key-ed by kraken's own bs_mktids (like fricking "XXMRZEUR")
_altnames: bidict[str, str] = bidict() # with said keys used directly from EP responses so that ledger
# parsing can be easily accomplished from both trade-event-msgs
# and offline toml files
_Assets: dict[str, Asset] = {}
_AssetPairs: dict[str, Pair] = {}
# key-ed by `Pair.bs_fqme: str`, and thus used for search
# allowing for lookup using piker's own FQME symbology sys.
_pairs: dict[str, Pair] = {} _pairs: dict[str, Pair] = {}
_assets: dict[str, Asset] = {}
def __init__( def __init__(
self, self,
@ -186,15 +139,14 @@ class Client:
self._secret = secret self._secret = secret
self.conf: dict[str, str] = config self.conf: dict[str, str] = config
self.assets: dict[str, Asset] = {}
@property @property
def pairs(self) -> dict[str, Pair]: def pairs(self) -> dict[str, Pair]:
if self._pairs is None: if self._pairs is None:
raise RuntimeError( raise RuntimeError(
"Make sure to run `cache_symbols()` on startup!" "Client didn't run `.get_mkt_pairs()` on startup?!"
) )
# retreive and cache all symbols
return self._pairs return self._pairs
@ -254,17 +206,29 @@ class Client:
'Balance', 'Balance',
{}, {},
) )
by_bsmktid = resp['result'] by_bsmktid: dict[str, dict] = resp['result']
# TODO: we need to pull out the "asset" decimals balances: dict = {}
# data and return a `decimal.Decimal` instead here! for respname, bal in by_bsmktid.items():
# using the underlying Asset asset: Asset = self._Assets[respname]
return {
self._altnames[sym].lower(): float(bal)
for sym, bal in by_bsmktid.items()
}
async def get_assets(self) -> dict[str, Asset]: # TODO: which KEY should we use? it's used to index
# the `Account.pps: dict` ..
key: str = asset.name.lower()
# TODO: should we just return a `Decimal` here
# or is the rounded version ok?
balances[key] = round(
float(bal),
ndigits=dec_digits(asset.tx_tick)
)
return balances
async def get_assets(
self,
reload: bool = False,
) -> dict[str, Asset]:
''' '''
Load and cache all asset infos and pack into Load and cache all asset infos and pack into
our native ``Asset`` struct. our native ``Asset`` struct.
@ -282,21 +246,37 @@ class Client:
} }
''' '''
resp = await self._public('Assets', {}) if (
assets = resp['result'] not self._assets
or reload
):
resp = await self._public('Assets', {})
assets: dict[str, dict] = resp['result']
for bs_mktid, info in assets.items(): for bs_mktid, info in assets.items():
altname = self._altnames[bs_mktid] = info['altname']
aclass: str = info['aclass']
self.assets[bs_mktid] = Asset( altname: str = info['altname']
name=altname.lower(), aclass: str = info['aclass']
atype=f'crypto_{aclass}', asset = Asset(
tx_tick=digits_to_dec(info['decimals']), name=altname,
info=info, atype=f'crypto_{aclass}',
) tx_tick=digits_to_dec(info['decimals']),
info=info,
)
# NOTE: yes we keep 2 sets since kraken insists on
# keeping 3 frickin sets bc apparently they have
# no sane data engineers whol all like different
# keys for their fricking symbology sets..
self._Assets[bs_mktid] = asset
self._assets[altname.lower()] = asset
self._assets[altname] = asset
return self.assets # we return the "most native" set merged with our preferred
# naming (which i guess is the "altname" one) since that's
# what the symcache loader will be storing, and we need the
# keys that are easiest to match against in any trade
# records.
return self._Assets | self._assets
async def get_trades( async def get_trades(
self, self,
@ -377,23 +357,26 @@ class Client:
# 'amount': '0.00300726', 'fee': '0.00001000', 'time': # 'amount': '0.00300726', 'fee': '0.00001000', 'time':
# 1658347714, 'status': 'Success'}]} # 1658347714, 'status': 'Success'}]}
if xfers:
import tractor
await tractor.pp()
trans: dict[str, Transaction] = {} trans: dict[str, Transaction] = {}
for entry in xfers: for entry in xfers:
# look up the normalized name and asset info # look up the normalized name and asset info
asset_key = entry['asset'] asset_key: str = entry['asset']
asset = self.assets[asset_key] asset: Asset = self._Assets[asset_key]
asset_key = self._altnames[asset_key].lower() asset_key: str = asset.name.lower()
# asset_key: str = self._altnames[asset_key].lower()
# XXX: this is in the asset units (likely) so it isn't # XXX: this is in the asset units (likely) so it isn't
# quite the same as a commisions cost necessarily..) # quite the same as a commisions cost necessarily..)
# TODO: also round this based on `Pair` cost precision info?
cost = float(entry['fee']) cost = float(entry['fee'])
# fqme: str = asset_key + '.kraken'
fqme = asset_key + '.kraken'
tx = Transaction( tx = Transaction(
fqme=fqme, fqme=asset_key, # this must map to an entry in .assets!
sym=asset,
tid=entry['txid'], tid=entry['txid'],
dt=pendulum.from_timestamp(entry['time']), dt=pendulum.from_timestamp(entry['time']),
bs_mktid=f'{asset_key}{src_asset}', bs_mktid=f'{asset_key}{src_asset}',
@ -408,6 +391,11 @@ class Client:
# XXX: see note above # XXX: see note above
cost=cost, cost=cost,
# not a trade but a withdrawal or deposit on the
# asset (chain) system.
etype='transfer',
) )
trans[tx.tid] = tx trans[tx.tid] = tx
@ -458,7 +446,7 @@ class Client:
# txid is a transaction id given by kraken # txid is a transaction id given by kraken
return await self.endpoint('CancelOrder', {"txid": reqid}) return await self.endpoint('CancelOrder', {"txid": reqid})
async def pair_info( async def asset_pairs(
self, self,
pair_patt: str | None = None, pair_patt: str | None = None,
@ -470,64 +458,69 @@ class Client:
https://docs.kraken.com/rest/#tag/Market-Data/operation/getTradableAssetPairs https://docs.kraken.com/rest/#tag/Market-Data/operation/getTradableAssetPairs
''' '''
# get all pairs by default, or filter if not self._AssetPairs:
# to whatever pattern is provided as input. # get all pairs by default, or filter
pairs: dict[str, str] | None = None # to whatever pattern is provided as input.
if pair_patt is not None: req_pairs: dict[str, str] | None = None
pairs = {'pair': pair_patt} if pair_patt is not None:
req_pairs = {'pair': pair_patt}
resp = await self._public( resp = await self._public(
'AssetPairs', 'AssetPairs',
pairs, req_pairs,
) )
err = resp['error'] err = resp['error']
if err: if err:
raise SymbolNotFound(pair_patt) raise SymbolNotFound(pair_patt)
pairs: dict[str, Pair] = { # NOTE: we key pairs by our custom defined `.bs_fqme`
# field since we want to offer search over this key
# set, callers should fill out lookup tables for
# kraken's bs_mktid keys to map to these keys!
for key, data in resp['result'].items():
pair = Pair(respname=key, **data)
key: Pair(**data) # always cache so we can possibly do faster lookup
for key, data in resp['result'].items() self._AssetPairs[key] = pair
}
# always cache so we can possibly do faster lookup bs_fqme: str = pair.bs_fqme
self._pairs.update(pairs)
self._pairs[bs_fqme] = pair
# register the piker pair under all monikers, a giant flat
# surjection of all possible (and stupid) kraken names to
# the FMQE style piker key.
self._altnames[pair.altname] = bs_fqme
self._altnames[pair.wsname] = bs_fqme
if pair_patt is not None: if pair_patt is not None:
return next(iter(pairs.items()))[1] return next(iter(self._pairs.items()))[1]
return pairs return self._AssetPairs
async def cache_symbols(self) -> dict: async def get_mkt_pairs(
self,
reload: bool = False,
) -> dict:
''' '''
Load all market pair info build and cache it for downstream use. Load all market pair info build and cache it for downstream
use.
A ``._ntable: dict[str, str]`` is available for mapping the An ``._altnames: dict[str, str]`` is available for looking
websocket pair name-keys and their http endpoint API (smh) up the piker-native FQME style `Pair.bs_fqme: str` for any
equivalents to the "alternative name" which is generally the one input of the three (yes, it's that idiotic) available
we actually want to use XD key-sets that kraken frickin offers depending on the API
including the .altname, .wsname and the weird ass default
set they return in rest responses..
''' '''
if not self._pairs: if (
pairs = await self.pair_info() not self._pairs
assert self._pairs == pairs or reload
):
await self.asset_pairs()
# table of all ws and rest keys to their alt-name values. return self._AssetPairs
ntable: dict[str, str] = {}
for rest_key in list(pairs.keys()):
pair: Pair = pairs[rest_key]
altname = pair.altname
wsname = pair.wsname
ntable[altname] = ntable[rest_key] = ntable[wsname] = altname
# register the pair under all monikers, a giant flat
# surjection of all possible names to each info obj.
self._pairs[altname] = self._pairs[wsname] = pair
self._ntable.update(ntable)
return self._pairs
async def search_symbols( async def search_symbols(
self, self,
@ -543,8 +536,8 @@ class Client:
''' '''
if not len(self._pairs): if not len(self._pairs):
await self.cache_symbols() await self.get_mkt_pairs()
assert self._pairs, '`Client.cache_symbols()` was never called!?' assert self._pairs, '`Client.get_mkt_pairs()` was never called!?'
matches = fuzzy.extractBests( matches = fuzzy.extractBests(
pattern, pattern,
@ -632,9 +625,9 @@ class Client:
raise BrokerError(errmsg) raise BrokerError(errmsg)
@classmethod @classmethod
def normalize_symbol( def to_bs_fqme(
cls, cls,
ticker: str pair_str: str
) -> tuple[str, Pair]: ) -> tuple[str, Pair]:
''' '''
Normalize symbol names to to a 3x3 pair from the global Normalize symbol names to to a 3x3 pair from the global
@ -643,7 +636,7 @@ class Client:
''' '''
try: try:
return cls._ntable[ticker] return cls._altnames[pair_str.upper()]
except KeyError as ke: except KeyError as ke:
raise SymbolNotFound(f'kraken has no {ke.args[0]}') raise SymbolNotFound(f'kraken has no {ke.args[0]}')
@ -655,6 +648,9 @@ async def get_client() -> Client:
if conf: if conf:
client = Client( client = Client(
conf, conf,
# TODO: don't break these up and just do internal
# conf lookups instead..
name=conf['key_descr'], name=conf['key_descr'],
api_key=conf['api_key'], api_key=conf['api_key'],
secret=conf['secret'] secret=conf['secret']
@ -666,6 +662,6 @@ async def get_client() -> Client:
# batch requests. # batch requests.
async with trio.open_nursery() as nurse: async with trio.open_nursery() as nurse:
nurse.start_soon(client.get_assets) nurse.start_soon(client.get_assets)
await client.cache_symbols() await client.get_mkt_pairs()
yield client yield client

View File

@ -24,7 +24,6 @@ from contextlib import (
) )
from functools import partial from functools import partial
from itertools import count from itertools import count
import math
from pprint import pformat from pprint import pformat
import time import time
from typing import ( from typing import (
@ -35,21 +34,16 @@ from typing import (
) )
from bidict import bidict from bidict import bidict
import pendulum
import trio import trio
import tractor import tractor
from piker.accounting import ( from piker.accounting import (
Position, Position,
PpTable, Account,
Transaction, Transaction,
TransactionLedger, TransactionLedger,
open_trade_ledger, open_trade_ledger,
open_pps, open_account,
get_likely_pair,
)
from piker.accounting._mktinfo import (
MktPair,
) )
from piker.clearing import( from piker.clearing import(
OrderDialogs, OrderDialogs,
@ -65,18 +59,24 @@ from piker.clearing._messages import (
BrokerdPosition, BrokerdPosition,
BrokerdStatus, BrokerdStatus,
) )
from piker.brokers import (
open_cached_client,
)
from piker.data import open_symcache
from .api import ( from .api import (
log, log,
Client, Client,
BrokerError, BrokerError,
get_client,
) )
from .feed import ( from .feed import (
get_mkt_info,
open_autorecon_ws, open_autorecon_ws,
NoBsWs, NoBsWs,
stream_messages, stream_messages,
) )
from .ledger import (
norm_trade_records,
verify_balances,
)
MsgUnion = Union[ MsgUnion = Union[
BrokerdCancel, BrokerdCancel,
@ -371,7 +371,8 @@ async def subscribe(
def trades2pps( def trades2pps(
table: PpTable, acnt: Account,
ledger: TransactionLedger,
acctid: str, acctid: str,
new_trans: dict[str, Transaction] = {}, new_trans: dict[str, Transaction] = {},
@ -379,13 +380,14 @@ def trades2pps(
) -> list[BrokerdPosition]: ) -> list[BrokerdPosition]:
if new_trans: if new_trans:
updated = table.update_from_trans( updated = acnt.update_from_ledger(
new_trans, new_trans,
symcache=ledger.symcache,
) )
log.info(f'Updated pps:\n{pformat(updated)}') log.info(f'Updated pps:\n{pformat(updated)}')
pp_entries, closed_pp_objs = table.dump_active() pp_entries, closed_pp_objs = acnt.dump_active()
pp_objs: dict[Union[str, int], Position] = table.pps pp_objs: dict[Union[str, int], Position] = acnt.pps
pps: dict[int, Position] pps: dict[int, Position]
position_msgs: list[dict] = [] position_msgs: list[dict] = []
@ -399,7 +401,7 @@ def trades2pps(
# backend suffix prefixed but when # backend suffix prefixed but when
# reading accounts from ledgers we # reading accounts from ledgers we
# don't need it and/or it's prefixed # don't need it and/or it's prefixed
# in the section table.. we should # in the section acnt.. we should
# just strip this from the message # just strip this from the message
# right since `.broker` is already # right since `.broker` is already
# included? # included?
@ -416,7 +418,7 @@ def trades2pps(
# as little as possible. we need to either do # as little as possible. we need to either do
# these writes in another actor, or try out `trio`'s # these writes in another actor, or try out `trio`'s
# async file IO api? # async file IO api?
table.write_config() acnt.write_config()
return position_msgs return position_msgs
@ -427,7 +429,12 @@ async def open_trade_dialog(
) -> AsyncIterator[dict[str, Any]]: ) -> AsyncIterator[dict[str, Any]]:
async with get_client() as client: async with (
# TODO: maybe bind these together and deliver
# a tuple from `.open_cached_client()`?
open_cached_client('kraken') as client,
open_symcache('kraken') as symcache,
):
# make ems flip to paper mode when no creds setup in # make ems flip to paper mode when no creds setup in
# `brokers.toml` B0 # `brokers.toml` B0
if not client._api_key: if not client._api_key:
@ -457,8 +464,8 @@ async def open_trade_dialog(
# - delete the *ABSOLUTE LAST* entry from account's corresponding # - delete the *ABSOLUTE LAST* entry from account's corresponding
# trade ledgers file (NOTE this MUST be the last record # trade ledgers file (NOTE this MUST be the last record
# delivered from the api ledger), # delivered from the api ledger),
# - open you ``pps.toml`` and find that same tid and delete it # - open you ``account.kraken.spot.toml`` and find that
# from the pp's clears table, # same tid and delete it from the pos's clears table,
# - set this flag to `True` # - set this flag to `True`
# #
# You should see an update come in after the order mode # You should see an update come in after the order mode
@ -469,172 +476,83 @@ async def open_trade_dialog(
# update things correctly. # update things correctly.
simulate_pp_update: bool = False simulate_pp_update: bool = False
table: PpTable acnt: Account
ledger: TransactionLedger ledger: TransactionLedger
with ( with (
open_pps( open_account(
'kraken', 'kraken',
acctid, acctid,
write_on_exit=True, write_on_exit=True,
) as table, ) as acnt,
open_trade_ledger( open_trade_ledger(
'kraken', 'kraken',
acctid, acctid,
symcache=symcache,
) as ledger, ) as ledger,
): ):
# transaction-ify the ledger entries # TODO: loading ledger entries should all be done
ledger_trans = await norm_trade_records(ledger) # within a newly implemented `async with open_account()
# as acnt` where `Account.ledger: TransactionLedger`
# can be used to explicitily update and write the
# offline TOML files!
# ------ - ------
# MOL the init sequence is:
# - get `Account` (with presumed pre-loaded ledger done
# beind the scenes as part of ctx enter).
# - pull new trades from API, update the ledger with
# normalized to `Transaction` entries of those
# records, presumably (and implicitly) update the
# acnt state including expiries, positions,
# transfers..), and finally of course existing
# per-asset balances.
# - validate all pos and balances ensuring there's
# no seemingly noticeable discrepancies?
if not table.pps: # LOAD and transaction-ify the EXISTING LEDGER
# NOTE: we can't use this since it first needs ledger_trans: dict[str, Transaction] = await norm_trade_records(
# broker: str input support! ledger,
# table.update_from_trans(ledger.to_trans()) client,
table.update_from_trans(ledger_trans) )
table.write_config()
if not acnt.pps:
acnt.update_from_ledger(
ledger_trans,
symcache=ledger.symcache,
)
acnt.write_config()
# TODO: eventually probably only load # TODO: eventually probably only load
# as far back as it seems is not deliverd in the # as far back as it seems is not deliverd in the
# most recent 50 trades and assume that by ordering we # most recent 50 trades and assume that by ordering we
# already have those records in the ledger. # already have those records in the ledger?
tids2trades = await client.get_trades() tids2trades: dict[str, dict] = await client.get_trades()
ledger.update(tids2trades) ledger.update(tids2trades)
if tids2trades: if tids2trades:
ledger.write_config() ledger.write_config()
api_trans = await norm_trade_records(tids2trades) api_trans: dict[str, Transaction] = await norm_trade_records(
tids2trades,
client,
)
# retrieve kraken reported balances # retrieve kraken reported balances
# and do diff with ledger to determine # and do diff with ledger to determine
# what amount of trades-transactions need # what amount of trades-transactions need
# to be reloaded. # to be reloaded.
balances = await client.get_balances() balances: dict[str, float] = await client.get_balances()
for dst, size in balances.items(): verify_balances(
acnt,
src_fiat,
balances,
client,
ledger,
ledger_trans,
api_trans,
)
# we don't care about tracking positions # XXX NOTE: only for simulate-testing a "new fill" since
# in the user's source fiat currency.
if (
dst == src_fiat
or not any(
dst in bs_mktid for bs_mktid in table.pps
)
):
log.warning(
f'Skipping balance `{dst}`:{size} for position calcs!'
)
continue
def has_pp(
dst: str,
size: float,
) -> Position | None:
src2dst: dict[str, str] = {}
for bs_mktid in table.pps:
likely_pair = get_likely_pair(
src_fiat,
dst,
bs_mktid,
)
if likely_pair:
src2dst[src_fiat] = dst
for src, dst in src2dst.items():
pair = f'{dst}{src_fiat}'
pp = table.pps.get(pair)
if (
pp
and math.isclose(pp.size, size)
):
return pp
elif (
size == 0
and pp.size
):
log.warning(
f'`kraken` account says you have a ZERO '
f'balance for {bs_mktid}:{pair}\n'
f'but piker seems to think `{pp.size}`\n'
'This is likely a discrepancy in piker '
'accounting if the above number is'
"large,' though it's likely to due lack"
"f tracking xfers fees.."
)
return pp
return None # signal no entry
pos = has_pp(dst, size)
if not pos:
# we have a balance for which there is no pp
# entry? so we have to likely update from the
# ledger.
updated = table.update_from_trans(ledger_trans)
log.info(f'Updated pps from ledger:\n{pformat(updated)}')
pos = has_pp(dst, size)
if (
not pos
and not simulate_pp_update
):
# try reloading from API
table.update_from_trans(api_trans)
pos = has_pp(dst, size)
if not pos:
# get transfers to make sense of abs balances.
# NOTE: we do this after ledger and API
# loading since we might not have an entry
# in the ``pps.toml`` for the necessary pair
# yet and thus this likely pair grabber will
# likely fail.
for bs_mktid in table.pps:
likely_pair = get_likely_pair(
src_fiat,
dst,
bs_mktid,
)
if likely_pair:
break
else:
raise ValueError(
'Could not find a position pair in '
'ledger for likely widthdrawal '
f'candidate: {dst}'
)
if likely_pair:
# this was likely pp that had a withdrawal
# from the dst asset out of the account.
xfer_trans = await client.get_xfers(
dst,
# TODO: not all src assets are
# 3 chars long...
src_asset=likely_pair[3:],
)
if xfer_trans:
updated = table.update_from_trans(
xfer_trans,
cost_scalar=1,
)
log.info(
f'Updated {dst} from transfers:\n'
f'{pformat(updated)}'
)
if has_pp(dst, size):
raise ValueError(
'Could not reproduce balance:\n'
f'dst: {dst}, {size}\n'
)
# only for simulate-testing a "new fill" since
# otherwise we have to actually conduct a live clear. # otherwise we have to actually conduct a live clear.
if simulate_pp_update: if simulate_pp_update:
tid = list(tids2trades)[0] tid = list(tids2trades)[0]
@ -643,25 +561,27 @@ async def open_trade_dialog(
reqids2txids[0] = last_trade_dict['ordertxid'] reqids2txids[0] = last_trade_dict['ordertxid']
ppmsgs: list[BrokerdPosition] = trades2pps( ppmsgs: list[BrokerdPosition] = trades2pps(
table, acnt,
ledger,
acctid, acctid,
) )
# sync with EMS delivering pps and accounts
await ctx.started((ppmsgs, [acc_name])) await ctx.started((ppmsgs, [acc_name]))
# TODO: ideally this blocks the this task # TODO: ideally this blocks the this task
# as little as possible. we need to either do # as little as possible. we need to either do
# these writes in another actor, or try out `trio`'s # these writes in another actor, or try out `trio`'s
# async file IO api? # async file IO api?
table.write_config() acnt.write_config()
# Get websocket token for authenticated data stream # Get websocket token for authenticated data stream
# Assert that a token was actually received. # Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {}) resp = await client.endpoint('GetWebSocketsToken', {})
err = resp.get('error') if err := resp.get('error'):
if err:
raise BrokerError(err) raise BrokerError(err)
token = resp['result']['token'] # resp token for ws init
token: str = resp['result']['token']
ws: NoBsWs ws: NoBsWs
async with ( async with (
@ -690,13 +610,14 @@ async def open_trade_dialog(
# enter relay loop # enter relay loop
await handle_order_updates( await handle_order_updates(
client,
ws, ws,
stream, stream,
ems_stream, ems_stream,
apiflows, apiflows,
ids, ids,
reqids2txids, reqids2txids,
table, acnt,
api_trans, api_trans,
acctid, acctid,
acc_name, acc_name,
@ -705,13 +626,14 @@ async def open_trade_dialog(
async def handle_order_updates( async def handle_order_updates(
client: Client, # only for pairs table needed in ledger proc
ws: NoBsWs, ws: NoBsWs,
ws_stream: AsyncIterator, ws_stream: AsyncIterator,
ems_stream: tractor.MsgStream, ems_stream: tractor.MsgStream,
apiflows: OrderDialogs, apiflows: OrderDialogs,
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: bidict[int, str], reqids2txids: bidict[int, str],
table: PpTable, acnt: Account,
# transaction records which will be updated # transaction records which will be updated
# on new trade clearing events (aka order "fills") # on new trade clearing events (aka order "fills")
@ -733,7 +655,7 @@ async def handle_order_updates(
# TODO: turns out you get the fill events from the # TODO: turns out you get the fill events from the
# `openOrders` before you get this, so it might be better # `openOrders` before you get this, so it might be better
# to do all fill/status/pp updates in that sub and just use # to do all fill/status/pos updates in that sub and just use
# this one for ledger syncs? # this one for ledger syncs?
# For eg. we could take the "last 50 trades" and do a diff # For eg. we could take the "last 50 trades" and do a diff
@ -818,9 +740,12 @@ async def handle_order_updates(
) )
await ems_stream.send(status_msg) await ems_stream.send(status_msg)
new_trans = await norm_trade_records(trades) new_trans = await norm_trade_records(
trades,
client,
)
ppmsgs = trades2pps( ppmsgs = trades2pps(
table, acnt,
acctid, acctid,
new_trans, new_trans,
) )
@ -1183,36 +1108,3 @@ async def handle_order_updates(
}) })
case _: case _:
log.warning(f'Unhandled trades update msg: {msg}') log.warning(f'Unhandled trades update msg: {msg}')
async def norm_trade_records(
ledger: dict[str, Any],
) -> dict[str, Transaction]:
records: dict[str, Transaction] = {}
for tid, record in ledger.items():
size = float(record.get('vol')) * {
'buy': 1,
'sell': -1,
}[record['type']]
# we normalize to kraken's `altname` always..
bs_mktid: str = Client.normalize_symbol(record['pair'])
fqme = f'{bs_mktid.lower()}.kraken'
mkt: MktPair = (await get_mkt_info(fqme))[0]
records[tid] = Transaction(
fqme=fqme,
sym=mkt,
tid=tid,
size=size,
price=float(record['price']),
cost=float(record['fee']),
dt=pendulum.from_timestamp(float(record['time'])),
bs_mktid=bs_mktid,
)
return records

View File

@ -282,11 +282,13 @@ async def get_mkt_info(
''' '''
venue: str = 'spot' venue: str = 'spot'
expiry: str = '' expiry: str = ''
if '.kraken' in fqme: if '.kraken' not in fqme:
broker, pair, venue, expiry = unpack_fqme(fqme) fqme += '.kraken'
venue: str = venue or 'spot'
if venue != 'spot': broker, pair, venue, expiry = unpack_fqme(fqme)
venue: str = venue or 'spot'
if venue.lower() != 'spot':
raise SymbolNotFound( raise SymbolNotFound(
'kraken only supports spot markets right now!\n' 'kraken only supports spot markets right now!\n'
f'{fqme}\n' f'{fqme}\n'
@ -295,14 +297,20 @@ async def get_mkt_info(
async with open_cached_client('kraken') as client: async with open_cached_client('kraken') as client:
# uppercase since kraken bs_mktid is always upper # uppercase since kraken bs_mktid is always upper
bs_fqme, _, broker = fqme.partition('.') # bs_fqme, _, broker = fqme.partition('.')
pair_str: str = bs_fqme.upper() # pair_str: str = bs_fqme.upper()
bs_mktid: str = Client.normalize_symbol(pair_str) pair_str: str = f'{pair}.{venue}'
pair: Pair = await client.pair_info(pair_str)
assets = client.assets pair: Pair | None = client._pairs.get(pair_str.upper())
dst_asset: Asset = assets[pair.base] if not pair:
src_asset: Asset = assets[pair.quote] bs_fqme: str = Client.to_bs_fqme(pair_str)
pair: Pair = client._pairs[bs_fqme]
if not (assets := client._assets):
assets: dict[str, Asset] = await client.get_assets()
dst_asset: Asset = assets[pair.bs_dst_asset]
src_asset: Asset = assets[pair.bs_src_asset]
mkt = MktPair( mkt = MktPair(
dst=dst_asset, dst=dst_asset,
@ -310,7 +318,7 @@ async def get_mkt_info(
price_tick=pair.price_tick, price_tick=pair.price_tick,
size_tick=pair.size_tick, size_tick=pair.size_tick,
bs_mktid=bs_mktid, bs_mktid=pair.bs_mktid,
expiry=expiry, expiry=expiry,
venue=venue or 'spot', venue=venue or 'spot',
@ -488,7 +496,7 @@ async def open_symbol_search(
async with open_cached_client('kraken') as client: async with open_cached_client('kraken') as client:
# load all symbols locally for fast search # load all symbols locally for fast search
cache = await client.cache_symbols() cache = await client.get_mkt_pairs()
await ctx.started(cache) await ctx.started(cache)
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
@ -497,7 +505,7 @@ async def open_symbol_search(
matches = fuzzy.extractBests( matches = fuzzy.extractBests(
pattern, pattern,
cache, client._pairs,
score_cutoff=50, score_cutoff=50,
) )
# repack in dict form # repack in dict form

View File

@ -0,0 +1,252 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Trade transaction accounting and normalization.
'''
import math
from pprint import pformat
from typing import (
Any,
)
import pendulum
from piker.accounting import (
Transaction,
Position,
Account,
get_likely_pair,
TransactionLedger,
# MktPair,
)
from piker.data import (
# SymbologyCache,
Struct,
)
from .api import (
log,
Client,
Pair,
)
# from .feed import get_mkt_info
def norm_trade(
tid: str,
record: dict[str, Any],
# this is the dict that was returned from
# `Client.get_mkt_pairs()` and when running offline ledger
# processing from `.accounting`, this will be the table loaded
# into `SymbologyCache.pairs`.
pairs: dict[str, Struct],
) -> Transaction:
size: float = float(record.get('vol')) * {
'buy': 1,
'sell': -1,
}[record['type']]
rest_pair_key: str = record['pair']
pair: Pair = pairs[rest_pair_key]
fqme: str = pair.bs_fqme.lower() + '.kraken'
return Transaction(
fqme=fqme,
tid=tid,
size=size,
price=float(record['price']),
cost=float(record['fee']),
dt=pendulum.from_timestamp(float(record['time'])),
bs_mktid=pair.bs_mktid,
)
async def norm_trade_records(
ledger: dict[str, Any],
client: Client,
) -> dict[str, Transaction]:
'''
Loop through an input ``dict`` of trade records
and convert them to ``Transactions``.
'''
records: dict[str, Transaction] = {}
for tid, record in ledger.items():
# manual_fqme: str = f'{bs_mktid.lower()}.kraken'
# mkt: MktPair = (await get_mkt_info(manual_fqme))[0]
# fqme: str = mkt.fqme
# assert fqme == manual_fqme
records[tid] = norm_trade(
tid,
record,
pairs=client._AssetPairs,
)
return records
def has_pp(
acnt: Account,
src_fiat: str,
dst: str,
size: float,
) -> Position | None:
src2dst: dict[str, str] = {}
for bs_mktid in acnt.pps:
likely_pair = get_likely_pair(
src_fiat,
dst,
bs_mktid,
)
if likely_pair:
src2dst[src_fiat] = dst
for src, dst in src2dst.items():
pair: str = f'{dst}{src_fiat}'
pos: Position = acnt.pps.get(pair)
if (
pos
and math.isclose(pos.size, size)
):
return pos
elif (
size == 0
and pos.size
):
log.warning(
f'`kraken` account says you have a ZERO '
f'balance for {bs_mktid}:{pair}\n'
f'but piker seems to think `{pos.size}`\n'
'This is likely a discrepancy in piker '
'accounting if the above number is'
"large,' though it's likely to due lack"
"f tracking xfers fees.."
)
return pos
return None # indicate no entry found
# TODO: factor most of this "account updating from txns" into the
# the `Account` impl so has to provide for hiding the mostly
# cross-provider updates from txn sets
async def verify_balances(
acnt: Account,
src_fiat: str,
balances: dict[str, float],
client: Client,
ledger: TransactionLedger,
ledger_trans: dict[str, Transaction], # from toml
api_trans: dict[str, Transaction], # from API
simulate_pp_update: bool = False,
) -> None:
for dst, size in balances.items():
# we don't care about tracking positions
# in the user's source fiat currency.
if (
dst == src_fiat
or not any(
dst in bs_mktid for bs_mktid in acnt.pps
)
):
log.warning(
f'Skipping balance `{dst}`:{size} for position calcs!'
)
continue
# we have a balance for which there is no pos entry
# - we have to likely update from the ledger?
if not has_pp(acnt, src_fiat, dst, size):
updated = acnt.update_from_ledger(
ledger_trans,
symcache=ledger.symcache,
)
log.info(f'Updated pps from ledger:\n{pformat(updated)}')
# FIRST try reloading from API records
if (
not has_pp(acnt, src_fiat, dst, size)
and not simulate_pp_update
):
acnt.update_from_ledger(
api_trans,
symcache=ledger.symcache,
)
# get transfers to make sense of abs
# balances.
# NOTE: we do this after ledger and API
# loading since we might not have an
# entry in the
# ``account.kraken.spot.toml`` for the
# necessary pair yet and thus this
# likely pair grabber will likely fail.
if not has_pp(acnt, src_fiat, dst, size):
for bs_mktid in acnt.pps:
likely_pair: str | None = get_likely_pair(
src_fiat,
dst,
bs_mktid,
)
if likely_pair:
break
else:
raise ValueError(
'Could not find a position pair in '
'ledger for likely widthdrawal '
f'candidate: {dst}'
)
# this was likely pos that had a withdrawal
# from the dst asset out of the account.
if likely_pair:
xfer_trans = await client.get_xfers(
dst,
# TODO: not all src assets are
# 3 chars long...
src_asset=likely_pair[3:],
)
if xfer_trans:
updated = acnt.update_from_ledger(
xfer_trans,
cost_scalar=1,
symcache=ledger.symcache,
)
log.info(
f'Updated {dst} from transfers:\n'
f'{pformat(updated)}'
)
if has_pp(acnt, src_fiat, dst, size):
raise ValueError(
'Could not reproduce balance:\n'
f'dst: {dst}, {size}\n'
)

View File

@ -0,0 +1,114 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Symbology defs and deats!
'''
from decimal import Decimal
from piker.accounting._mktinfo import (
digits_to_dec,
)
from piker.data.types import Struct
# https://www.kraken.com/features/api#get-tradable-pairs
class Pair(Struct):
respname: str # idiotic bs_mktid equiv i guess?
altname: str # alternate pair name
wsname: str # WebSocket pair name (if available)
aclass_base: str # asset class of base component
base: str # asset id of base component
aclass_quote: str # asset class of quote component
quote: str # asset id of quote component
lot: str # volume lot size
cost_decimals: int
costmin: float
pair_decimals: int # scaling decimal places for pair
lot_decimals: int # scaling decimal places for volume
# amount to multiply lot volume by to get currency volume
lot_multiplier: float
# array of leverage amounts available when buying
leverage_buy: list[int]
# array of leverage amounts available when selling
leverage_sell: list[int]
# fee schedule array in [volume, percent fee] tuples
fees: list[tuple[int, float]]
# maker fee schedule array in [volume, percent fee] tuples (if on
# maker/taker)
fees_maker: list[tuple[int, float]]
fee_volume_currency: str # volume discount currency
margin_call: str # margin call level
margin_stop: str # stop-out/liquidation margin level
ordermin: float # minimum order volume for pair
tick_size: float # min price step size
status: str
short_position_limit: float = 0
long_position_limit: float = float('inf')
# TODO: should we make this a literal NamespacePath ref?
ns_path: str = 'piker.brokers.kraken:Pair'
@property
def bs_mktid(self) -> str:
'''
Kraken seems to index it's market symbol sets in
transaction ledgers using the key returned from rest
queries.. so use that since apparently they can't
make up their minds on a better key set XD
'''
return self.respname
@property
def price_tick(self) -> Decimal:
return digits_to_dec(self.pair_decimals)
@property
def size_tick(self) -> Decimal:
return digits_to_dec(self.lot_decimals)
@property
def bs_dst_asset(self) -> str:
dst, _ = self.wsname.split('/')
return dst
@property
def bs_src_asset(self) -> str:
_, src = self.wsname.split('/')
return src
@property
def bs_fqme(self) -> str:
'''
Basically the `.altname` but with special '.' handling and
`.SPOT` suffix appending (for future multi-venue support).
'''
dst, src = self.wsname.split('/')
# XXX: omg for stupid shite like ETH2.S/ETH..
dst = dst.replace('.', '-')
return f'{dst}{src}.SPOT'