From f65f56ec75d8ddfbeb4898dccd4636b95fe27682 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 2 Jul 2022 15:40:59 -0400 Subject: [PATCH] Initial `piker.pp` ledger support for `kraken` No real-time update support (yet) but this is the first draft at writing trades ledgers and `pps.toml` entries for the kraken backend. Deatz: - drop `pack_positions()`, no longer used. - use `piker.pp` apis to both write a trades ledger file and update the `pps.toml` inside the `trades_dialogue()` endpoint startup. - drop the weird paper engine swap over if auth can't be done, we should be doing something with messaging in the ems over this.. - more web API error response raising. - pass the `pp.Transaction` set loaded from ledger into `process_trade_msgs()` do avoid duplicate sends of already collected trades msgs. - add `norm_trade_records()` public endpoing (used by `piker.pp` api) and `update_ledger()` helper. - rejig `process_trade_msgs()` to drop the weird `try:` assertion block and skip already-recorded-in-ledger trade msgs as well as yield *each* trade instead of sub-sequences. --- piker/brokers/kraken/__init__.py | 4 +- piker/brokers/kraken/broker.py | 304 +++++++++++++++++-------------- 2 files changed, 170 insertions(+), 138 deletions(-) diff --git a/piker/brokers/kraken/__init__.py b/piker/brokers/kraken/__init__.py index 47128f52..013da8fd 100644 --- a/piker/brokers/kraken/__init__.py +++ b/piker/brokers/kraken/__init__.py @@ -35,9 +35,7 @@ from .feed import ( ) from .broker import ( trades_dialogue, - - # TODO: part of pps/ledger work - # norm_trade_records, + norm_trade_records, ) __all__ = [ diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 9c98bebe..18c95d58 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -29,12 +29,13 @@ from typing import ( # Union, ) +import pendulum from pydantic import BaseModel import trio import tractor import wsproto -from piker.clearing._paper_engine import PaperBoi +from piker import pp from piker.clearing._messages import ( BrokerdPosition, BrokerdOrder, BrokerdStatus, BrokerdOrderAck, BrokerdError, BrokerdCancel, @@ -62,47 +63,11 @@ class Trade(BaseModel): ''' reqid: str # kraken order transaction id action: str # buy or sell - price: str # price of asset - size: str # vol of asset + price: float # price of asset + size: float # vol of asset broker_time: str # e.g GTC, GTD -def pack_positions( - acc: str, - trades: dict - -) -> list[Any]: - positions: dict[str, float] = {} - vols: dict[str, float] = {} - costs: dict[str, float] = {} - position_msgs: list[Any] = [] - - for trade in trades.values(): - sign = -1 if trade['type'] == 'sell' else 1 - pair = trade['pair'] - vol = float(trade['vol']) - vols[pair] = vols.get(pair, 0) + sign * vol - costs[pair] = costs.get(pair, 0) + sign * float(trade['cost']) - positions[pair] = costs[pair] / vols[pair] if vols[pair] else 0 - - for ticker, pos in positions.items(): - vol = float(vols[ticker]) - if not vol: - continue - norm_sym = normalize_symbol(ticker) - msg = BrokerdPosition( - broker='kraken', - account=acc, - symbol=norm_sym, - currency=norm_sym[-3:], - size=vol, - avg_price=float(pos), - ) - position_msgs.append(msg.dict()) - - return position_msgs - - async def handle_order_requests( client: Client, @@ -317,47 +282,60 @@ async def trades_dialogue( # XXX: do we need to ack the unsub? # await ws.recv_msg() - # Authenticated block async with get_client() as client: + + # TODO: make ems flip to paper mode via + # some returned signal if the user only wants to use + # the data feed or we return this? + # await ctx.started(({}, ['paper'])) + if not client._api_key: - log.error('Missing Kraken API key: Trades WS connection failed') - await ctx.started(({}, ['paper'])) + raise RuntimeError( + 'Missing Kraken API key in `brokers.toml`!?!?') - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - - client = PaperBoi( - 'kraken', - ems_stream, - _buys={}, - _sells={}, - - _reqids={}, - - # TODO: load paper positions from ``positions.toml`` - _positions={}, - ) - - # TODO: maybe add multiple accounts - n.start_soon(handle_order_requests, client, ems_stream) + # auth required block + acctid = client._name + acc_name = 'kraken.' + acctid # pull and deliver trades ledger - acc_name = 'kraken.' + client._name trades = await client.get_trades() log.info( f'Loaded {len(trades)} trades from account `{acc_name}`' ) - position_msgs = pack_positions(acc_name, trades) - await ctx.started((position_msgs, (acc_name,))) + trans = await update_ledger(acctid, trades) + active, closed = pp.update_pps_conf( + 'kraken', + acctid, + trade_records=trans, + ledger_reload={}.fromkeys(t.bsuid for t in trans), + ) + + position_msgs: list[dict] = [] + pps: dict[int, pp.Position] + for pps in [active, closed]: + for tid, p in pps.items(): + msg = BrokerdPosition( + broker='kraken', + account=acc_name, + symbol=p.symbol.front_fqsn(), + size=p.size, + avg_price=p.be_price, + currency='', + ) + position_msgs.append(msg.dict()) + + await ctx.started( + (position_msgs, [acc_name]) + ) # Get websocket token for authenticated data stream # Assert that a token was actually received. resp = await client.endpoint('GetWebSocketsToken', {}) - # lol wtf is this.. - assert resp['error'] == [] + err = resp.get('error') + if err: + raise BrokerError(err) + token = resp['result']['token'] async with ( @@ -373,93 +351,149 @@ async def trades_dialogue( n.start_soon(handle_order_requests, client, ems_stream) # begin trade event processing - async for msg in process_trade_msgs(ws): - for trade in msg: - match trade: - # prepare and send a filled status update - case Trade(): - filled_msg = BrokerdStatus( - reqid=trade.reqid, - time_ns=time.time_ns(), + async for trade in process_trade_msgs( + ws, + trans, # pass in prior ledger transactions + ): + match trade: + # prepare and send a filled status update + case Trade(): + filled_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), - account=acc_name, - status='filled', - filled=float(trade.size), - reason='Order filled by kraken', - broker_details={ - 'name': 'kraken', - 'broker_time': trade.broker_time - }, + account=acc_name, + status='filled', + filled=float(trade.size), + reason='Order filled by kraken', + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + }, - # TODO: figure out if kraken gives a count - # of how many units of underlying were - # filled. Alternatively we can decrement - # this value ourselves by associating and - # calcing from the diff with the original - # client-side request, see: - # https://github.com/pikers/piker/issues/296 - remaining=0, - ) - await ems_stream.send(filled_msg.dict()) + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, + ) + await ems_stream.send(filled_msg.dict()) - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=trade.reqid, - time_ns=time.time_ns(), + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=trade.reqid, + time_ns=time.time_ns(), - action=trade.action, - size=float(trade.size), - price=float(trade.price), - # TODO: maybe capture more msg data i.e fees? - broker_details={'name': 'kraken'}, - broker_time=float(trade.broker_time) - ) + action=trade.action, + size=float(trade.size), + price=float(trade.price), + # TODO: maybe capture more msg data i.e fees? + broker_details={'name': 'kraken'}, + broker_time=float(trade.broker_time) + ) - await ems_stream.send(fill_msg.dict()) + await ems_stream.send(fill_msg.dict()) - case _: - log.warning(f'Unhandled trades msg: {trade}') - await tractor.breakpoint() + case _: + log.warning(f'Unhandled trades msg: {trade}') + await tractor.breakpoint() + + +def norm_trade_records( + ledger: dict[str, Any], + +) -> list[pp.Transaction]: + + records: list[pp.Transaction] = [] + + for tid, record in ledger.items(): + + size = record.get('vol') * { + 'buy': 1, + 'sell': -1, + }[record['type']] + bsuid = record['pair'] + norm_sym = normalize_symbol(bsuid) + + records.append( + pp.Transaction( + fqsn=f'{norm_sym}.kraken', + tid=tid, + size=float(size), + price=float(record['price']), + cost=float(record['fee']), + dt=pendulum.from_timestamp(record['time']), + bsuid=bsuid, + + # XXX: there are no derivs on kraken right? + # expiry=expiry, + ) + ) + + return records + + +async def update_ledger( + acctid: str, + trade_entries: list[dict[str, Any]], + +) -> list[pp.Transaction]: + + # write recent session's trades to the user's (local) ledger file. + with pp.open_trade_ledger( + 'kraken', + acctid, + ) as ledger: + ledger.update(trade_entries) + + # normalize to transaction form + records = norm_trade_records(trade_entries) + return records async def process_trade_msgs( ws: NoBsWs, + trans: list[pp.Transaction], ): ''' Parse and pack trades subscription messages, deliver framed sequences of messages? + Ws api docs: + https://docs.kraken.com/websockets/#message-ownTrades + ''' - sequence_counter = 0 + count: int = 0 + ledger_txids = {r.tid for r in trans} + async for msg in stream_messages(ws): - try: - # check that we are on the ownTrades stream and that msgs - # are arriving in sequence with kraken For clarification the - # kraken ws api docs for this stream: - # https://docs.kraken.com/websockets/#message-ownTrades - assert msg[1] == 'ownTrades' - assert msg[2]['sequence'] > sequence_counter - sequence_counter += 1 - raw_msgs = msg[0] - trade_msgs = [] + sub = msg[1] + seq = msg[2]['sequence'] - # Check that we are only processing new trades - if msg[2]['sequence'] != 1: - # check if its a new order or an update msg - for trade_msg in raw_msgs: - trade = list(trade_msg.values())[0] - order_msg = Trade( - reqid=trade['ordertxid'], - action=trade['type'], - price=trade['price'], - size=trade['vol'], - broker_time=trade['time'] - ) - trade_msgs.append(order_msg) + # stream sanity checks + assert sub == 'ownTrades' - yield trade_msgs + # ensure that we are only processing new trades + assert seq > count + count += 1 - except AssertionError: - print(f'UNHANDLED MSG: {msg}') - yield msg + trade_events = msg[0] + + for trade_event in trade_events: + for tid, trade_data in trade_event.items(): + if tid in ledger_txids: + continue + + trade = Trade( + reqid=msg['ordertxid'], + action=msg['type'], + price=msg['price'], + size=msg['vol'], + broker_time=msg['time'] + ) + + yield trade