Initial `piker.pp` ledger support for `kraken`

No real-time update support (yet) but this is the first draft at writing
trades ledgers and `pps.toml` entries for the kraken backend.

Deatz:
- drop `pack_positions()`, no longer used.
- use `piker.pp` apis to both write a trades ledger file and update the
  `pps.toml` inside the `trades_dialogue()` endpoint startup.
- drop the weird paper engine swap over if auth can't be done, we should
  be doing something with messaging in the ems over this..
- more web API error response raising.
- pass the `pp.Transaction` set loaded from ledger into
  `process_trade_msgs()` do avoid duplicate sends of already collected
  trades msgs.
- add `norm_trade_records()` public endpoing (used by `piker.pp` api)
  and `update_ledger()` helper.
- rejig `process_trade_msgs()` to drop the weird `try:` assertion block
  and skip already-recorded-in-ledger trade msgs as well as yield *each*
  trade instead of sub-sequences.
notokeninwswrapper
Tyler Goodlet 2022-07-02 15:40:59 -04:00
parent 5d39b04552
commit f65f56ec75
2 changed files with 170 additions and 138 deletions

View File

@ -35,9 +35,7 @@ from .feed import (
)
from .broker import (
trades_dialogue,
# TODO: part of pps/ledger work
# norm_trade_records,
norm_trade_records,
)
__all__ = [

View File

@ -29,12 +29,13 @@ from typing import (
# Union,
)
import pendulum
from pydantic import BaseModel
import trio
import tractor
import wsproto
from piker.clearing._paper_engine import PaperBoi
from piker import pp
from piker.clearing._messages import (
BrokerdPosition, BrokerdOrder, BrokerdStatus,
BrokerdOrderAck, BrokerdError, BrokerdCancel,
@ -62,47 +63,11 @@ class Trade(BaseModel):
'''
reqid: str # kraken order transaction id
action: str # buy or sell
price: str # price of asset
size: str # vol of asset
price: float # price of asset
size: float # vol of asset
broker_time: str # e.g GTC, GTD
def pack_positions(
acc: str,
trades: dict
) -> list[Any]:
positions: dict[str, float] = {}
vols: dict[str, float] = {}
costs: dict[str, float] = {}
position_msgs: list[Any] = []
for trade in trades.values():
sign = -1 if trade['type'] == 'sell' else 1
pair = trade['pair']
vol = float(trade['vol'])
vols[pair] = vols.get(pair, 0) + sign * vol
costs[pair] = costs.get(pair, 0) + sign * float(trade['cost'])
positions[pair] = costs[pair] / vols[pair] if vols[pair] else 0
for ticker, pos in positions.items():
vol = float(vols[ticker])
if not vol:
continue
norm_sym = normalize_symbol(ticker)
msg = BrokerdPosition(
broker='kraken',
account=acc,
symbol=norm_sym,
currency=norm_sym[-3:],
size=vol,
avg_price=float(pos),
)
position_msgs.append(msg.dict())
return position_msgs
async def handle_order_requests(
client: Client,
@ -317,47 +282,60 @@ async def trades_dialogue(
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# Authenticated block
async with get_client() as client:
# TODO: make ems flip to paper mode via
# some returned signal if the user only wants to use
# the data feed or we return this?
# await ctx.started(({}, ['paper']))
if not client._api_key:
log.error('Missing Kraken API key: Trades WS connection failed')
await ctx.started(({}, ['paper']))
raise RuntimeError(
'Missing Kraken API key in `brokers.toml`!?!?')
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
client = PaperBoi(
'kraken',
ems_stream,
_buys={},
_sells={},
_reqids={},
# TODO: load paper positions from ``positions.toml``
_positions={},
)
# TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream)
# auth required block
acctid = client._name
acc_name = 'kraken.' + acctid
# pull and deliver trades ledger
acc_name = 'kraken.' + client._name
trades = await client.get_trades()
log.info(
f'Loaded {len(trades)} trades from account `{acc_name}`'
)
position_msgs = pack_positions(acc_name, trades)
await ctx.started((position_msgs, (acc_name,)))
trans = await update_ledger(acctid, trades)
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(t.bsuid for t in trans),
)
position_msgs: list[dict] = []
pps: dict[int, pp.Position]
for pps in [active, closed]:
for tid, p in pps.items():
msg = BrokerdPosition(
broker='kraken',
account=acc_name,
symbol=p.symbol.front_fqsn(),
size=p.size,
avg_price=p.be_price,
currency='',
)
position_msgs.append(msg.dict())
await ctx.started(
(position_msgs, [acc_name])
)
# Get websocket token for authenticated data stream
# Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {})
# lol wtf is this..
assert resp['error'] == []
err = resp.get('error')
if err:
raise BrokerError(err)
token = resp['result']['token']
async with (
@ -373,93 +351,149 @@ async def trades_dialogue(
n.start_soon(handle_order_requests, client, ems_stream)
# begin trade event processing
async for msg in process_trade_msgs(ws):
for trade in msg:
match trade:
# prepare and send a filled status update
case Trade():
filled_msg = BrokerdStatus(
reqid=trade.reqid,
time_ns=time.time_ns(),
async for trade in process_trade_msgs(
ws,
trans, # pass in prior ledger transactions
):
match trade:
# prepare and send a filled status update
case Trade():
filled_msg = BrokerdStatus(
reqid=trade.reqid,
time_ns=time.time_ns(),
account=acc_name,
status='filled',
filled=float(trade.size),
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': trade.broker_time
},
account=acc_name,
status='filled',
filled=float(trade.size),
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': trade.broker_time
},
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg.dict())
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg.dict())
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=trade.reqid,
time_ns=time.time_ns(),
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=trade.reqid,
time_ns=time.time_ns(),
action=trade.action,
size=float(trade.size),
price=float(trade.price),
# TODO: maybe capture more msg data i.e fees?
broker_details={'name': 'kraken'},
broker_time=float(trade.broker_time)
)
action=trade.action,
size=float(trade.size),
price=float(trade.price),
# TODO: maybe capture more msg data i.e fees?
broker_details={'name': 'kraken'},
broker_time=float(trade.broker_time)
)
await ems_stream.send(fill_msg.dict())
await ems_stream.send(fill_msg.dict())
case _:
log.warning(f'Unhandled trades msg: {trade}')
await tractor.breakpoint()
case _:
log.warning(f'Unhandled trades msg: {trade}')
await tractor.breakpoint()
def norm_trade_records(
ledger: dict[str, Any],
) -> list[pp.Transaction]:
records: list[pp.Transaction] = []
for tid, record in ledger.items():
size = record.get('vol') * {
'buy': 1,
'sell': -1,
}[record['type']]
bsuid = record['pair']
norm_sym = normalize_symbol(bsuid)
records.append(
pp.Transaction(
fqsn=f'{norm_sym}.kraken',
tid=tid,
size=float(size),
price=float(record['price']),
cost=float(record['fee']),
dt=pendulum.from_timestamp(record['time']),
bsuid=bsuid,
# XXX: there are no derivs on kraken right?
# expiry=expiry,
)
)
return records
async def update_ledger(
acctid: str,
trade_entries: list[dict[str, Any]],
) -> list[pp.Transaction]:
# write recent session's trades to the user's (local) ledger file.
with pp.open_trade_ledger(
'kraken',
acctid,
) as ledger:
ledger.update(trade_entries)
# normalize to transaction form
records = norm_trade_records(trade_entries)
return records
async def process_trade_msgs(
ws: NoBsWs,
trans: list[pp.Transaction],
):
'''
Parse and pack trades subscription messages, deliver framed
sequences of messages?
Ws api docs:
https://docs.kraken.com/websockets/#message-ownTrades
'''
sequence_counter = 0
count: int = 0
ledger_txids = {r.tid for r in trans}
async for msg in stream_messages(ws):
try:
# check that we are on the ownTrades stream and that msgs
# are arriving in sequence with kraken For clarification the
# kraken ws api docs for this stream:
# https://docs.kraken.com/websockets/#message-ownTrades
assert msg[1] == 'ownTrades'
assert msg[2]['sequence'] > sequence_counter
sequence_counter += 1
raw_msgs = msg[0]
trade_msgs = []
sub = msg[1]
seq = msg[2]['sequence']
# Check that we are only processing new trades
if msg[2]['sequence'] != 1:
# check if its a new order or an update msg
for trade_msg in raw_msgs:
trade = list(trade_msg.values())[0]
order_msg = Trade(
reqid=trade['ordertxid'],
action=trade['type'],
price=trade['price'],
size=trade['vol'],
broker_time=trade['time']
)
trade_msgs.append(order_msg)
# stream sanity checks
assert sub == 'ownTrades'
yield trade_msgs
# ensure that we are only processing new trades
assert seq > count
count += 1
except AssertionError:
print(f'UNHANDLED MSG: {msg}')
yield msg
trade_events = msg[0]
for trade_event in trade_events:
for tid, trade_data in trade_event.items():
if tid in ledger_txids:
continue
trade = Trade(
reqid=msg['ordertxid'],
action=msg['type'],
price=msg['price'],
size=msg['vol'],
broker_time=msg['time']
)
yield trade