From 30bcfdcc8373999149ac8258fc4d039bd3cb495e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 1 Aug 2022 19:22:31 -0400 Subject: [PATCH] Emit fills from `openOrders` block The (partial) fills from this sub are most indicative of clears (also says support) whereas the msgs in the `ownTrades` sub are only emitted after the entire order request has completed - there is no size-vlm remaining. Further enhancements: - this also includes proper subscription-syncing inside `subscribe()` with a small pre-msg-loop which waits on ack-msgs for each sub and raises any errors. This approach should probably be implemented for the data feed streams as well. - configure the `ownTrades` sub to not bother sending historical data on startup. - make the `openOrders` sub include rate limit counters. - handle the rare case where the ems is trying to cancel an order which was just edited and hasn't yet had it's new `txid` registered. --- piker/brokers/kraken/broker.py | 139 ++++++++++++++++++++++----------- 1 file changed, 93 insertions(+), 46 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index ac22c8df..25c8c1a7 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -114,16 +114,32 @@ async def handle_order_requests( }: cancel = BrokerdCancel(**msg) reqid = ids[cancel.oid] - txid = reqids2txids[reqid] - # call ws api to cancel: - # https://docs.kraken.com/websockets/#message-cancelOrder - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid, - 'txid': [txid], # should be txid from submission - }) + try: + txid = reqids2txids[reqid] + except KeyError: + # XXX: not sure if this block ever gets hit now? + log.error('TOO FAST CANCEL/EDIT') + reqids2txids[reqid] = TooFastEdit(reqid) + await ems_order_stream.send( + BrokerdError( + oid=msg['oid'], + symbol=msg['symbol'], + reason=( + f'TooFastEdit reqid:{reqid}, could not cancelling..' + ), + + ) + ) + else: + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [txid], # should be txid from submission + }) case { 'account': 'kraken.spot' as account, @@ -235,9 +251,17 @@ async def handle_order_requests( async def subscribe( ws: wsproto.WSConnection, token: str, - subs: list[str] = [ - 'ownTrades', - 'openOrders', + subs: list[tuple[str, dict]] = [ + ('ownTrades', { + # don't send first 50 trades on startup, + # we already pull this manually from the rest endpoint. + 'snapshot': False, + },), + ('openOrders', { + # include rate limit counters + 'ratecounter': True, + },), + ], ): ''' @@ -250,12 +274,15 @@ async def subscribe( # more specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 assert token - for sub in subs: + subnames: set[str] = set() + + for name, sub_opts in subs: msg = { 'event': 'subscribe', 'subscription': { - 'name': sub, + 'name': name, 'token': token, + **sub_opts, } } @@ -264,7 +291,34 @@ async def subscribe( # since internally the ws methods appear to be FIFO # locked. await ws.send_msg(msg) + subnames.add(name) + # wait on subscriptionn acks + with trio.move_on_after(5): + while True: + match (msg := await ws.recv_msg()): + case { + 'event': 'subscriptionStatus', + 'status': 'subscribed', + 'subscription': sub_opts, + } as msg: + log.info( + f'Sucessful subscribe for {sub_opts}:\n' + f'{pformat(msg)}' + ) + subnames.remove(sub_opts['name']) + if not subnames: + break + + case { + 'event': 'subscriptionStatus', + 'status': 'error', + 'errorMessage': errmsg, + } as msg: + raise RuntimeError( + f'{errmsg}\n\n' + f'{pformat(msg)}' + ) yield for sub in subs: @@ -616,9 +670,10 @@ async def handle_order_updates( for (tid, trade) in entry.items() # don't re-process datums we've already seen - if tid not in ledger_trans + # if tid not in ledger_trans } for tid, trade in trades.items(): + assert tid not in ledger_trans txid = trade['ordertxid'] reqid = trade.get('userref') @@ -636,22 +691,8 @@ async def handle_order_updates( size = float(trade['vol']) broker_time = float(trade['time']) - # send a fill msg for gui update - fill_msg = BrokerdFill( - time_ns=time.time_ns(), - reqid=reqid, - - action=action, - size=size, - price=price, - - # TODO: maybe capture more msg data - # i.e fees? - broker_details={'name': 'kraken'} | trade, - broker_time=broker_time - ) - await ems_stream.send(fill_msg) - + # TODO: we can emit this on the "closed" state in + # the `openOrders` sub-block below. status_msg = BrokerdStatus( reqid=reqid, time_ns=time.time_ns(), @@ -905,17 +946,33 @@ async def handle_order_updates( # NOTE: there is no `status` field case { 'vol_exec': vlm, + 'avg_price': price, 'userref': reqid, **rest, - }: - # TODO: emit fill msg from here? + } as msg: + ourreqid = reqids2txids.inverse[txid] assert reqid == ourreqid log.info( f'openOrders vlm={vlm} Fill for {reqid}:\n' f'{update_msg}' ) - continue + + fill_msg = BrokerdFill( + time_ns=time.time_ns(), + reqid=reqid, + + # action=action, # just use size value + # for now? + size=vlm, + price=price, + + # TODO: maybe capture more msg data + # i.e fees? + broker_details={'name': 'kraken'} | trade, + broker_time=broker_time + ) + await ems_stream.send(fill_msg) case _: log.warning( @@ -923,17 +980,7 @@ async def handle_order_updates( f'{txid}:{order_msg}' ) - # NOTE: The only reason for this seems to be remapping - # underlying `txid` values on order "edits" which the - # `openOrders` sub doesn't seem to have any knowledge of. - # UPDATE: seems like we don't need this any more thanks to - # passing through the dialog key / reqid in the `newuserref` - # field on edit requests. - - # I'd also like to ask them which event guarantees that the - # the live order is now in the book, since these status ones - # almost seem more like request-acks then state guarantees. - # ANSWER the `openOrders` is more indicative of "liveness". + # order request status updates case { 'event': etype, 'status': status, @@ -1003,8 +1050,8 @@ async def handle_order_updates( 'reqid': reqid or 0, 'txid': [txid], }) - case _: + log.warning(f'Unhandled trades update msg: {msg}')