Finally get real-time pp updates workin for `kraken`

This ended up driving the rework of the `piker.pp` apis to use context
manager + table style which resulted in a much easier to follow
state/update system B). Also added is a flag to do a manual simulation
of a "fill triggered rt pp msg" which requires the user to delete the
last ledgered trade entry from config files and then allowing that trade
to emit through the `openOrders` sub and update client shortly after
order mode boot; this is how the rt updates were verified to work
without doing even more live orders 😂.

Patch details:
- open both `open_trade_ledger()` and `open_pps()` inside the trade
  dialog startup and conduct a "pp state sync" logic phase where we now
  pull the account balances and incrementally load pp data (in order,
  from `pps.toml`, ledger, api) until we can generate the asset balance
  by reverse incrementing through trade history eventually erroring out
  if we can't reproduce the balance value.
- rework the `trade2pps()` to take in the `PpTable` and generate new
  ems msgs from table updates.
- return the new `dict[str, Transaction]` expected from
  `norm_trade_records()`
- only update pp config and ledger on dialog exit.
kraken_ws_orders
Tyler Goodlet 2022-07-19 08:43:33 -04:00
parent 54008a1976
commit 3b79743c7b
1 changed files with 294 additions and 168 deletions

View File

@ -24,13 +24,14 @@ from contextlib import (
contextmanager as cm, contextmanager as cm,
) )
from functools import partial from functools import partial
import itertools
from itertools import count from itertools import count
import math
from pprint import pformat from pprint import pformat
import time import time
from typing import ( from typing import (
Any, Any,
AsyncIterator, AsyncIterator,
# Optional,
Union, Union,
) )
@ -41,7 +42,14 @@ import trio
import tractor import tractor
import wsproto import wsproto
from piker import pp from piker.pp import (
Position,
PpTable,
Transaction,
# update_pps_conf,
open_trade_ledger,
open_pps,
)
from piker.clearing._messages import ( from piker.clearing._messages import (
BrokerdCancel, BrokerdCancel,
BrokerdError, BrokerdError,
@ -265,10 +273,56 @@ async def subscribe(
# await ws.recv_msg() # await ws.recv_msg()
def trades2pps(
table: PpTable,
acctid: str,
new_trans: dict[str, Transaction] = {},
) -> tuple[
list[BrokerdPosition],
list[Transaction],
]:
if new_trans:
updated = table.update_from_trans(
new_trans,
)
log.info(f'Updated pps:\n{pformat(updated)}')
pp_entries, closed_pp_objs = table.dump_active('kraken')
pp_objs: dict[Union[str, int], Position] = table.pps
pps: dict[int, Position]
position_msgs: list[dict] = []
for pps in [pp_objs, closed_pp_objs]:
for tid, p in pps.items():
msg = BrokerdPosition(
broker='kraken',
# XXX: ok so this is annoying, we're
# relaying an account name with the
# backend suffix prefixed but when
# reading accounts from ledgers we
# don't need it and/or it's prefixed
# in the section table.. we should
# just strip this from the message
# right since `.broker` is already
# included?
account='kraken.' + acctid,
symbol=p.symbol.front_fqsn(),
size=p.size,
avg_price=p.be_price,
currency='',
)
position_msgs.append(msg)
return position_msgs
@tractor.context @tractor.context
async def trades_dialogue( async def trades_dialogue(
ctx: tractor.Context, ctx: tractor.Context,
loglevel: str = None, loglevel: str = None,
) -> AsyncIterator[dict[str, Any]]: ) -> AsyncIterator[dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
@ -289,98 +343,193 @@ async def trades_dialogue(
acctid = client._name acctid = client._name
acc_name = 'kraken.' + acctid acc_name = 'kraken.' + acctid
# pull and deliver trades ledger # task local msg dialog tracking
trades = await client.get_trades() apiflows: defaultdict[
log.info( int,
f'Loaded {len(trades)} trades from account `{acc_name}`' ChainMap[dict[str, dict]],
) ] = defaultdict(ChainMap)
with open_ledger(
acctid, # 2way map for ems ids to kraken int reqids..
trades, ids: bidict[str, int] = bidict()
) as trans: reqids2txids: bidict[int, str] = bidict()
active, closed = pp.update_pps_conf(
# NOTE: testing code for making sure the rt incremental update
# of positions, via newly generated msgs works. In order to test
# this,
# - delete the *ABSOLUTE LAST* entry from accont's corresponding
# trade ledgers file (NOTE this MUST be the last record
# delivered from the
# api ledger),
# - open you ``pps.toml`` and find that same tid and delete it
# from the pp's clears table,
# - set this flag to `True`
#
# You should see an update come in after the order mode
# boots up which shows your latest correct asset
# balance size after the "previously unknown simulating a live
# fill" update comes in on the relay loop even though the fill
# will be ignored by the ems (no known reqid) the pp msg should
# update things correctly.
simulate_pp_update: bool = False
with (
open_pps(
'kraken', 'kraken',
acctid, acctid,
trade_records=trans, ) as table,
ledger_reload={}.fromkeys(t.bsuid for t in trans),
)
position_msgs: list[dict] = [] open_trade_ledger(
pps: dict[int, pp.Position] 'kraken',
for pps in [active, closed]:
for tid, p in pps.items():
msg = BrokerdPosition(
broker='kraken',
account=acc_name,
symbol=p.symbol.front_fqsn(),
size=p.size,
avg_price=p.be_price,
currency='',
)
position_msgs.append(msg)
await ctx.started(
(position_msgs, [acc_name])
)
# Get websocket token for authenticated data stream
# Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {})
err = resp.get('error')
if err:
raise BrokerError(err)
token = resp['result']['token']
ws: NoBsWs
async with (
ctx.open_stream() as ems_stream,
open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=partial(
subscribe,
token=token,
),
) as ws,
trio.open_nursery() as n,
aclosing(stream_messages(ws)) as stream,
):
# task local msg dialog tracking
apiflows: defaultdict[
int,
ChainMap[dict[str, dict]],
] = defaultdict(ChainMap)
# 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict()
reqids2txids: bidict[int, str] = bidict()
# task for processing inbound requests from ems
n.start_soon(
handle_order_requests,
ws,
client,
ems_stream,
token,
apiflows,
ids,
reqids2txids,
)
# enter relay loop
await handle_order_updates(
ws,
stream,
ems_stream,
apiflows,
ids,
reqids2txids,
trans,
acctid, acctid,
acc_name, ) as ledger_dict,
token, ):
# transaction-ify the ledger entries
ledger_trans = norm_trade_records(ledger_dict)
# TODO: eventually probably only load
# as far back as it seems is not deliverd in the
# most recent 50 trades and assume that by ordering we
# already have those records in the ledger.
tids2trades = await client.get_trades()
api_trans = norm_trade_records(tids2trades)
# retrieve kraken reported balances
# and do diff with ledger to determine
# what amount of trades-transactions need
# to be reloaded.
sizes = await client.get_balances()
for dst, size in sizes.items():
# we don't care about tracking positions
# in the user's source fiat currency.
if dst == client.conf['src_fiat']:
continue
def has_pp(dst: str) -> Position | bool:
pps_dst_assets = {bsuid[:3]: bsuid for bsuid in table.pps}
pair = pps_dst_assets.get(dst)
pp = table.pps.get(pair)
if (
not pair or not pp
or not math.isclose(pp.size, size)
):
return False
return pp
pos = has_pp(dst)
if not pos:
# we have a balance for which there is no pp
# entry? so we have to likely update from the
# ledger.
updated = table.update_from_trans(ledger_trans)
log.info(f'Updated pps from ledger:\n{pformat(updated)}')
pos = has_pp(dst)
if not pos and not simulate_pp_update:
# try reloading from API
table.update_from_trans(api_trans)
pos = has_pp(dst)
if not pos:
raise ValueError(
'Could not reproduce balance:\n'
f'dst: {dst}, {size}\n'
)
# only for simulate-testing a "new fill" since
# otherwise we have to actually conduct a live clear.
if simulate_pp_update:
tid = list(tids2trades)[0]
last_trade_dict = tids2trades[tid]
# stage a first reqid of `0`
reqids2txids[0] = last_trade_dict['ordertxid']
# reverse the volume on the last trade record so that we can
# use it to update the pptable and thus reverse the last
# trade's effect on the position size.
# last_trade_dict['vol'] = str(float(last_trade_dict['vol']) * -1)
# convert the reversed trade into transaction format
# fake_tid = ''.join(reversed(tid))
# reversed_last_tran = norm_trade_records(
# {fake_tid: last_trade_dict})[fake_tid]
# assert reversed_last_tran.size == -1 * (
# list(api_trans.values())[0].size)
# update the pp table with the reversed trade thus taking
# the sizing back to "one trade prior" to the last one.
# table.update_from_trans({tid: reversed_last_tran})
ppmsgs = trades2pps(
table,
acctid,
# new_trans,
) )
await ctx.started((ppmsgs, [acc_name]))
# XXX: not fucking clue but putting this finally block
# will suppress errors inside the direct await below!?!
# try:
# Get websocket token for authenticated data stream
# Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {})
err = resp.get('error')
if err:
raise BrokerError(err)
token = resp['result']['token']
ws: NoBsWs
async with (
ctx.open_stream() as ems_stream,
open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=partial(
subscribe,
token=token,
),
) as ws,
aclosing(stream_messages(ws)) as stream,
trio.open_nursery() as nurse,
):
stream = stream_messages(ws)
# task for processing inbound requests from ems
nurse.start_soon(
handle_order_requests,
ws,
client,
ems_stream,
token,
apiflows,
ids,
reqids2txids,
)
# enter relay loop
# try:
try:
await handle_order_updates(
ws,
stream,
ems_stream,
apiflows,
ids,
reqids2txids,
table,
api_trans,
acctid,
acc_name,
token,
)
# except:
# await tractor.breakpoint()
finally:
# always update ledger on exit
ledger_dict.update(tids2trades)
async def handle_order_updates( async def handle_order_updates(
@ -390,7 +539,11 @@ async def handle_order_updates(
apiflows: dict[int, ChainMap[dict[str, dict]]], apiflows: dict[int, ChainMap[dict[str, dict]]],
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: bidict[int, str], reqids2txids: bidict[int, str],
trans: set[pp.Transaction], table: PpTable,
# transaction records which will be updated
# on new trade clearing events (aka order "fills")
ledger_trans: dict[str, Transaction],
acctid: str, acctid: str,
acc_name: str, acc_name: str,
token: str, token: str,
@ -403,9 +556,6 @@ async def handle_order_updates(
defined in the signature clear to the reader. defined in the signature clear to the reader.
''' '''
# transaction records which will be updated
# on new trade clearing events (aka order "fills")
trans: set[pp.Transaction]
async for msg in ws_stream: async for msg in ws_stream:
match msg: match msg:
@ -427,17 +577,34 @@ async def handle_order_updates(
f'ownTrades update_{seq}:\n' f'ownTrades update_{seq}:\n'
f'{pformat(trades_msgs)}' f'{pformat(trades_msgs)}'
) )
# flatten msgs to an {id -> data} table for processing # assert 0
# format as tid -> trade event map
# eg. msg
# [{'TOKWHY-SMTUB-G5DOI6': {'cost': '95.29047',
# 'fee': '0.24776',
# 'margin': '0.00000',
# 'ordertxid': 'OKSUXM-3OLSB-L7TN72',
# 'ordertype': 'limit',
# 'pair': 'XBT/EUR',
# 'postxid': 'TKH2SE-M7IF5-CFI7LT',
# 'price': '21268.20000',
# 'time': '1657990947.640891',
# 'type': 'buy',
# 'vol': '0.00448042'}}]
trades = { trades = {
tid: trade tid: trade
for entry in trades_msgs for entry in trades_msgs
for (tid, trade) in entry.items() for (tid, trade) in entry.items()
if tid not in ledger_trans
# only emit entries which are already not-in-ledger
if tid not in {r.tid for r in trans}
} }
for tid, trade in trades.items():
# if tid in ledger_trans:
# # skip already seen transactions
# log.info(f'Skipping already seen trade {trade}')
# continue
# await tractor.breakpoint()
for tid, trade in trades.items():
txid = trade['ordertxid'] txid = trade['ordertxid']
# NOTE: yet again, here we don't have any ref to the # NOTE: yet again, here we don't have any ref to the
@ -491,57 +658,22 @@ async def handle_order_updates(
) )
await ems_stream.send(filled_msg) await ems_stream.send(filled_msg)
if not trades: # if not trades:
# skip pp emissions if we have already # # skip pp emissions if we have already
# processed all trades in this msg. # # processed all trades in this msg.
continue # continue
# update ledger and position tracking new_trans = norm_trade_records(trades)
trans: set[pp.Transaction] ppmsgs = trades2pps(
with open_ledger( table,
acctid, acctid,
trades, new_trans,
)
) as trans: for pp_msg in ppmsgs:
# TODO: ideally we can pass in an existing
# pps state to this right? such that we
# don't have to do a ledger reload all the
# time..
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=list(trans),
ledger_reload={}.fromkeys(
t.bsuid for t in trans),
)
# emit any new pp msgs to ems
for pos in filter(
bool,
itertools.chain(active.values(), closed.values()),
):
pp_msg = BrokerdPosition(
broker='kraken',
# XXX: ok so this is annoying, we're
# relaying an account name with the
# backend suffix prefixed but when
# reading accounts from ledgers we
# don't need it and/or it's prefixed
# in the section table.. we should
# just strip this from the message
# right since `.broker` is already
# included?
account=f'kraken.{acctid}',
symbol=pos.symbol.front_fqsn(),
size=pos.size,
avg_price=pos.be_price,
# TODO
# currency=''
)
await ems_stream.send(pp_msg) await ems_stream.send(pp_msg)
ledger_trans.update(new_trans)
# process and relay order state change events # process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders # https://docs.kraken.com/websockets/#message-openOrders
case [ case [
@ -801,7 +933,7 @@ async def handle_order_updates(
): ):
# client was editting too quickly # client was editting too quickly
# so we instead cancel this order # so we instead cancel this order
print("SENDING CANCEL") log.cancel(f'Cancelling order for {reqid}@{txid}')
await ws.send_msg({ await ws.send_msg({
'event': 'cancelOrder', 'event': 'cancelOrder',
'token': token, 'token': token,
@ -910,9 +1042,10 @@ def process_status(
def norm_trade_records( def norm_trade_records(
ledger: dict[str, Any], ledger: dict[str, Any],
) -> list[pp.Transaction]: ) -> dict[str, Transaction]:
records: dict[str, Transaction] = {}
records: list[pp.Transaction] = []
for tid, record in ledger.items(): for tid, record in ledger.items():
size = float(record.get('vol')) * { size = float(record.get('vol')) * {
@ -923,19 +1056,17 @@ def norm_trade_records(
# we normalize to kraken's `altname` always.. # we normalize to kraken's `altname` always..
bsuid = norm_sym = Client.normalize_symbol(record['pair']) bsuid = norm_sym = Client.normalize_symbol(record['pair'])
records.append( records[tid] = Transaction(
pp.Transaction( fqsn=f'{norm_sym}.kraken',
fqsn=f'{norm_sym}.kraken', tid=tid,
tid=tid, size=size,
size=size, price=float(record['price']),
price=float(record['price']), cost=float(record['fee']),
cost=float(record['fee']), dt=pendulum.from_timestamp(float(record['time'])),
dt=pendulum.from_timestamp(float(record['time'])), bsuid=bsuid,
bsuid=bsuid,
# XXX: there are no derivs on kraken right? # XXX: there are no derivs on kraken right?
# expiry=expiry, # expiry=expiry,
)
) )
return records return records
@ -946,21 +1077,16 @@ def open_ledger(
acctid: str, acctid: str,
trade_entries: list[dict[str, Any]], trade_entries: list[dict[str, Any]],
) -> set[pp.Transaction]: ) -> set[Transaction]:
''' '''
Write recent session's trades to the user's (local) ledger file. Write recent session's trades to the user's (local) ledger file.
''' '''
with pp.open_trade_ledger( with open_trade_ledger(
'kraken', 'kraken',
acctid, acctid,
) as ledger: ) as ledger:
yield ledger
# normalize to transaction form
# TODO: cawt damn, we should probably delegate to cryptofeed for
# this insteada of re-hacking kraken's total crap?
records = norm_trade_records(trade_entries)
yield set(records)
# update on exit # update on exit
ledger.update(trade_entries) ledger.update(trade_entries)