diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 5cf50ade..f98d3314 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -102,7 +102,7 @@ async def handle_order_requests( 'action': 'cancel', }: cancel = BrokerdCancel(**msg) - last = emsflow[cancel.oid] + # last = emsflow[cancel.oid] reqid = ids[cancel.oid] txid = reqids2txids[reqid] @@ -148,6 +148,16 @@ async def handle_order_requests( psym = order.symbol.upper() pair = f'{psym[:3]}/{psym[3:]}' + # XXX: ACK the request **immediately** before sending + # the api side request to ensure the ems maps the oid -> + # reqid correctly! + resp = BrokerdOrderAck( + oid=order.oid, # ems order request id + reqid=reqid, # our custom int mapping + account=account, # piker account + ) + await ems_order_stream.send(resp) + # call ws api to submit the order: # https://docs.kraken.com/websockets/#message-addOrder req = { @@ -166,13 +176,6 @@ async def handle_order_requests( log.info(f'Submitting WS order request:\n{pformat(req)}') await ws.send_msg(req) - resp = BrokerdOrderAck( - oid=order.oid, # ems order request id - reqid=reqid, # our custom int mapping - account=account, # piker account - ) - await ems_order_stream.send(resp) - # placehold for sanity checking in relay loop emsflow.setdefault(order.oid, []).append(order) @@ -327,7 +330,7 @@ async def trades_dialogue( # 2way map for ems ids to kraken int reqids.. ids: bidict[str, int] = bidict() - reqids2txids: dict[int, str] = {} + reqids2txids: bidict[int, str] = bidict() # task for processing inbound requests from ems n.start_soon( @@ -362,7 +365,7 @@ async def handle_order_updates( ems_stream: tractor.MsgStream, emsflow: dict[str, list[MsgUnion]], ids: bidict[str, int], - reqids2txids: dict[int, str], + reqids2txids: bidict[int, str], trans: list[pp.Transaction], acctid: str, acc_name: str, @@ -384,13 +387,22 @@ async def handle_order_updates( match msg: # process and relay clearing trade events to ems # https://docs.kraken.com/websockets/#message-ownTrades + # TODO: turns out you get the fill events from the + # `openOrders` before you get this, so it might be better + # to do all fill/status/pp updates in that sub and just use + # this one for ledger syncs? + # For eg. we could take the "last 50 trades" and do a diff + # with the ledger and then only do a re-sync if something + # seems amiss? case [ trades_msgs, 'ownTrades', - # won't exist for historical values? - # 'userref': reqid, {'sequence': seq}, ]: + log.info( + f'ownTrades update_{seq}:\n' + f'{pformat(trades_msgs)}' + ) # flatten msgs to an {id -> data} table for processing trades = { tid: trade @@ -402,11 +414,13 @@ async def handle_order_updates( } for tid, trade in trades.items(): - # NOTE: try to get the requid sent in the order - # request message if posssible; it may not be - # provided since this sub also returns generic - # historical trade events. - reqid = trade.get('userref', trade['ordertxid']) + txid = trade['ordertxid'] + + # NOTE: yet again, here we don't have any ref to the + # reqid that's generated by us (as the client) and + # sent in the order request, so we have to look it + # up from our own registry... + reqid = reqids2txids.inverse[txid] action = trade['type'] price = float(trade['price']) @@ -415,16 +429,16 @@ async def handle_order_updates( # send a fill msg for gui update fill_msg = BrokerdFill( - reqid=reqid, - time_ns=time.time_ns(), + reqid=reqid, action=action, size=size, price=price, + # TODO: maybe capture more msg data # i.e fees? - broker_details={'name': 'kraken'}, + broker_details={'name': 'kraken'} | trade, broker_time=broker_time ) await ems_stream.send(fill_msg) @@ -455,6 +469,10 @@ async def handle_order_updates( # update ledger and position tracking with open_ledger(acctid, trades) as trans: + # TODO: ideally we can pass in an existingn + # pps state to this right? such that we + # don't have to do a ledger reload all the + # time.. active, closed = pp.update_pps_conf( 'kraken', acctid, @@ -499,7 +517,7 @@ async def handle_order_updates( ]: for order_msg in order_msgs: log.info( - f'Order msg update_{seq}:\n' + f'`openOrders` msg update_{seq}:\n' f'{pformat(order_msg)}' ) txid, update_msg = list(order_msg.items())[0] @@ -510,14 +528,22 @@ async def handle_order_updates( case { 'cancel_reason': 'Order replaced', 'status': status, - 'userref': reqid, - **rest, + # 'userref': reqid, # XXX: always zero bug XD + # **rest, }: + log.info( + f'Order {txid} was replaced' + ) continue case { + # XXX: lol, ws bug, this is always 0! + 'userref': _, + + # during a fill this field is **not** + # provided! but, it is always avail on + # actual status updates.. see case above. 'status': status, - 'userref': reqid, **rest, # XXX: eg. of remaining msg schema: @@ -548,17 +574,23 @@ async def handle_order_updates( }: ems_status = { 'open': 'submitted', - 'closed': 'cancelled', + 'closed': 'filled', 'canceled': 'cancelled', # do we even need to forward # this state to the ems? 'pending': 'pending', }[status] - submit_vlm = rest.get('vol', 0) - exec_vlm = rest.get('vol_exec', 0) + # TODO: store this in a ChainMap instance + # per order dialog. + # submit_vlm = rest.get('vol', 0) + # fee = rest.get('fee', 0) + if status == 'closed': + vlm = 0 + else: + vlm = rest.get('vol_exec', 0) - reqids2txids[reqid] = txid + reqid = reqids2txids.inverse[txid] oid = ids.inverse.get(reqid) if not oid: @@ -594,13 +626,17 @@ async def handle_order_updates( # everyone doin camel case.. status=ems_status, # force lower case - filled=exec_vlm, + filled=vlm, reason='', # why held? - remaining=( - float(submit_vlm) - - - float(exec_vlm) - ), + remaining=vlm, + + # TODO: need to extract the submit vlm + # from a prior msg update.. + # ( + # float(submit_vlm) + # - + # float(exec_vlm) + # ), broker_details=dict( {'name': 'kraken'}, **update_msg @@ -609,12 +645,49 @@ async def handle_order_updates( msgs.append(resp) await ems_stream.send(resp) + # fill event. + # there is no `status` field + case { + 'vol_exec': vlm, + **rest, + }: + # eg. fill msg contents (in total): + # { + # 'vol_exec': '0.84709869', + # 'cost': '101.25370642', + # 'fee': '0.26325964', + # 'avg_price': '119.53000001', + # 'userref': 0, + # } + # TODO: emit fill msg from here + reqid = reqids2txids.inverse[txid] + log.info( + f'openOrders vlm={vlm} Fill for {reqid}:\n' + f'{update_msg}' + ) + continue + case _: log.warning( 'Unknown orders msg:\n' f'{txid}:{order_msg}' ) + # TODO: given the 'openOrders' sub , pretty + # much all the msgs we get for this sub are duplicate + # of the (incremental) updates in that one though we still + # need them because that sub seems to have a bug where the + # `userref` field is always 0 instead of our generated reqid + # value... + # Not sure why kraken devs decided to repeat themselves but + # it almost seems as though we could drop this entire sub + # and get everything we need by just parsing msgs correctly + # above? The only reason for this seems to be remapping + # underlying `txid` values on order "edits" which the + # `openOrders` sub doesn't seem to have any knowledge of. + # I'd also like to ask them which event guarantees that the + # the live order is now in the book, since these status ones + # almost seem more like request-acks then state guarantees. case { 'event': etype, 'status': status, @@ -627,7 +700,13 @@ async def handle_order_updates( 'cancelOrderStatus', } ): + log.info( + f'{etype}:\n' + f'{pformat(msg)}' + ) oid = ids.inverse.get(reqid) + # TODO: relay these to EMS once it supports + # open order loading. if not oid: log.warning( 'Unknown order status update?:\n' @@ -637,6 +716,10 @@ async def handle_order_updates( txid = rest.get('txid') if txid: + # XXX: we **must** do this mapping for edit order + # status updates since the `openOrders` sub above + # never relays back the correct client-side `reqid` + # that is put in the order request.. reqids2txids[reqid] = txid msgs = emsflow[oid] @@ -703,9 +786,9 @@ def process_status( 'descr': descr, # only on success? }: log.info( - f'Submitting order: {descr}\n' + f'Submitted order: {descr}\n' f'ems oid: {oid}\n' - f're-mapped reqid: {reqid}\n' + f'brokerd reqid: {reqid}\n' f'txid: {txid}\n' ) return [], False @@ -722,6 +805,7 @@ def process_status( }: log.info( f'Editting order {oid}[requid={reqid}]:\n' + f'brokerd reqid: {reqid}\n' f'txid: {origtxid} -> {txid}\n' f'{descr}' ) @@ -737,21 +821,12 @@ def process_status( # 'txid': txids, **rest, }: - # TODO: should we support "batch" acking of - # multiple cancels thus avoiding the below loop? - resps: list[MsgUnion] = [] for txid in rest.get('txid', [last.reqid]): - resp = BrokerdStatus( - reqid=reqid, - account=last.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Cancel success: {oid}@{txid}', - broker_details=event, + log.info( + f'Cancelling order {oid}[requid={reqid}]:\n' + f'brokerd reqid: {reqid}\n' ) - resps.append(resp) - - return resps, False + return [], False def norm_trade_records(