diff --git a/piker/brokers/kraken/__init__.py b/piker/brokers/kraken/__init__.py
index 8ec19bcf..783406a4 100644
--- a/piker/brokers/kraken/__init__.py
+++ b/piker/brokers/kraken/__init__.py
@@ -25,17 +25,27 @@ Sub-modules within break into the core functionalities:
wrapping around ``ib_insync``.
'''
+from .symbols import Pair # for symcache
+# required by `.brokers`
from .api import (
get_client,
)
from .feed import (
+ # required by `.accounting`, `.data`
get_mkt_info,
- open_history_client,
+
+ # required by `.data`
open_symbol_search,
stream_quotes,
+ open_history_client,
)
from .broker import (
+ # required by `.clearing`
open_trade_dialog,
+)
+from .ledger import (
+ # required by `.accounting`
+ norm_trade,
norm_trade_records,
)
@@ -43,11 +53,13 @@ from .broker import (
__all__ = [
'get_client',
'get_mkt_info',
+ 'Pair',
'open_trade_dialog',
'open_history_client',
'open_symbol_search',
'stream_quotes',
'norm_trade_records',
+ 'norm_trade',
]
diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py
index a82714cf..b9fb6540 100644
--- a/piker/brokers/kraken/api.py
+++ b/piker/brokers/kraken/api.py
@@ -15,12 +15,11 @@
# along with this program. If not, see .
'''
-Kraken web API wrapping.
+Core (web) API client
'''
from contextlib import asynccontextmanager as acm
from datetime import datetime
-from decimal import Decimal
import itertools
from typing import (
Any,
@@ -28,7 +27,6 @@ from typing import (
)
import time
-from bidict import bidict
import pendulum
import asks
from fuzzywuzzy import process as fuzzy
@@ -40,11 +38,11 @@ import base64
import trio
from piker import config
-from piker.data.types import Struct
from piker.data import def_iohlcv_fields
from piker.accounting._mktinfo import (
Asset,
digits_to_dec,
+ dec_digits,
)
from piker.brokers._util import (
resproc,
@@ -54,6 +52,7 @@ from piker.brokers._util import (
)
from piker.accounting import Transaction
from piker.log import get_logger
+from .symbols import Pair
log = get_logger('piker.brokers.kraken')
@@ -105,68 +104,22 @@ class InvalidKey(ValueError):
'''
-# https://www.kraken.com/features/api#get-tradable-pairs
-class Pair(Struct):
- altname: str # alternate pair name
- wsname: str # WebSocket pair name (if available)
- aclass_base: str # asset class of base component
- base: str # asset id of base component
- aclass_quote: str # asset class of quote component
- quote: str # asset id of quote component
- lot: str # volume lot size
-
- cost_decimals: int
- costmin: float
- pair_decimals: int # scaling decimal places for pair
- lot_decimals: int # scaling decimal places for volume
-
- # amount to multiply lot volume by to get currency volume
- lot_multiplier: float
-
- # array of leverage amounts available when buying
- leverage_buy: list[int]
- # array of leverage amounts available when selling
- leverage_sell: list[int]
-
- # fee schedule array in [volume, percent fee] tuples
- fees: list[tuple[int, float]]
-
- # maker fee schedule array in [volume, percent fee] tuples (if on
- # maker/taker)
- fees_maker: list[tuple[int, float]]
-
- fee_volume_currency: str # volume discount currency
- margin_call: str # margin call level
- margin_stop: str # stop-out/liquidation margin level
- ordermin: float # minimum order volume for pair
- tick_size: float # min price step size
- status: str
-
- short_position_limit: float = 0
- long_position_limit: float = float('inf')
-
- @property
- def price_tick(self) -> Decimal:
- return digits_to_dec(self.pair_decimals)
-
- @property
- def size_tick(self) -> Decimal:
- return digits_to_dec(self.lot_decimals)
-
- @property
- def bs_fqme(self) -> str:
- return f'{self.symbol}.SPOT'
-
-
class Client:
# symbol mapping from all names to the altname
- _ntable: dict[str, str] = {}
+ _altnames: dict[str, str] = {}
- # 2-way map of symbol names to their "alt names" ffs XD
- _altnames: bidict[str, str] = bidict()
+ # key-ed by kraken's own bs_mktids (like fricking "XXMRZEUR")
+ # with said keys used directly from EP responses so that ledger
+ # parsing can be easily accomplished from both trade-event-msgs
+ # and offline toml files
+ _Assets: dict[str, Asset] = {}
+ _AssetPairs: dict[str, Pair] = {}
+ # key-ed by `Pair.bs_fqme: str`, and thus used for search
+ # allowing for lookup using piker's own FQME symbology sys.
_pairs: dict[str, Pair] = {}
+ _assets: dict[str, Asset] = {}
def __init__(
self,
@@ -186,15 +139,14 @@ class Client:
self._secret = secret
self.conf: dict[str, str] = config
- self.assets: dict[str, Asset] = {}
@property
def pairs(self) -> dict[str, Pair]:
+
if self._pairs is None:
raise RuntimeError(
- "Make sure to run `cache_symbols()` on startup!"
+ "Client didn't run `.get_mkt_pairs()` on startup?!"
)
- # retreive and cache all symbols
return self._pairs
@@ -254,17 +206,29 @@ class Client:
'Balance',
{},
)
- by_bsmktid = resp['result']
+ by_bsmktid: dict[str, dict] = resp['result']
- # TODO: we need to pull out the "asset" decimals
- # data and return a `decimal.Decimal` instead here!
- # using the underlying Asset
- return {
- self._altnames[sym].lower(): float(bal)
- for sym, bal in by_bsmktid.items()
- }
+ balances: dict = {}
+ for respname, bal in by_bsmktid.items():
+ asset: Asset = self._Assets[respname]
- async def get_assets(self) -> dict[str, Asset]:
+ # TODO: which KEY should we use? it's used to index
+ # the `Account.pps: dict` ..
+ key: str = asset.name.lower()
+ # TODO: should we just return a `Decimal` here
+ # or is the rounded version ok?
+ balances[key] = round(
+ float(bal),
+ ndigits=dec_digits(asset.tx_tick)
+ )
+
+ return balances
+
+ async def get_assets(
+ self,
+ reload: bool = False,
+
+ ) -> dict[str, Asset]:
'''
Load and cache all asset infos and pack into
our native ``Asset`` struct.
@@ -282,21 +246,37 @@ class Client:
}
'''
- resp = await self._public('Assets', {})
- assets = resp['result']
+ if (
+ not self._assets
+ or reload
+ ):
+ resp = await self._public('Assets', {})
+ assets: dict[str, dict] = resp['result']
- for bs_mktid, info in assets.items():
- altname = self._altnames[bs_mktid] = info['altname']
- aclass: str = info['aclass']
+ for bs_mktid, info in assets.items():
- self.assets[bs_mktid] = Asset(
- name=altname.lower(),
- atype=f'crypto_{aclass}',
- tx_tick=digits_to_dec(info['decimals']),
- info=info,
- )
+ altname: str = info['altname']
+ aclass: str = info['aclass']
+ asset = Asset(
+ name=altname,
+ atype=f'crypto_{aclass}',
+ tx_tick=digits_to_dec(info['decimals']),
+ info=info,
+ )
+ # NOTE: yes we keep 2 sets since kraken insists on
+ # keeping 3 frickin sets bc apparently they have
+ # no sane data engineers whol all like different
+ # keys for their fricking symbology sets..
+ self._Assets[bs_mktid] = asset
+ self._assets[altname.lower()] = asset
+ self._assets[altname] = asset
- return self.assets
+ # we return the "most native" set merged with our preferred
+ # naming (which i guess is the "altname" one) since that's
+ # what the symcache loader will be storing, and we need the
+ # keys that are easiest to match against in any trade
+ # records.
+ return self._Assets | self._assets
async def get_trades(
self,
@@ -377,23 +357,26 @@ class Client:
# 'amount': '0.00300726', 'fee': '0.00001000', 'time':
# 1658347714, 'status': 'Success'}]}
+ if xfers:
+ import tractor
+ await tractor.pp()
+
trans: dict[str, Transaction] = {}
for entry in xfers:
-
# look up the normalized name and asset info
- asset_key = entry['asset']
- asset = self.assets[asset_key]
- asset_key = self._altnames[asset_key].lower()
+ asset_key: str = entry['asset']
+ asset: Asset = self._Assets[asset_key]
+ asset_key: str = asset.name.lower()
+ # asset_key: str = self._altnames[asset_key].lower()
# XXX: this is in the asset units (likely) so it isn't
# quite the same as a commisions cost necessarily..)
+ # TODO: also round this based on `Pair` cost precision info?
cost = float(entry['fee'])
-
- fqme = asset_key + '.kraken'
+ # fqme: str = asset_key + '.kraken'
tx = Transaction(
- fqme=fqme,
- sym=asset,
+ fqme=asset_key, # this must map to an entry in .assets!
tid=entry['txid'],
dt=pendulum.from_timestamp(entry['time']),
bs_mktid=f'{asset_key}{src_asset}',
@@ -408,6 +391,11 @@ class Client:
# XXX: see note above
cost=cost,
+
+ # not a trade but a withdrawal or deposit on the
+ # asset (chain) system.
+ etype='transfer',
+
)
trans[tx.tid] = tx
@@ -458,7 +446,7 @@ class Client:
# txid is a transaction id given by kraken
return await self.endpoint('CancelOrder', {"txid": reqid})
- async def pair_info(
+ async def asset_pairs(
self,
pair_patt: str | None = None,
@@ -470,64 +458,69 @@ class Client:
https://docs.kraken.com/rest/#tag/Market-Data/operation/getTradableAssetPairs
'''
- # get all pairs by default, or filter
- # to whatever pattern is provided as input.
- pairs: dict[str, str] | None = None
- if pair_patt is not None:
- pairs = {'pair': pair_patt}
+ if not self._AssetPairs:
+ # get all pairs by default, or filter
+ # to whatever pattern is provided as input.
+ req_pairs: dict[str, str] | None = None
+ if pair_patt is not None:
+ req_pairs = {'pair': pair_patt}
- resp = await self._public(
- 'AssetPairs',
- pairs,
- )
- err = resp['error']
- if err:
- raise SymbolNotFound(pair_patt)
+ resp = await self._public(
+ 'AssetPairs',
+ req_pairs,
+ )
+ err = resp['error']
+ if err:
+ raise SymbolNotFound(pair_patt)
- pairs: dict[str, Pair] = {
+ # NOTE: we key pairs by our custom defined `.bs_fqme`
+ # field since we want to offer search over this key
+ # set, callers should fill out lookup tables for
+ # kraken's bs_mktid keys to map to these keys!
+ for key, data in resp['result'].items():
+ pair = Pair(respname=key, **data)
- key: Pair(**data)
- for key, data in resp['result'].items()
- }
- # always cache so we can possibly do faster lookup
- self._pairs.update(pairs)
+ # always cache so we can possibly do faster lookup
+ self._AssetPairs[key] = pair
+
+ bs_fqme: str = pair.bs_fqme
+
+ self._pairs[bs_fqme] = pair
+
+ # register the piker pair under all monikers, a giant flat
+ # surjection of all possible (and stupid) kraken names to
+ # the FMQE style piker key.
+ self._altnames[pair.altname] = bs_fqme
+ self._altnames[pair.wsname] = bs_fqme
if pair_patt is not None:
- return next(iter(pairs.items()))[1]
+ return next(iter(self._pairs.items()))[1]
- return pairs
+ return self._AssetPairs
- async def cache_symbols(self) -> dict:
+ async def get_mkt_pairs(
+ self,
+ reload: bool = False,
+ ) -> dict:
'''
- Load all market pair info build and cache it for downstream use.
+ Load all market pair info build and cache it for downstream
+ use.
- A ``._ntable: dict[str, str]`` is available for mapping the
- websocket pair name-keys and their http endpoint API (smh)
- equivalents to the "alternative name" which is generally the one
- we actually want to use XD
+ An ``._altnames: dict[str, str]`` is available for looking
+ up the piker-native FQME style `Pair.bs_fqme: str` for any
+ input of the three (yes, it's that idiotic) available
+ key-sets that kraken frickin offers depending on the API
+ including the .altname, .wsname and the weird ass default
+ set they return in rest responses..
'''
- if not self._pairs:
- pairs = await self.pair_info()
- assert self._pairs == pairs
+ if (
+ not self._pairs
+ or reload
+ ):
+ await self.asset_pairs()
- # table of all ws and rest keys to their alt-name values.
- ntable: dict[str, str] = {}
-
- for rest_key in list(pairs.keys()):
-
- pair: Pair = pairs[rest_key]
- altname = pair.altname
- wsname = pair.wsname
- ntable[altname] = ntable[rest_key] = ntable[wsname] = altname
-
- # register the pair under all monikers, a giant flat
- # surjection of all possible names to each info obj.
- self._pairs[altname] = self._pairs[wsname] = pair
-
- self._ntable.update(ntable)
-
- return self._pairs
+ return self._AssetPairs
async def search_symbols(
self,
@@ -543,8 +536,8 @@ class Client:
'''
if not len(self._pairs):
- await self.cache_symbols()
- assert self._pairs, '`Client.cache_symbols()` was never called!?'
+ await self.get_mkt_pairs()
+ assert self._pairs, '`Client.get_mkt_pairs()` was never called!?'
matches = fuzzy.extractBests(
pattern,
@@ -632,9 +625,9 @@ class Client:
raise BrokerError(errmsg)
@classmethod
- def normalize_symbol(
+ def to_bs_fqme(
cls,
- ticker: str
+ pair_str: str
) -> tuple[str, Pair]:
'''
Normalize symbol names to to a 3x3 pair from the global
@@ -643,7 +636,7 @@ class Client:
'''
try:
- return cls._ntable[ticker]
+ return cls._altnames[pair_str.upper()]
except KeyError as ke:
raise SymbolNotFound(f'kraken has no {ke.args[0]}')
@@ -655,6 +648,9 @@ async def get_client() -> Client:
if conf:
client = Client(
conf,
+
+ # TODO: don't break these up and just do internal
+ # conf lookups instead..
name=conf['key_descr'],
api_key=conf['api_key'],
secret=conf['secret']
@@ -666,6 +662,6 @@ async def get_client() -> Client:
# batch requests.
async with trio.open_nursery() as nurse:
nurse.start_soon(client.get_assets)
- await client.cache_symbols()
+ await client.get_mkt_pairs()
yield client
diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py
index 7cb59672..74bd7562 100644
--- a/piker/brokers/kraken/broker.py
+++ b/piker/brokers/kraken/broker.py
@@ -24,7 +24,6 @@ from contextlib import (
)
from functools import partial
from itertools import count
-import math
from pprint import pformat
import time
from typing import (
@@ -35,21 +34,16 @@ from typing import (
)
from bidict import bidict
-import pendulum
import trio
import tractor
from piker.accounting import (
Position,
- PpTable,
+ Account,
Transaction,
TransactionLedger,
open_trade_ledger,
- open_pps,
- get_likely_pair,
-)
-from piker.accounting._mktinfo import (
- MktPair,
+ open_account,
)
from piker.clearing import(
OrderDialogs,
@@ -65,18 +59,24 @@ from piker.clearing._messages import (
BrokerdPosition,
BrokerdStatus,
)
+from piker.brokers import (
+ open_cached_client,
+)
+from piker.data import open_symcache
from .api import (
log,
Client,
BrokerError,
- get_client,
)
from .feed import (
- get_mkt_info,
open_autorecon_ws,
NoBsWs,
stream_messages,
)
+from .ledger import (
+ norm_trade_records,
+ verify_balances,
+)
MsgUnion = Union[
BrokerdCancel,
@@ -371,7 +371,8 @@ async def subscribe(
def trades2pps(
- table: PpTable,
+ acnt: Account,
+ ledger: TransactionLedger,
acctid: str,
new_trans: dict[str, Transaction] = {},
@@ -379,13 +380,14 @@ def trades2pps(
) -> list[BrokerdPosition]:
if new_trans:
- updated = table.update_from_trans(
+ updated = acnt.update_from_ledger(
new_trans,
+ symcache=ledger.symcache,
)
log.info(f'Updated pps:\n{pformat(updated)}')
- pp_entries, closed_pp_objs = table.dump_active()
- pp_objs: dict[Union[str, int], Position] = table.pps
+ pp_entries, closed_pp_objs = acnt.dump_active()
+ pp_objs: dict[Union[str, int], Position] = acnt.pps
pps: dict[int, Position]
position_msgs: list[dict] = []
@@ -399,7 +401,7 @@ def trades2pps(
# backend suffix prefixed but when
# reading accounts from ledgers we
# don't need it and/or it's prefixed
- # in the section table.. we should
+ # in the section acnt.. we should
# just strip this from the message
# right since `.broker` is already
# included?
@@ -416,7 +418,7 @@ def trades2pps(
# as little as possible. we need to either do
# these writes in another actor, or try out `trio`'s
# async file IO api?
- table.write_config()
+ acnt.write_config()
return position_msgs
@@ -427,7 +429,12 @@ async def open_trade_dialog(
) -> AsyncIterator[dict[str, Any]]:
- async with get_client() as client:
+ async with (
+ # TODO: maybe bind these together and deliver
+ # a tuple from `.open_cached_client()`?
+ open_cached_client('kraken') as client,
+ open_symcache('kraken') as symcache,
+ ):
# make ems flip to paper mode when no creds setup in
# `brokers.toml` B0
if not client._api_key:
@@ -457,8 +464,8 @@ async def open_trade_dialog(
# - delete the *ABSOLUTE LAST* entry from account's corresponding
# trade ledgers file (NOTE this MUST be the last record
# delivered from the api ledger),
- # - open you ``pps.toml`` and find that same tid and delete it
- # from the pp's clears table,
+ # - open you ``account.kraken.spot.toml`` and find that
+ # same tid and delete it from the pos's clears table,
# - set this flag to `True`
#
# You should see an update come in after the order mode
@@ -469,172 +476,83 @@ async def open_trade_dialog(
# update things correctly.
simulate_pp_update: bool = False
- table: PpTable
+ acnt: Account
ledger: TransactionLedger
with (
- open_pps(
+ open_account(
'kraken',
acctid,
write_on_exit=True,
- ) as table,
+ ) as acnt,
open_trade_ledger(
'kraken',
acctid,
+ symcache=symcache,
) as ledger,
):
- # transaction-ify the ledger entries
- ledger_trans = await norm_trade_records(ledger)
+ # TODO: loading ledger entries should all be done
+ # within a newly implemented `async with open_account()
+ # as acnt` where `Account.ledger: TransactionLedger`
+ # can be used to explicitily update and write the
+ # offline TOML files!
+ # ------ - ------
+ # MOL the init sequence is:
+ # - get `Account` (with presumed pre-loaded ledger done
+ # beind the scenes as part of ctx enter).
+ # - pull new trades from API, update the ledger with
+ # normalized to `Transaction` entries of those
+ # records, presumably (and implicitly) update the
+ # acnt state including expiries, positions,
+ # transfers..), and finally of course existing
+ # per-asset balances.
+ # - validate all pos and balances ensuring there's
+ # no seemingly noticeable discrepancies?
- if not table.pps:
- # NOTE: we can't use this since it first needs
- # broker: str input support!
- # table.update_from_trans(ledger.to_trans())
- table.update_from_trans(ledger_trans)
- table.write_config()
+ # LOAD and transaction-ify the EXISTING LEDGER
+ ledger_trans: dict[str, Transaction] = await norm_trade_records(
+ ledger,
+ client,
+ )
+
+ if not acnt.pps:
+ acnt.update_from_ledger(
+ ledger_trans,
+ symcache=ledger.symcache,
+ )
+ acnt.write_config()
# TODO: eventually probably only load
# as far back as it seems is not deliverd in the
# most recent 50 trades and assume that by ordering we
- # already have those records in the ledger.
- tids2trades = await client.get_trades()
+ # already have those records in the ledger?
+ tids2trades: dict[str, dict] = await client.get_trades()
ledger.update(tids2trades)
if tids2trades:
ledger.write_config()
- api_trans = await norm_trade_records(tids2trades)
+ api_trans: dict[str, Transaction] = await norm_trade_records(
+ tids2trades,
+ client,
+ )
# retrieve kraken reported balances
# and do diff with ledger to determine
# what amount of trades-transactions need
# to be reloaded.
- balances = await client.get_balances()
+ balances: dict[str, float] = await client.get_balances()
- for dst, size in balances.items():
+ verify_balances(
+ acnt,
+ src_fiat,
+ balances,
+ client,
+ ledger,
+ ledger_trans,
+ api_trans,
+ )
- # we don't care about tracking positions
- # in the user's source fiat currency.
- if (
- dst == src_fiat
- or not any(
- dst in bs_mktid for bs_mktid in table.pps
- )
- ):
- log.warning(
- f'Skipping balance `{dst}`:{size} for position calcs!'
- )
- continue
-
- def has_pp(
- dst: str,
- size: float,
-
- ) -> Position | None:
-
- src2dst: dict[str, str] = {}
-
- for bs_mktid in table.pps:
- likely_pair = get_likely_pair(
- src_fiat,
- dst,
- bs_mktid,
- )
- if likely_pair:
- src2dst[src_fiat] = dst
-
- for src, dst in src2dst.items():
- pair = f'{dst}{src_fiat}'
- pp = table.pps.get(pair)
- if (
- pp
- and math.isclose(pp.size, size)
- ):
- return pp
-
- elif (
- size == 0
- and pp.size
- ):
- log.warning(
- f'`kraken` account says you have a ZERO '
- f'balance for {bs_mktid}:{pair}\n'
- f'but piker seems to think `{pp.size}`\n'
- 'This is likely a discrepancy in piker '
- 'accounting if the above number is'
- "large,' though it's likely to due lack"
- "f tracking xfers fees.."
- )
- return pp
-
- return None # signal no entry
-
- pos = has_pp(dst, size)
- if not pos:
-
- # we have a balance for which there is no pp
- # entry? so we have to likely update from the
- # ledger.
- updated = table.update_from_trans(ledger_trans)
- log.info(f'Updated pps from ledger:\n{pformat(updated)}')
- pos = has_pp(dst, size)
-
- if (
- not pos
- and not simulate_pp_update
- ):
- # try reloading from API
- table.update_from_trans(api_trans)
- pos = has_pp(dst, size)
- if not pos:
-
- # get transfers to make sense of abs balances.
- # NOTE: we do this after ledger and API
- # loading since we might not have an entry
- # in the ``pps.toml`` for the necessary pair
- # yet and thus this likely pair grabber will
- # likely fail.
- for bs_mktid in table.pps:
- likely_pair = get_likely_pair(
- src_fiat,
- dst,
- bs_mktid,
- )
- if likely_pair:
- break
- else:
- raise ValueError(
- 'Could not find a position pair in '
- 'ledger for likely widthdrawal '
- f'candidate: {dst}'
- )
-
- if likely_pair:
- # this was likely pp that had a withdrawal
- # from the dst asset out of the account.
-
- xfer_trans = await client.get_xfers(
- dst,
- # TODO: not all src assets are
- # 3 chars long...
- src_asset=likely_pair[3:],
- )
- if xfer_trans:
- updated = table.update_from_trans(
- xfer_trans,
- cost_scalar=1,
- )
- log.info(
- f'Updated {dst} from transfers:\n'
- f'{pformat(updated)}'
- )
-
- if has_pp(dst, size):
- raise ValueError(
- 'Could not reproduce balance:\n'
- f'dst: {dst}, {size}\n'
- )
-
- # only for simulate-testing a "new fill" since
+ # XXX NOTE: only for simulate-testing a "new fill" since
# otherwise we have to actually conduct a live clear.
if simulate_pp_update:
tid = list(tids2trades)[0]
@@ -643,25 +561,27 @@ async def open_trade_dialog(
reqids2txids[0] = last_trade_dict['ordertxid']
ppmsgs: list[BrokerdPosition] = trades2pps(
- table,
+ acnt,
+ ledger,
acctid,
)
+ # sync with EMS delivering pps and accounts
await ctx.started((ppmsgs, [acc_name]))
# TODO: ideally this blocks the this task
# as little as possible. we need to either do
# these writes in another actor, or try out `trio`'s
# async file IO api?
- table.write_config()
+ acnt.write_config()
# Get websocket token for authenticated data stream
# Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {})
- err = resp.get('error')
- if err:
+ if err := resp.get('error'):
raise BrokerError(err)
- token = resp['result']['token']
+ # resp token for ws init
+ token: str = resp['result']['token']
ws: NoBsWs
async with (
@@ -690,13 +610,14 @@ async def open_trade_dialog(
# enter relay loop
await handle_order_updates(
+ client,
ws,
stream,
ems_stream,
apiflows,
ids,
reqids2txids,
- table,
+ acnt,
api_trans,
acctid,
acc_name,
@@ -705,13 +626,14 @@ async def open_trade_dialog(
async def handle_order_updates(
+ client: Client, # only for pairs table needed in ledger proc
ws: NoBsWs,
ws_stream: AsyncIterator,
ems_stream: tractor.MsgStream,
apiflows: OrderDialogs,
ids: bidict[str, int],
reqids2txids: bidict[int, str],
- table: PpTable,
+ acnt: Account,
# transaction records which will be updated
# on new trade clearing events (aka order "fills")
@@ -733,7 +655,7 @@ async def handle_order_updates(
# TODO: turns out you get the fill events from the
# `openOrders` before you get this, so it might be better
- # to do all fill/status/pp updates in that sub and just use
+ # to do all fill/status/pos updates in that sub and just use
# this one for ledger syncs?
# For eg. we could take the "last 50 trades" and do a diff
@@ -818,9 +740,12 @@ async def handle_order_updates(
)
await ems_stream.send(status_msg)
- new_trans = await norm_trade_records(trades)
+ new_trans = await norm_trade_records(
+ trades,
+ client,
+ )
ppmsgs = trades2pps(
- table,
+ acnt,
acctid,
new_trans,
)
@@ -1183,36 +1108,3 @@ async def handle_order_updates(
})
case _:
log.warning(f'Unhandled trades update msg: {msg}')
-
-
-async def norm_trade_records(
- ledger: dict[str, Any],
-
-) -> dict[str, Transaction]:
-
- records: dict[str, Transaction] = {}
-
- for tid, record in ledger.items():
-
- size = float(record.get('vol')) * {
- 'buy': 1,
- 'sell': -1,
- }[record['type']]
-
- # we normalize to kraken's `altname` always..
- bs_mktid: str = Client.normalize_symbol(record['pair'])
- fqme = f'{bs_mktid.lower()}.kraken'
- mkt: MktPair = (await get_mkt_info(fqme))[0]
-
- records[tid] = Transaction(
- fqme=fqme,
- sym=mkt,
- tid=tid,
- size=size,
- price=float(record['price']),
- cost=float(record['fee']),
- dt=pendulum.from_timestamp(float(record['time'])),
- bs_mktid=bs_mktid,
- )
-
- return records
diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py
index d0b14f33..1d10ad8c 100644
--- a/piker/brokers/kraken/feed.py
+++ b/piker/brokers/kraken/feed.py
@@ -282,11 +282,13 @@ async def get_mkt_info(
'''
venue: str = 'spot'
expiry: str = ''
- if '.kraken' in fqme:
- broker, pair, venue, expiry = unpack_fqme(fqme)
- venue: str = venue or 'spot'
+ if '.kraken' not in fqme:
+ fqme += '.kraken'
- if venue != 'spot':
+ broker, pair, venue, expiry = unpack_fqme(fqme)
+ venue: str = venue or 'spot'
+
+ if venue.lower() != 'spot':
raise SymbolNotFound(
'kraken only supports spot markets right now!\n'
f'{fqme}\n'
@@ -295,14 +297,20 @@ async def get_mkt_info(
async with open_cached_client('kraken') as client:
# uppercase since kraken bs_mktid is always upper
- bs_fqme, _, broker = fqme.partition('.')
- pair_str: str = bs_fqme.upper()
- bs_mktid: str = Client.normalize_symbol(pair_str)
- pair: Pair = await client.pair_info(pair_str)
+ # bs_fqme, _, broker = fqme.partition('.')
+ # pair_str: str = bs_fqme.upper()
+ pair_str: str = f'{pair}.{venue}'
- assets = client.assets
- dst_asset: Asset = assets[pair.base]
- src_asset: Asset = assets[pair.quote]
+ pair: Pair | None = client._pairs.get(pair_str.upper())
+ if not pair:
+ bs_fqme: str = Client.to_bs_fqme(pair_str)
+ pair: Pair = client._pairs[bs_fqme]
+
+ if not (assets := client._assets):
+ assets: dict[str, Asset] = await client.get_assets()
+
+ dst_asset: Asset = assets[pair.bs_dst_asset]
+ src_asset: Asset = assets[pair.bs_src_asset]
mkt = MktPair(
dst=dst_asset,
@@ -310,7 +318,7 @@ async def get_mkt_info(
price_tick=pair.price_tick,
size_tick=pair.size_tick,
- bs_mktid=bs_mktid,
+ bs_mktid=pair.bs_mktid,
expiry=expiry,
venue=venue or 'spot',
@@ -488,7 +496,7 @@ async def open_symbol_search(
async with open_cached_client('kraken') as client:
# load all symbols locally for fast search
- cache = await client.cache_symbols()
+ cache = await client.get_mkt_pairs()
await ctx.started(cache)
async with ctx.open_stream() as stream:
@@ -497,7 +505,7 @@ async def open_symbol_search(
matches = fuzzy.extractBests(
pattern,
- cache,
+ client._pairs,
score_cutoff=50,
)
# repack in dict form
diff --git a/piker/brokers/kraken/ledger.py b/piker/brokers/kraken/ledger.py
new file mode 100644
index 00000000..2dac90d9
--- /dev/null
+++ b/piker/brokers/kraken/ledger.py
@@ -0,0 +1,252 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+'''
+Trade transaction accounting and normalization.
+
+'''
+import math
+from pprint import pformat
+from typing import (
+ Any,
+)
+
+import pendulum
+
+from piker.accounting import (
+ Transaction,
+ Position,
+ Account,
+ get_likely_pair,
+ TransactionLedger,
+ # MktPair,
+)
+from piker.data import (
+ # SymbologyCache,
+ Struct,
+)
+from .api import (
+ log,
+ Client,
+ Pair,
+)
+# from .feed import get_mkt_info
+
+
+def norm_trade(
+ tid: str,
+ record: dict[str, Any],
+
+ # this is the dict that was returned from
+ # `Client.get_mkt_pairs()` and when running offline ledger
+ # processing from `.accounting`, this will be the table loaded
+ # into `SymbologyCache.pairs`.
+ pairs: dict[str, Struct],
+
+) -> Transaction:
+
+ size: float = float(record.get('vol')) * {
+ 'buy': 1,
+ 'sell': -1,
+ }[record['type']]
+
+ rest_pair_key: str = record['pair']
+ pair: Pair = pairs[rest_pair_key]
+
+ fqme: str = pair.bs_fqme.lower() + '.kraken'
+
+ return Transaction(
+ fqme=fqme,
+ tid=tid,
+ size=size,
+ price=float(record['price']),
+ cost=float(record['fee']),
+ dt=pendulum.from_timestamp(float(record['time'])),
+ bs_mktid=pair.bs_mktid,
+ )
+
+
+async def norm_trade_records(
+ ledger: dict[str, Any],
+ client: Client,
+
+) -> dict[str, Transaction]:
+ '''
+ Loop through an input ``dict`` of trade records
+ and convert them to ``Transactions``.
+
+ '''
+ records: dict[str, Transaction] = {}
+ for tid, record in ledger.items():
+
+ # manual_fqme: str = f'{bs_mktid.lower()}.kraken'
+ # mkt: MktPair = (await get_mkt_info(manual_fqme))[0]
+ # fqme: str = mkt.fqme
+ # assert fqme == manual_fqme
+
+ records[tid] = norm_trade(
+ tid,
+ record,
+ pairs=client._AssetPairs,
+ )
+
+ return records
+
+
+def has_pp(
+ acnt: Account,
+ src_fiat: str,
+ dst: str,
+ size: float,
+
+) -> Position | None:
+
+ src2dst: dict[str, str] = {}
+ for bs_mktid in acnt.pps:
+ likely_pair = get_likely_pair(
+ src_fiat,
+ dst,
+ bs_mktid,
+ )
+ if likely_pair:
+ src2dst[src_fiat] = dst
+
+ for src, dst in src2dst.items():
+ pair: str = f'{dst}{src_fiat}'
+ pos: Position = acnt.pps.get(pair)
+ if (
+ pos
+ and math.isclose(pos.size, size)
+ ):
+ return pos
+
+ elif (
+ size == 0
+ and pos.size
+ ):
+ log.warning(
+ f'`kraken` account says you have a ZERO '
+ f'balance for {bs_mktid}:{pair}\n'
+ f'but piker seems to think `{pos.size}`\n'
+ 'This is likely a discrepancy in piker '
+ 'accounting if the above number is'
+ "large,' though it's likely to due lack"
+ "f tracking xfers fees.."
+ )
+ return pos
+
+ return None # indicate no entry found
+
+
+# TODO: factor most of this "account updating from txns" into the
+# the `Account` impl so has to provide for hiding the mostly
+# cross-provider updates from txn sets
+async def verify_balances(
+ acnt: Account,
+ src_fiat: str,
+ balances: dict[str, float],
+ client: Client,
+ ledger: TransactionLedger,
+ ledger_trans: dict[str, Transaction], # from toml
+ api_trans: dict[str, Transaction], # from API
+
+ simulate_pp_update: bool = False,
+
+) -> None:
+ for dst, size in balances.items():
+
+ # we don't care about tracking positions
+ # in the user's source fiat currency.
+ if (
+ dst == src_fiat
+ or not any(
+ dst in bs_mktid for bs_mktid in acnt.pps
+ )
+ ):
+ log.warning(
+ f'Skipping balance `{dst}`:{size} for position calcs!'
+ )
+ continue
+
+ # we have a balance for which there is no pos entry
+ # - we have to likely update from the ledger?
+ if not has_pp(acnt, src_fiat, dst, size):
+ updated = acnt.update_from_ledger(
+ ledger_trans,
+ symcache=ledger.symcache,
+ )
+ log.info(f'Updated pps from ledger:\n{pformat(updated)}')
+
+ # FIRST try reloading from API records
+ if (
+ not has_pp(acnt, src_fiat, dst, size)
+ and not simulate_pp_update
+ ):
+ acnt.update_from_ledger(
+ api_trans,
+ symcache=ledger.symcache,
+ )
+
+ # get transfers to make sense of abs
+ # balances.
+ # NOTE: we do this after ledger and API
+ # loading since we might not have an
+ # entry in the
+ # ``account.kraken.spot.toml`` for the
+ # necessary pair yet and thus this
+ # likely pair grabber will likely fail.
+ if not has_pp(acnt, src_fiat, dst, size):
+ for bs_mktid in acnt.pps:
+ likely_pair: str | None = get_likely_pair(
+ src_fiat,
+ dst,
+ bs_mktid,
+ )
+ if likely_pair:
+ break
+ else:
+ raise ValueError(
+ 'Could not find a position pair in '
+ 'ledger for likely widthdrawal '
+ f'candidate: {dst}'
+ )
+
+ # this was likely pos that had a withdrawal
+ # from the dst asset out of the account.
+ if likely_pair:
+ xfer_trans = await client.get_xfers(
+ dst,
+
+ # TODO: not all src assets are
+ # 3 chars long...
+ src_asset=likely_pair[3:],
+ )
+ if xfer_trans:
+ updated = acnt.update_from_ledger(
+ xfer_trans,
+ cost_scalar=1,
+ symcache=ledger.symcache,
+ )
+ log.info(
+ f'Updated {dst} from transfers:\n'
+ f'{pformat(updated)}'
+ )
+
+ if has_pp(acnt, src_fiat, dst, size):
+ raise ValueError(
+ 'Could not reproduce balance:\n'
+ f'dst: {dst}, {size}\n'
+ )
diff --git a/piker/brokers/kraken/symbols.py b/piker/brokers/kraken/symbols.py
new file mode 100644
index 00000000..43efcac2
--- /dev/null
+++ b/piker/brokers/kraken/symbols.py
@@ -0,0 +1,114 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+'''
+Symbology defs and deats!
+
+'''
+from decimal import Decimal
+
+from piker.accounting._mktinfo import (
+ digits_to_dec,
+)
+from piker.data.types import Struct
+
+
+# https://www.kraken.com/features/api#get-tradable-pairs
+class Pair(Struct):
+ respname: str # idiotic bs_mktid equiv i guess?
+ altname: str # alternate pair name
+ wsname: str # WebSocket pair name (if available)
+ aclass_base: str # asset class of base component
+ base: str # asset id of base component
+ aclass_quote: str # asset class of quote component
+ quote: str # asset id of quote component
+ lot: str # volume lot size
+
+ cost_decimals: int
+ costmin: float
+ pair_decimals: int # scaling decimal places for pair
+ lot_decimals: int # scaling decimal places for volume
+
+ # amount to multiply lot volume by to get currency volume
+ lot_multiplier: float
+
+ # array of leverage amounts available when buying
+ leverage_buy: list[int]
+ # array of leverage amounts available when selling
+ leverage_sell: list[int]
+
+ # fee schedule array in [volume, percent fee] tuples
+ fees: list[tuple[int, float]]
+
+ # maker fee schedule array in [volume, percent fee] tuples (if on
+ # maker/taker)
+ fees_maker: list[tuple[int, float]]
+
+ fee_volume_currency: str # volume discount currency
+ margin_call: str # margin call level
+ margin_stop: str # stop-out/liquidation margin level
+ ordermin: float # minimum order volume for pair
+ tick_size: float # min price step size
+ status: str
+
+ short_position_limit: float = 0
+ long_position_limit: float = float('inf')
+
+ # TODO: should we make this a literal NamespacePath ref?
+ ns_path: str = 'piker.brokers.kraken:Pair'
+
+ @property
+ def bs_mktid(self) -> str:
+ '''
+ Kraken seems to index it's market symbol sets in
+ transaction ledgers using the key returned from rest
+ queries.. so use that since apparently they can't
+ make up their minds on a better key set XD
+
+ '''
+ return self.respname
+
+ @property
+ def price_tick(self) -> Decimal:
+ return digits_to_dec(self.pair_decimals)
+
+ @property
+ def size_tick(self) -> Decimal:
+ return digits_to_dec(self.lot_decimals)
+
+ @property
+ def bs_dst_asset(self) -> str:
+ dst, _ = self.wsname.split('/')
+ return dst
+
+ @property
+ def bs_src_asset(self) -> str:
+ _, src = self.wsname.split('/')
+ return src
+
+ @property
+ def bs_fqme(self) -> str:
+ '''
+ Basically the `.altname` but with special '.' handling and
+ `.SPOT` suffix appending (for future multi-venue support).
+
+ '''
+ dst, src = self.wsname.split('/')
+ # XXX: omg for stupid shite like ETH2.S/ETH..
+ dst = dst.replace('.', '-')
+ return f'{dst}{src}.SPOT'
+
+