diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 18c95d58..63ab7fa0 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -37,9 +37,13 @@ import wsproto from piker import pp from piker.clearing._messages import ( - BrokerdPosition, BrokerdOrder, BrokerdStatus, - BrokerdOrderAck, BrokerdError, BrokerdCancel, + BrokerdCancel, + BrokerdError, BrokerdFill, + BrokerdOrder, + BrokerdOrderAck, + BrokerdPosition, + BrokerdStatus, ) from .api import ( Client, @@ -338,6 +342,7 @@ async def trades_dialogue( token = resp['result']['token'] + ws: NoBsWs async with ( ctx.open_stream() as ems_stream, open_autorecon_ws( @@ -350,55 +355,78 @@ async def trades_dialogue( # task for processing inbound requests from ems n.start_soon(handle_order_requests, client, ems_stream) - # begin trade event processing - async for trade in process_trade_msgs( - ws, - trans, # pass in prior ledger transactions - ): - match trade: - # prepare and send a filled status update - case Trade(): - filled_msg = BrokerdStatus( - reqid=trade.reqid, - time_ns=time.time_ns(), + count: int = 0 + ledger_txids = {r.tid for r in trans} - account=acc_name, - status='filled', - filled=float(trade.size), - reason='Order filled by kraken', - broker_details={ - 'name': 'kraken', - 'broker_time': trade.broker_time - }, + # process and relay trades events to ems + # https://docs.kraken.com/websockets/#message-ownTrades + async for msg in stream_messages(ws): + match msg: + case [ + trades_msgs, + 'ownTrades', + {'sequence': seq}, + ]: + # ensure that we are only processing new trades + assert seq > count + count += 1 - # TODO: figure out if kraken gives a count - # of how many units of underlying were - # filled. Alternatively we can decrement - # this value ourselves by associating and - # calcing from the diff with the original - # client-side request, see: - # https://github.com/pikers/piker/issues/296 - remaining=0, - ) - await ems_stream.send(filled_msg.dict()) + for entries in trades_msgs: + for tid, msg in entries.items(): - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=trade.reqid, - time_ns=time.time_ns(), + if tid in ledger_txids: + log.debug(f'Skipping ledgered {tid}:{msg}') + continue - action=trade.action, - size=float(trade.size), - price=float(trade.price), - # TODO: maybe capture more msg data i.e fees? - broker_details={'name': 'kraken'}, - broker_time=float(trade.broker_time) - ) + # yield trade + reqid = msg['ordertxid'] + action = msg['type'] + price = float(msg['price']) + size = float(msg['vol']) + broker_time = float(msg['time']) - await ems_stream.send(fill_msg.dict()) + filled_msg = BrokerdStatus( + reqid=reqid, + time_ns=time.time_ns(), + + account=acc_name, + status='filled', + filled=size, + reason='Order filled by kraken', + broker_details={ + 'name': 'kraken', + 'broker_time': broker_time + }, + + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, + ) + await ems_stream.send(filled_msg.dict()) + + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=reqid, + time_ns=time.time_ns(), + + action=action, + size=size, + price=price, + # TODO: maybe capture more msg data + # i.e fees? + broker_details={'name': 'kraken'}, + broker_time=broker_time + ) + + await ems_stream.send(fill_msg.dict()) case _: - log.warning(f'Unhandled trades msg: {trade}') + log.warning(f'Unhandled trades msg: {msg}') await tractor.breakpoint() @@ -452,48 +480,3 @@ async def update_ledger( # normalize to transaction form records = norm_trade_records(trade_entries) return records - - -async def process_trade_msgs( - ws: NoBsWs, - trans: list[pp.Transaction], -): - ''' - Parse and pack trades subscription messages, deliver framed - sequences of messages? - - Ws api docs: - https://docs.kraken.com/websockets/#message-ownTrades - - ''' - count: int = 0 - ledger_txids = {r.tid for r in trans} - - async for msg in stream_messages(ws): - - sub = msg[1] - seq = msg[2]['sequence'] - - # stream sanity checks - assert sub == 'ownTrades' - - # ensure that we are only processing new trades - assert seq > count - count += 1 - - trade_events = msg[0] - - for trade_event in trade_events: - for tid, trade_data in trade_event.items(): - if tid in ledger_txids: - continue - - trade = Trade( - reqid=msg['ordertxid'], - action=msg['type'], - price=msg['price'], - size=msg['vol'], - broker_time=msg['time'] - ) - - yield trade