diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index d1397e62..8a6fa84c 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -138,9 +138,13 @@ async def handle_order_requests( ep = 'editOrder' reqid = ids[order.oid] # integer not txid try: - # txid = reqids2txids.pop(reqid) txid = reqids2txids[reqid] except KeyError: + assert 0 + + # XXX: not sure if this block ever gets hit now? + log.error('TOO FAST EDIT') + reqids2txids[reqid] = TooFastEdit(reqid) await ems_order_stream.send( BrokerdError( @@ -155,7 +159,7 @@ async def handle_order_requests( else: extra = { 'orderid': txid, # txid - # 'newuserref': reqid, + 'newuserref': str(reqid), } else: @@ -200,15 +204,12 @@ async def handle_order_requests( # issue was never there lmao... coorps bro. # 'userref': str(reqid), 'userref': str(reqid), - 'pair': pair, 'price': str(order.price), 'volume': str(order.size), - - # only ensures request is valid, nothing more - # validate: 'true', - + # validate: 'true', # validity check, nothing more } | extra + log.info(f'Submitting WS order request:\n{pformat(req)}') await ws.send_msg(req) @@ -229,9 +230,7 @@ async def handle_order_requests( symbol=msg['symbol'], reason=( 'Invalid request msg:\n{msg}' - ), - - ) + )) ) @@ -253,7 +252,6 @@ async def subscribe( ''' # more specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - assert token for sub in subs: msg = { @@ -499,16 +497,9 @@ async def trades_dialogue( ) await ctx.started((ppmsgs, [acc_name])) - # XXX: not fucking clue but putting this finally block - # will suppress errors inside the direct await below!?! - # likely something to do with the exist stack inside - # the nobsws stuff... - # try: - # Get websocket token for authenticated data stream # Assert that a token was actually received. resp = await client.endpoint('GetWebSocketsToken', {}) - err = resp.get('error') if err: raise BrokerError(err) @@ -582,18 +573,37 @@ async def handle_order_updates( defined in the signature clear to the reader. ''' - async for msg in ws_stream: match msg: - # process and relay clearing trade events to ems - # https://docs.kraken.com/websockets/#message-ownTrades + # TODO: turns out you get the fill events from the # `openOrders` before you get this, so it might be better # to do all fill/status/pp updates in that sub and just use # this one for ledger syncs? + + # XXX: ASK SUPPORT ABOUT THIS! + # For eg. we could take the "last 50 trades" and do a diff # with the ledger and then only do a re-sync if something # seems amiss? + + # process and relay clearing trade events to ems + # https://docs.kraken.com/websockets/#message-ownTrades + # format as tid -> trade event map + # eg. received msg format, + # [{'TOKWHY-SMTUB-G5DOI6': { + # 'cost': '95.29047', + # 'fee': '0.24776', + # 'margin': '0.00000', + # 'ordertxid': 'OKSUXM-3OLSB-L7TN72', + # 'ordertype': 'limit', + # 'pair': 'XBT/EUR', + # 'postxid': 'TKH2SE-M7IF5-CFI7LT', + # 'price': '21268.20000', + # 'time': '1657990947.640891', + # 'type': 'buy', + # 'vol': '0.00448042' + # }}] case [ trades_msgs, 'ownTrades', @@ -603,26 +613,6 @@ async def handle_order_updates( f'ownTrades update_{seq}:\n' f'{pformat(trades_msgs)}' ) - # XXX: a fix / todo - # see the comment in the caller about weird error - # suppression around a commented `try:` - # assert 0 - - # format as tid -> trade event map - # eg. received msg format, - # [{'TOKWHY-SMTUB-G5DOI6': { - # 'cost': '95.29047', - # 'fee': '0.24776', - # 'margin': '0.00000', - # 'ordertxid': 'OKSUXM-3OLSB-L7TN72', - # 'ordertype': 'limit', - # 'pair': 'XBT/EUR', - # 'postxid': 'TKH2SE-M7IF5-CFI7LT', - # 'price': '21268.20000', - # 'time': '1657990947.640891', - # 'type': 'buy', - # 'vol': '0.00448042' - # }}] trades = { tid: trade for entry in trades_msgs @@ -665,7 +655,7 @@ async def handle_order_updates( ) await ems_stream.send(fill_msg) - filled_msg = BrokerdStatus( + status_msg = BrokerdStatus( reqid=reqid, time_ns=time.time_ns(), @@ -687,7 +677,7 @@ async def handle_order_updates( # https://github.com/pikers/piker/issues/296 remaining=0, ) - await ems_stream.send(filled_msg) + await ems_stream.send(status_msg) new_trans = norm_trade_records(trades) ppmsgs = trades2pps( @@ -715,22 +705,31 @@ async def handle_order_updates( txid, update_msg = list(order_msg.items())[0] match update_msg: - # we ignore internal order updates triggered by - # kraken's "edit" endpoint. - case { - 'cancel_reason': 'Order replaced', - 'status': status, - 'userref': reqid, # XXX: always zero bug XD - # **rest, - }: - # TODO: - # - put the edit order status update code here. - # - send open order status msg. - log.info( - f'Order {txid}@reqid={reqid} was replaced' - ) - continue - + # XXX: eg. of full msg schema: + # {'avg_price': _, + # 'cost': _, + # 'descr': { + # 'close': None, + # 'leverage': None, + # 'order': descr, + # 'ordertype': 'limit', + # 'pair': 'XMR/EUR', + # 'price': '74.94000000', + # 'price2': '0.00000000', + # 'type': 'buy' + # }, + # 'expiretm': None, + # 'fee': '0.00000000', + # 'limitprice': '0.00000000', + # 'misc': '', + # 'oflags': 'fciq', + # 'opentm': '1656966131.337344', + # 'refid': None, + # 'starttm': None, + # 'stopprice': '0.00000000', + # 'timeinforce': 'GTC', + # 'vol': submit_vlm, # '13.34400854', + # 'vol_exec': exec_vlm} # 0.0000 case { 'userref': reqid, @@ -739,42 +738,7 @@ async def handle_order_updates( # actual status updates.. see case above. 'status': status, **rest, - - # XXX: eg. of remaining msg schema: - # 'avg_price': _, - # 'cost': _, - # 'descr': { - # 'close': None, - # 'leverage': None, - # 'order': descr, - # 'ordertype': 'limit', - # 'pair': 'XMR/EUR', - # 'price': '74.94000000', - # 'price2': '0.00000000', - # 'type': 'buy' - # }, - # 'expiretm': None, - # 'fee': '0.00000000', - # 'limitprice': '0.00000000', - # 'misc': '', - # 'oflags': 'fciq', - # 'opentm': '1656966131.337344', - # 'refid': None, - # 'starttm': None, - # 'stopprice': '0.00000000', - # 'timeinforce': 'GTC', - # 'vol': submit_vlm, # '13.34400854', - # 'vol_exec': exec_vlm, # 0.0000 }: - ems_status = { - 'open': 'submitted', - 'closed': 'filled', - 'canceled': 'cancelled', - # do we even need to forward - # this state to the ems? - 'pending': 'pending', - }[status] - # TODO: store this in a ChainMap instance # per order dialog. # submit_vlm = rest.get('vol', 0) @@ -784,21 +748,39 @@ async def handle_order_updates( else: vlm = rest.get('vol_exec', 0) - # XXX: keep kraken engine's ``txid`` synced - # with the ems dialog's ``reqid``. - ourreqid = reqids2txids.inverse.get(txid) - if reqid > 0: + if status == 'canceled': + reqids2txids.pop(reqid) + + # we specially ignore internal order + # updates triggered by kraken's "edit" + # endpoint. + if rest['cancel_reason'] == 'Order replaced': + # TODO: + # - put the edit order status update + # code here? + # - send open order status msg. + log.info( + f'Order replaced: {txid}@reqid={reqid}' + ) + + # we don't do normal msg emission on + # a replacement cancel since it's + # the result of an "edited order" + # and thus we mask the kraken + # backend cancel then create details + # from the ems side. + continue + else: + # XXX: keep kraken engine's ``txid`` synced + # with the ems dialog's ``reqid``. + reqids2txids[reqid] = txid + + ourreqid = reqids2txids.inverse.get(txid) if ourreqid is None: log.info( 'Mapping new txid to our reqid:\n' f'{reqid} -> {txid}' ) - reqids2txids[reqid] = txid - - else: - # NOTE: if is to hack around edit order not - # realying userref field - reqid = ourreqid oid = ids.inverse.get(reqid) @@ -845,7 +827,7 @@ async def handle_order_updates( # by not moving the client side line # until an edit confirmation # arrives... - log.warning( + log.cancel( f'Received too fast edit {txid}:\n' f'{update_msg}\n' 'Cancelling order for now!..' @@ -855,7 +837,7 @@ async def handle_order_updates( # TODO: handle these and relay them # through the EMS to the client / UI # side! - log.warning( + log.cancel( f'Rx unknown active order {txid}:\n' f'{update_msg}\n' 'Cancelling order for now!..' @@ -871,6 +853,18 @@ async def handle_order_updates( }) continue + # remap statuses to ems set. + ems_status = { + 'open': 'submitted', + 'closed': 'filled', + 'canceled': 'cancelled', + # do we even need to forward + # this state to the ems? + 'pending': 'pending', + }[status] + # TODO: i like the open / closed semantics + # more we should consider them for internals + # send BrokerdStatus messages for all # order state updates resp = BrokerdStatus( @@ -902,22 +896,22 @@ async def handle_order_updates( apiflows[reqid].maps.append(update_msg) await ems_stream.send(resp) - # fill event. - # there is no `status` field + # fill msg. + # eg. contents (in total): + # { + # 'vol_exec': '0.84709869', + # 'cost': '101.25370642', + # 'fee': '0.26325964', + # 'avg_price': '119.53000001', + # 'userref': 0, + # } + # NOTE: there is no `status` field case { 'vol_exec': vlm, 'userref': reqid, **rest, }: - # eg. fill msg contents (in total): - # { - # 'vol_exec': '0.84709869', - # 'cost': '101.25370642', - # 'fee': '0.26325964', - # 'avg_price': '119.53000001', - # 'userref': 0, - # } - # TODO: emit fill msg from here + # TODO: emit fill msg from here? ourreqid = reqids2txids.inverse[txid] assert reqid == ourreqid log.info( @@ -932,18 +926,12 @@ async def handle_order_updates( f'{txid}:{order_msg}' ) - # TODO: given the 'openOrders' sub , pretty - # much all the msgs we get for this sub are duplicate - # of the (incremental) updates in that one though we still - # need them because that sub seems to have a bug where the - # `userref` field is always 0 instead of our generated reqid - # value... - # SOLVED: pass both a reqid and a userref in the init - # request msg. - # NOTE: The only reason for this seems to be remapping # underlying `txid` values on order "edits" which the # `openOrders` sub doesn't seem to have any knowledge of. + # UPDATE: seems like we don't need this any more thanks to + # passing through the dialog key / reqid in the `newuserref` + # field on edit requests. # I'd also like to ask them which event guarantees that the # the live order is now in the book, since these status ones @@ -968,7 +956,6 @@ async def handle_order_updates( txid = rest.get('txid') lasttxid = reqids2txids.get(reqid) - print(f'txids: {(txid, lasttxid)}') # TODO: relay these to EMS once it supports # open order loading. @@ -980,7 +967,6 @@ async def handle_order_updates( ) continue - # if reqid is not None: # update the msg chain chain = apiflows[reqid] chain.maps.append(event) @@ -992,24 +978,30 @@ async def handle_order_updates( chain, reqids2txids, ) + if resps: for resp in resps: await ems_stream.send(resp) - if txid or lasttxid: - if ( - isinstance(lasttxid, TooFastEdit) - or errored - ): - # client was editting too quickly - # so we instead cancel this order - log.cancel(f'Cancelling order for {reqid}@{txid}') - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid or 0, - 'txid': [txid], - }) + txid = txid or lasttxid + if ( + # errored likely on a rate limit or bad input + errored + and txid + + # we throttle too-fast-requests on the ems side + or (txid and isinstance(txid, TooFastEdit)) + ): + # client was editting too quickly + # so we instead cancel this order + log.cancel( + f'Cancelling {reqid}@{txid} due to error:\n {event}') + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid or 0, + 'txid': [txid], + }) case _: log.warning(f'Unhandled trades update msg: {msg}') @@ -1088,7 +1080,7 @@ def process_status( # doesn't relay through the ``userref`` value.. # (hopefully kraken will fix this so we don't need this # line.) - reqids2txids[reqid] = txid + # reqids2txids[reqid] = txid # deliver another ack to update the ems-side `.reqid`. return [], False @@ -1106,8 +1098,8 @@ def process_status( f'Cancelling order {oid}[requid={reqid}]:\n' f'brokerd reqid: {reqid}\n' ) - if txid == reqids2txids[reqid]: - reqids2txids.pop(reqid) + # if txid == reqids2txids[reqid]: + # reqids2txids.pop(reqid) return [], False