diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 75e67bbd..2126d886 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -24,13 +24,14 @@ from contextlib import ( contextmanager as cm, ) from functools import partial -import itertools from itertools import count +import math from pprint import pformat import time from typing import ( Any, AsyncIterator, + # Optional, Union, ) @@ -41,7 +42,14 @@ import trio import tractor import wsproto -from piker import pp +from piker.pp import ( + Position, + PpTable, + Transaction, + # update_pps_conf, + open_trade_ledger, + open_pps, +) from piker.clearing._messages import ( BrokerdCancel, BrokerdError, @@ -265,10 +273,56 @@ async def subscribe( # await ws.recv_msg() +def trades2pps( + table: PpTable, + acctid: str, + new_trans: dict[str, Transaction] = {}, + +) -> tuple[ + list[BrokerdPosition], + list[Transaction], +]: + if new_trans: + updated = table.update_from_trans( + new_trans, + ) + log.info(f'Updated pps:\n{pformat(updated)}') + + pp_entries, closed_pp_objs = table.dump_active('kraken') + pp_objs: dict[Union[str, int], Position] = table.pps + + pps: dict[int, Position] + position_msgs: list[dict] = [] + + for pps in [pp_objs, closed_pp_objs]: + for tid, p in pps.items(): + msg = BrokerdPosition( + broker='kraken', + # XXX: ok so this is annoying, we're + # relaying an account name with the + # backend suffix prefixed but when + # reading accounts from ledgers we + # don't need it and/or it's prefixed + # in the section table.. we should + # just strip this from the message + # right since `.broker` is already + # included? + account='kraken.' + acctid, + symbol=p.symbol.front_fqsn(), + size=p.size, + avg_price=p.be_price, + currency='', + ) + position_msgs.append(msg) + + return position_msgs + + @tractor.context async def trades_dialogue( ctx: tractor.Context, loglevel: str = None, + ) -> AsyncIterator[dict[str, Any]]: # XXX: required to propagate ``tractor`` loglevel to piker logging @@ -289,98 +343,193 @@ async def trades_dialogue( acctid = client._name acc_name = 'kraken.' + acctid - # pull and deliver trades ledger - trades = await client.get_trades() - log.info( - f'Loaded {len(trades)} trades from account `{acc_name}`' - ) - with open_ledger( - acctid, - trades, - ) as trans: - active, closed = pp.update_pps_conf( + # task local msg dialog tracking + apiflows: defaultdict[ + int, + ChainMap[dict[str, dict]], + ] = defaultdict(ChainMap) + + # 2way map for ems ids to kraken int reqids.. + ids: bidict[str, int] = bidict() + reqids2txids: bidict[int, str] = bidict() + + # NOTE: testing code for making sure the rt incremental update + # of positions, via newly generated msgs works. In order to test + # this, + # - delete the *ABSOLUTE LAST* entry from accont's corresponding + # trade ledgers file (NOTE this MUST be the last record + # delivered from the + # api ledger), + # - open you ``pps.toml`` and find that same tid and delete it + # from the pp's clears table, + # - set this flag to `True` + # + # You should see an update come in after the order mode + # boots up which shows your latest correct asset + # balance size after the "previously unknown simulating a live + # fill" update comes in on the relay loop even though the fill + # will be ignored by the ems (no known reqid) the pp msg should + # update things correctly. + simulate_pp_update: bool = False + + with ( + open_pps( 'kraken', acctid, - trade_records=trans, - ledger_reload={}.fromkeys(t.bsuid for t in trans), - ) + ) as table, - position_msgs: list[dict] = [] - pps: dict[int, pp.Position] - for pps in [active, closed]: - for tid, p in pps.items(): - msg = BrokerdPosition( - broker='kraken', - account=acc_name, - symbol=p.symbol.front_fqsn(), - size=p.size, - avg_price=p.be_price, - currency='', - ) - position_msgs.append(msg) - - await ctx.started( - (position_msgs, [acc_name]) - ) - - # Get websocket token for authenticated data stream - # Assert that a token was actually received. - resp = await client.endpoint('GetWebSocketsToken', {}) - - err = resp.get('error') - if err: - raise BrokerError(err) - - token = resp['result']['token'] - - ws: NoBsWs - async with ( - ctx.open_stream() as ems_stream, - open_autorecon_ws( - 'wss://ws-auth.kraken.com/', - fixture=partial( - subscribe, - token=token, - ), - ) as ws, - trio.open_nursery() as n, - aclosing(stream_messages(ws)) as stream, - ): - # task local msg dialog tracking - apiflows: defaultdict[ - int, - ChainMap[dict[str, dict]], - ] = defaultdict(ChainMap) - - # 2way map for ems ids to kraken int reqids.. - ids: bidict[str, int] = bidict() - reqids2txids: bidict[int, str] = bidict() - - # task for processing inbound requests from ems - n.start_soon( - handle_order_requests, - ws, - client, - ems_stream, - token, - apiflows, - ids, - reqids2txids, - ) - - # enter relay loop - await handle_order_updates( - ws, - stream, - ems_stream, - apiflows, - ids, - reqids2txids, - trans, + open_trade_ledger( + 'kraken', acctid, - acc_name, - token, + ) as ledger_dict, + ): + # transaction-ify the ledger entries + ledger_trans = norm_trade_records(ledger_dict) + + # TODO: eventually probably only load + # as far back as it seems is not deliverd in the + # most recent 50 trades and assume that by ordering we + # already have those records in the ledger. + tids2trades = await client.get_trades() + api_trans = norm_trade_records(tids2trades) + + # retrieve kraken reported balances + # and do diff with ledger to determine + # what amount of trades-transactions need + # to be reloaded. + sizes = await client.get_balances() + for dst, size in sizes.items(): + # we don't care about tracking positions + # in the user's source fiat currency. + if dst == client.conf['src_fiat']: + continue + + def has_pp(dst: str) -> Position | bool: + pps_dst_assets = {bsuid[:3]: bsuid for bsuid in table.pps} + pair = pps_dst_assets.get(dst) + pp = table.pps.get(pair) + + if ( + not pair or not pp + or not math.isclose(pp.size, size) + ): + return False + + return pp + + pos = has_pp(dst) + + if not pos: + # we have a balance for which there is no pp + # entry? so we have to likely update from the + # ledger. + updated = table.update_from_trans(ledger_trans) + log.info(f'Updated pps from ledger:\n{pformat(updated)}') + pos = has_pp(dst) + + if not pos and not simulate_pp_update: + # try reloading from API + table.update_from_trans(api_trans) + pos = has_pp(dst) + if not pos: + raise ValueError( + 'Could not reproduce balance:\n' + f'dst: {dst}, {size}\n' + ) + + # only for simulate-testing a "new fill" since + # otherwise we have to actually conduct a live clear. + if simulate_pp_update: + tid = list(tids2trades)[0] + last_trade_dict = tids2trades[tid] + # stage a first reqid of `0` + reqids2txids[0] = last_trade_dict['ordertxid'] + + # reverse the volume on the last trade record so that we can + # use it to update the pptable and thus reverse the last + # trade's effect on the position size. + # last_trade_dict['vol'] = str(float(last_trade_dict['vol']) * -1) + + # convert the reversed trade into transaction format + # fake_tid = ''.join(reversed(tid)) + # reversed_last_tran = norm_trade_records( + # {fake_tid: last_trade_dict})[fake_tid] + # assert reversed_last_tran.size == -1 * ( + # list(api_trans.values())[0].size) + + # update the pp table with the reversed trade thus taking + # the sizing back to "one trade prior" to the last one. + # table.update_from_trans({tid: reversed_last_tran}) + + ppmsgs = trades2pps( + table, + acctid, + # new_trans, ) + await ctx.started((ppmsgs, [acc_name])) + + # XXX: not fucking clue but putting this finally block + # will suppress errors inside the direct await below!?! + # try: + + # Get websocket token for authenticated data stream + # Assert that a token was actually received. + resp = await client.endpoint('GetWebSocketsToken', {}) + + err = resp.get('error') + if err: + raise BrokerError(err) + + token = resp['result']['token'] + + ws: NoBsWs + async with ( + ctx.open_stream() as ems_stream, + open_autorecon_ws( + 'wss://ws-auth.kraken.com/', + fixture=partial( + subscribe, + token=token, + ), + ) as ws, + aclosing(stream_messages(ws)) as stream, + trio.open_nursery() as nurse, + ): + stream = stream_messages(ws) + + # task for processing inbound requests from ems + nurse.start_soon( + handle_order_requests, + ws, + client, + ems_stream, + token, + apiflows, + ids, + reqids2txids, + ) + + # enter relay loop + # try: + try: + await handle_order_updates( + ws, + stream, + ems_stream, + apiflows, + ids, + reqids2txids, + table, + api_trans, + acctid, + acc_name, + token, + ) + # except: + # await tractor.breakpoint() + finally: + # always update ledger on exit + ledger_dict.update(tids2trades) async def handle_order_updates( @@ -390,7 +539,11 @@ async def handle_order_updates( apiflows: dict[int, ChainMap[dict[str, dict]]], ids: bidict[str, int], reqids2txids: bidict[int, str], - trans: set[pp.Transaction], + table: PpTable, + + # transaction records which will be updated + # on new trade clearing events (aka order "fills") + ledger_trans: dict[str, Transaction], acctid: str, acc_name: str, token: str, @@ -403,9 +556,6 @@ async def handle_order_updates( defined in the signature clear to the reader. ''' - # transaction records which will be updated - # on new trade clearing events (aka order "fills") - trans: set[pp.Transaction] async for msg in ws_stream: match msg: @@ -427,17 +577,34 @@ async def handle_order_updates( f'ownTrades update_{seq}:\n' f'{pformat(trades_msgs)}' ) - # flatten msgs to an {id -> data} table for processing + # assert 0 + # format as tid -> trade event map + # eg. msg + # [{'TOKWHY-SMTUB-G5DOI6': {'cost': '95.29047', + # 'fee': '0.24776', + # 'margin': '0.00000', + # 'ordertxid': 'OKSUXM-3OLSB-L7TN72', + # 'ordertype': 'limit', + # 'pair': 'XBT/EUR', + # 'postxid': 'TKH2SE-M7IF5-CFI7LT', + # 'price': '21268.20000', + # 'time': '1657990947.640891', + # 'type': 'buy', + # 'vol': '0.00448042'}}] trades = { tid: trade for entry in trades_msgs for (tid, trade) in entry.items() - - # only emit entries which are already not-in-ledger - if tid not in {r.tid for r in trans} + if tid not in ledger_trans } - for tid, trade in trades.items(): + # if tid in ledger_trans: + # # skip already seen transactions + # log.info(f'Skipping already seen trade {trade}') + # continue + + # await tractor.breakpoint() + for tid, trade in trades.items(): txid = trade['ordertxid'] # NOTE: yet again, here we don't have any ref to the @@ -491,57 +658,22 @@ async def handle_order_updates( ) await ems_stream.send(filled_msg) - if not trades: - # skip pp emissions if we have already - # processed all trades in this msg. - continue + # if not trades: + # # skip pp emissions if we have already + # # processed all trades in this msg. + # continue - # update ledger and position tracking - trans: set[pp.Transaction] - with open_ledger( + new_trans = norm_trade_records(trades) + ppmsgs = trades2pps( + table, acctid, - trades, - - ) as trans: - # TODO: ideally we can pass in an existing - # pps state to this right? such that we - # don't have to do a ledger reload all the - # time.. - active, closed = pp.update_pps_conf( - 'kraken', - acctid, - trade_records=list(trans), - ledger_reload={}.fromkeys( - t.bsuid for t in trans), - ) - - # emit any new pp msgs to ems - for pos in filter( - bool, - itertools.chain(active.values(), closed.values()), - ): - pp_msg = BrokerdPosition( - broker='kraken', - - # XXX: ok so this is annoying, we're - # relaying an account name with the - # backend suffix prefixed but when - # reading accounts from ledgers we - # don't need it and/or it's prefixed - # in the section table.. we should - # just strip this from the message - # right since `.broker` is already - # included? - account=f'kraken.{acctid}', - symbol=pos.symbol.front_fqsn(), - size=pos.size, - avg_price=pos.be_price, - - # TODO - # currency='' - ) + new_trans, + ) + for pp_msg in ppmsgs: await ems_stream.send(pp_msg) + ledger_trans.update(new_trans) + # process and relay order state change events # https://docs.kraken.com/websockets/#message-openOrders case [ @@ -801,7 +933,7 @@ async def handle_order_updates( ): # client was editting too quickly # so we instead cancel this order - print("SENDING CANCEL") + log.cancel(f'Cancelling order for {reqid}@{txid}') await ws.send_msg({ 'event': 'cancelOrder', 'token': token, @@ -910,9 +1042,10 @@ def process_status( def norm_trade_records( ledger: dict[str, Any], -) -> list[pp.Transaction]: +) -> dict[str, Transaction]: + + records: dict[str, Transaction] = {} - records: list[pp.Transaction] = [] for tid, record in ledger.items(): size = float(record.get('vol')) * { @@ -923,19 +1056,17 @@ def norm_trade_records( # we normalize to kraken's `altname` always.. bsuid = norm_sym = Client.normalize_symbol(record['pair']) - records.append( - pp.Transaction( - fqsn=f'{norm_sym}.kraken', - tid=tid, - size=size, - price=float(record['price']), - cost=float(record['fee']), - dt=pendulum.from_timestamp(float(record['time'])), - bsuid=bsuid, + records[tid] = Transaction( + fqsn=f'{norm_sym}.kraken', + tid=tid, + size=size, + price=float(record['price']), + cost=float(record['fee']), + dt=pendulum.from_timestamp(float(record['time'])), + bsuid=bsuid, - # XXX: there are no derivs on kraken right? - # expiry=expiry, - ) + # XXX: there are no derivs on kraken right? + # expiry=expiry, ) return records @@ -946,21 +1077,16 @@ def open_ledger( acctid: str, trade_entries: list[dict[str, Any]], -) -> set[pp.Transaction]: +) -> set[Transaction]: ''' Write recent session's trades to the user's (local) ledger file. ''' - with pp.open_trade_ledger( + with open_trade_ledger( 'kraken', acctid, ) as ledger: - - # normalize to transaction form - # TODO: cawt damn, we should probably delegate to cryptofeed for - # this insteada of re-hacking kraken's total crap? - records = norm_trade_records(trade_entries) - yield set(records) + yield ledger # update on exit ledger.update(trade_entries)