diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 6bb3be17..0d50c078 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -741,6 +741,57 @@ async def handle_order_updates( txid, update_msg = list(order_msg.items())[0] match update_msg: + # EMS-unknown live order that needs to be + # delivered and loaded on the client-side. + case { + 'userref': reqid, + + # during a fill this field is **not** + # provided! but, it is always avail on + # actual status updates.. see case above. + 'status': status, + **rest, + } if ( + ids.inverse.get(reqid) is None + ): + # parse out existing live order + descr = rest['descr'] + fqsn = descr['pair'].replace( + '/', '').lower() + price = float(descr['price']) + size = float(rest['vol']) + action = descr['type'] + + # register the userref value from + # kraken (usually an `int` staring + # at 1?) as our reqid. + reqids2txids[reqid] = txid + oid = str(reqid) + ids[oid] = reqid # NOTE!: str -> int + + # fill out ``Status`` + boxed ``Order`` + status_msg = Status( + time_ns=time.time_ns(), + resp='open', + oid=oid, + reqid=reqid, + + # embedded order info + req=Order( + action=action, + exec_mode='live', + oid=oid, + symbol=fqsn, + account=acc_name, + price=price, + size=size, + ), + src='kraken', + ) + apiflows[reqid].maps.append(status_msg) + await ems_stream.send(status_msg) + continue + # XXX: eg. of full msg schema: # {'avg_price': _, # 'cost': _, @@ -819,105 +870,56 @@ async def handle_order_updates( ) oid = ids.inverse.get(reqid) + # XXX: too fast edit handled by the + # request handler task: this + # scenario occurs when ems side + # requests are coming in too quickly + # such that there is no known txid + # yet established for the ems + # dialog's last reqid when the + # request handler task is already + # receceiving a new update for that + # reqid. In this case we simply mark + # the reqid as being "too fast" and + # then when we get the next txid + # update from kraken's backend, and + # thus the new txid, we simply + # cancel the order for now. + # TODO: Ideally we eventually + # instead make the client side of + # the ems block until a submission + # is confirmed by the backend + # instead of this hacky throttle + # style approach and avoid requests + # coming in too quickly on the other + # side of the ems, aka the client + # <-> ems dialog. if ( status == 'open' - and ( - # XXX: too fast edit handled by the - # request handler task: this - # scenario occurs when ems side - # requests are coming in too quickly - # such that there is no known txid - # yet established for the ems - # dialog's last reqid when the - # request handler task is already - # receceiving a new update for that - # reqid. In this case we simply mark - # the reqid as being "too fast" and - # then when we get the next txid - # update from kraken's backend, and - # thus the new txid, we simply - # cancel the order for now. - - # TODO: Ideally we eventually - # instead make the client side of - # the ems block until a submission - # is confirmed by the backend - # instead of this hacky throttle - # style approach and avoid requests - # coming in too quickly on the other - # side of the ems, aka the client - # <-> ems dialog. - (toofast := isinstance( - reqids2txids.get(reqid), - TooFastEdit - )) - - # pre-existing open order NOT from - # this EMS session. - or (noid := oid is None) + and isinstance( + reqids2txids.get(reqid), + TooFastEdit ) ): - if toofast: - # TODO: don't even allow this case - # by not moving the client side line - # until an edit confirmation - # arrives... - log.cancel( - f'Received too fast edit {txid}:\n' - f'{update_msg}\n' - 'Cancelling order for now!..' - ) - # call ws api to cancel: - # https://docs.kraken.com/websockets/#message-cancelOrder - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid or 0, - 'txid': [txid], - }) - continue - - # a non-ems-active order, emit live - # order embedded in status msg. - elif noid: - # parse out existing live order - descr = rest['descr'] - fqsn = descr['pair'].replace( - '/', '').lower() - price = float(descr['price']) - size = float(rest['vol']) - action = descr['type'] - - # register the userref value from - # kraken (usually an `int` staring - # at 1?) as our reqid. - reqids2txids[reqid] = txid - oid = str(reqid) - ids[oid] = reqid # NOTE!: str -> int - - # fill out ``Status`` + boxed ``Order`` - status_msg = Status( - time_ns=time.time_ns(), - resp='open', - oid=oid, - reqid=reqid, - - # embedded order info - req=Order( - action=action, - exec_mode='live', - oid=oid, - symbol=fqsn, - account=acc_name, - price=price, - size=size, - ), - src='kraken', - ) - apiflows[reqid].maps.append(status_msg) - await ems_stream.send(status_msg) - continue + # TODO: don't even allow this case + # by not moving the client side line + # until an edit confirmation + # arrives... + log.cancel( + f'Received too fast edit {txid}:\n' + f'{update_msg}\n' + 'Cancelling order for now!..' + ) + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid or 0, + 'txid': [txid], + }) + continue # send BrokerdStatus messages for all # order state updates