From f1192dff094c935fe8e23ed70714fe5e5108e420 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Jul 2022 11:48:10 -0400 Subject: [PATCH] Factor msg loop into new func: `handle_order_updates()` --- piker/brokers/kraken/broker.py | 674 +++++++++++++++++---------------- 1 file changed, 351 insertions(+), 323 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index cdea73fc..d694a8f7 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -329,355 +329,383 @@ async def trades_dialogue( ids, ) - # process and relay trades events to ems + # enter relay loop + await handle_order_updates( + ws, + ems_stream, + emsflow, + ids, + trans, + acctid, + acc_name, + token, + ) + + +async def handle_order_updates( + ws: NoBsWs, + ems_stream: tractor.MsgStream, + emsflow: dict[str, list[MsgUnion]], + ids: bidict[str, int], + trans: list[pp.Transaction], + acctid: str, + acc_name: str, + token: str, + +) -> None: + ''' + Main msg handling loop for all things order management. + + This code is broken out to make the context explicit and state variables + defined in the signature clear to the reader. + + ''' + async for msg in stream_messages(ws): + match msg: + # process and relay clearing trade events to ems # https://docs.kraken.com/websockets/#message-ownTrades - async for msg in stream_messages(ws): - match msg: - case [ - trades_msgs, - 'ownTrades', - {'sequence': seq}, - ]: - # flatten msgs for processing - trades = { - tid: trade - for entry in trades_msgs - for (tid, trade) in entry.items() + case [ + trades_msgs, + 'ownTrades', + {'sequence': seq}, + ]: + # flatten msgs for processing + trades = { + tid: trade + for entry in trades_msgs + for (tid, trade) in entry.items() - # only emit entries which are already not-in-ledger - if tid not in {r.tid for r in trans} - } - for tid, trade in trades.items(): + # only emit entries which are already not-in-ledger + if tid not in {r.tid for r in trans} + } + for tid, trade in trades.items(): - # parse-cast - reqid = trade['ordertxid'] - action = trade['type'] - price = float(trade['price']) - size = float(trade['vol']) - broker_time = float(trade['time']) + # parse-cast + reqid = trade['ordertxid'] + action = trade['type'] + price = float(trade['price']) + size = float(trade['vol']) + broker_time = float(trade['time']) - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=reqid, - time_ns=time.time_ns(), + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=reqid, + time_ns=time.time_ns(), - action=action, - size=size, - price=price, - # TODO: maybe capture more msg data - # i.e fees? - broker_details={'name': 'kraken'}, - broker_time=broker_time - ) - await ems_stream.send(fill_msg.dict()) + action=action, + size=size, + price=price, + # TODO: maybe capture more msg data + # i.e fees? + broker_details={'name': 'kraken'}, + broker_time=broker_time + ) + await ems_stream.send(fill_msg.dict()) - filled_msg = BrokerdStatus( - reqid=reqid, - time_ns=time.time_ns(), + filled_msg = BrokerdStatus( + reqid=reqid, + time_ns=time.time_ns(), - account=acc_name, - status='filled', - filled=size, - reason='Order filled by kraken', - broker_details={ - 'name': 'kraken', - 'broker_time': broker_time - }, + account=acc_name, + status='filled', + filled=size, + reason='Order filled by kraken', + broker_details={ + 'name': 'kraken', + 'broker_time': broker_time + }, - # TODO: figure out if kraken gives a count - # of how many units of underlying were - # filled. Alternatively we can decrement - # this value ourselves by associating and - # calcing from the diff with the original - # client-side request, see: - # https://github.com/pikers/piker/issues/296 - remaining=0, - ) - await ems_stream.send(filled_msg.dict()) + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, + ) + await ems_stream.send(filled_msg.dict()) - # update ledger and position tracking - trans = await update_ledger(acctid, trades) - active, closed = pp.update_pps_conf( - 'kraken', - acctid, - trade_records=trans, - ledger_reload={}.fromkeys( - t.bsuid for t in trans), - ) + # update ledger and position tracking + trans = await update_ledger(acctid, trades) + active, closed = pp.update_pps_conf( + 'kraken', + acctid, + trade_records=trans, + ledger_reload={}.fromkeys( + t.bsuid for t in trans), + ) - # emit pp msgs - for pos in filter( - bool, - chain(active.values(), closed.values()), - ): - pp_msg = BrokerdPosition( - broker='kraken', + # emit any new pp msgs to ems + for pos in filter( + bool, + chain(active.values(), closed.values()), + ): + pp_msg = BrokerdPosition( + broker='kraken', - # XXX: ok so this is annoying, we're - # relaying an account name with the - # backend suffix prefixed but when - # reading accounts from ledgers we - # don't need it and/or it's prefixed - # in the section table.. we should - # just strip this from the message - # right since `.broker` is already - # included? - account=f'kraken.{acctid}', - symbol=pos.symbol.front_fqsn(), - size=pos.size, - avg_price=pos.be_price, + # XXX: ok so this is annoying, we're + # relaying an account name with the + # backend suffix prefixed but when + # reading accounts from ledgers we + # don't need it and/or it's prefixed + # in the section table.. we should + # just strip this from the message + # right since `.broker` is already + # included? + account=f'kraken.{acctid}', + symbol=pos.symbol.front_fqsn(), + size=pos.size, + avg_price=pos.be_price, - # TODO - # currency='' - ) - await ems_stream.send(pp_msg.dict()) + # TODO + # currency='' + ) + await ems_stream.send(pp_msg.dict()) - case [ - order_msgs, - 'openOrders', - {'sequence': seq}, - ]: - # TODO: async order update handling which we - # should remove from `handle_order_requests()` - # above: - # https://github.com/pikers/piker/issues/293 - # https://github.com/pikers/piker/issues/310 - for order_msg in order_msgs: - log.info( - 'Order msg update_{seq}:\n' - f'{pformat(order_msg)}' - ) - txid, update_msg = list(order_msg.items())[0] - match update_msg: - case { - 'cancel_reason': 'Order replaced', - 'status': status, - 'userref': reqid, - **rest, - }: - # we ignore internal order updates - # triggered by kraken's "edit" - # endpoint. - continue + # process and relay order state change events + # https://docs.kraken.com/websockets/#message-openOrders + case [ + order_msgs, + 'openOrders', + {'sequence': seq}, + ]: + for order_msg in order_msgs: + log.info( + f'Order msg update_{seq}:\n' + f'{pformat(order_msg)}' + ) + txid, update_msg = list(order_msg.items())[0] + match update_msg: + case { + 'cancel_reason': 'Order replaced', + 'status': status, + 'userref': reqid, + **rest, + }: + # we ignore internal order updates + # triggered by kraken's "edit" + # endpoint. + continue - case { - 'status': status, - 'userref': reqid, - **rest, + case { + 'status': status, + 'userref': reqid, + **rest, - # XXX: eg. of remaining msg schema: - # 'avg_price': _, - # 'cost': _, - # 'descr': { - # 'close': None, - # 'leverage': None, - # 'order': descr, - # 'ordertype': 'limit', - # 'pair': 'XMR/EUR', - # 'price': '74.94000000', - # 'price2': '0.00000000', - # 'type': 'buy' - # }, - # 'expiretm': None, - # 'fee': '0.00000000', - # 'limitprice': '0.00000000', - # 'misc': '', - # 'oflags': 'fciq', - # 'opentm': '1656966131.337344', - # 'refid': None, - # 'starttm': None, - # 'stopprice': '0.00000000', - # 'timeinforce': 'GTC', - # 'vol': submit_vlm, # '13.34400854', - # 'vol_exec': exec_vlm, # 0.0000 - }: - ems_status = { - 'open': 'submitted', - 'closed': 'cancelled', - 'canceled': 'cancelled', - # do we even need to forward - # this state to the ems? - 'pending': 'pending', - }[status] + # XXX: eg. of remaining msg schema: + # 'avg_price': _, + # 'cost': _, + # 'descr': { + # 'close': None, + # 'leverage': None, + # 'order': descr, + # 'ordertype': 'limit', + # 'pair': 'XMR/EUR', + # 'price': '74.94000000', + # 'price2': '0.00000000', + # 'type': 'buy' + # }, + # 'expiretm': None, + # 'fee': '0.00000000', + # 'limitprice': '0.00000000', + # 'misc': '', + # 'oflags': 'fciq', + # 'opentm': '1656966131.337344', + # 'refid': None, + # 'starttm': None, + # 'stopprice': '0.00000000', + # 'timeinforce': 'GTC', + # 'vol': submit_vlm, # '13.34400854', + # 'vol_exec': exec_vlm, # 0.0000 + }: + ems_status = { + 'open': 'submitted', + 'closed': 'cancelled', + 'canceled': 'cancelled', + # do we even need to forward + # this state to the ems? + 'pending': 'pending', + }[status] - submit_vlm = rest.get('vol', 0) - exec_vlm = rest.get('vol_exec', 0) + submit_vlm = rest.get('vol', 0) + exec_vlm = rest.get('vol_exec', 0) - oid = ids.inverse[reqid] - msgs = emsflow[oid] + oid = ids.inverse[reqid] + msgs = emsflow[oid] - # send BrokerdStatus messages for all - # order state updates - resp = BrokerdStatus( - - reqid=txid, - time_ns=time.time_ns(), # cuz why not - account=f'kraken.{acctid}', - - # everyone doin camel case.. - status=ems_status, # force lower case - - filled=exec_vlm, - reason='', # why held? - remaining=( - float(submit_vlm) - - - float(exec_vlm) - ), - - broker_details=dict( - {'name': 'kraken'}, **update_msg - ), - ) - msgs.append(resp) - await ems_stream.send(resp.dict()) - - case _: - log.warning( - 'Unknown orders msg:\n' - f'{txid}:{order_msg}' - ) - - case { - 'event': etype, - 'status': status, - 'errorMessage': errmsg, - 'reqid': reqid, - } if ( - etype in {'addOrderStatus', 'editOrderStatus'} - and status == 'error' - ): - log.error( - f'Failed to submit/edit order {reqid}:\n' - f'{errmsg}' - ) - oid = ids.inverse[reqid] - msgs = emsflow[oid] - last = msgs[-1] - resp = BrokerdError( - oid=oid, - # use old reqid in case it changed? - reqid=last.reqid, - symbol=last.symbol, - reason=f'Failed submit:\n{errmsg}', - broker_details=resp - ) - msgs.append(resp) - await ems_stream.send(resp.dict()) - - # if we rx any error cancel the order again - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid, - 'txid': [last.reqid], # txid from submission - }) - - case { - 'event': 'addOrderStatus', - 'status': status, - 'reqid': reqid, # oid from ems side - - # NOTE: in the case of an edit request this is - # a new value! - 'txid': txid, - - 'descr': descr, # only on success? - # 'originaltxid': txid, # only on edits - # **rest, - }: - oid = ids.inverse[reqid] - msgs = emsflow[oid] - last = msgs[-1] - log.info( - f'Submitting order: {descr}\n' - f'ems oid: {oid}\n' - f're-mapped reqid: {reqid}\n' - f'txid: {txid}\n' - ) - resp = BrokerdOrderAck( - oid=oid, # ems order request id - reqid=txid, # kraken unique order id - account=last.account, # piker account - ) - msgs.append(resp) - await ems_stream.send(resp.dict()) - - case { - 'event': 'editOrderStatus', - 'status': status, - 'reqid': reqid, # oid from ems side - 'descr': descr, - - # NOTE: for edit request this is a new value - 'txid': txid, - 'originaltxid': origtxid, - }: - log.info( - f'Editting order {oid}[requid={reqid}]:\n' - f'txid: {origtxid} -> {txid}\n' - f'{descr}' - ) - # deliver another ack to update the ems-side `.reqid`. - oid = ids.inverse[reqid] - msgs = emsflow[oid] - last = msgs[-1] - resp = BrokerdOrderAck( - oid=oid, # ems order request id - reqid=txid, # kraken unique order id - account=last.account, # piker account - ) - msgs.append(resp) - await ems_stream.send(resp.dict()) - - # successful cancellation - case { - "event": "cancelOrderStatus", - "status": "ok", - 'txid': txids, - 'reqid': reqid, - }: - # TODO: should we support "batch" acking of - # multiple cancels thus avoiding the below loop? - oid = ids.inverse[reqid] - msgs = emsflow[oid] - last = msgs[-1] - - for txid in txids: + # send BrokerdStatus messages for all + # order state updates resp = BrokerdStatus( + reqid=txid, - account=last.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Cancel success: {oid}@{txid}', - broker_details=resp, + time_ns=time.time_ns(), # cuz why not + account=f'kraken.{acctid}', + + # everyone doin camel case.. + status=ems_status, # force lower case + + filled=exec_vlm, + reason='', # why held? + remaining=( + float(submit_vlm) + - + float(exec_vlm) + ), + + broker_details=dict( + {'name': 'kraken'}, **update_msg + ), ) msgs.append(resp) await ems_stream.send(resp.dict()) - # failed cancel - case { - "event": "cancelOrderStatus", - "status": "error", - "errorMessage": errmsg, - 'reqid': reqid, - }: - oid = ids.inverse[reqid] - msgs = emsflow[oid] - last = msgs[-1] + case _: + log.warning( + 'Unknown orders msg:\n' + f'{txid}:{order_msg}' + ) - resp = BrokerdError( - oid=oid, - reqid=last.reqid, - symbol=last.symbol, - reason=f'Failed order cancel {errmsg}', - broker_details=resp - ) - msgs.append(resp) - await ems_stream.send(resp.dict()) + case { + 'event': etype, + 'status': status, + 'errorMessage': errmsg, + 'reqid': reqid, + } if ( + etype in {'addOrderStatus', 'editOrderStatus'} + and status == 'error' + ): + log.error( + f'Failed to submit/edit order {reqid}:\n' + f'{errmsg}' + ) + oid = ids.inverse[reqid] + msgs = emsflow[oid] + last = msgs[-1] + resp = BrokerdError( + oid=oid, + # use old reqid in case it changed? + reqid=last.reqid, + symbol=last.symbol, + reason=f'Failed submit:\n{errmsg}', + broker_details=resp + ) + msgs.append(resp) + await ems_stream.send(resp.dict()) - case _: - log.warning(f'Unhandled trades msg: {msg}') + # if we rx any error cancel the order again + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [last.reqid], # txid from submission + }) + + case { + 'event': 'addOrderStatus', + 'status': status, + 'reqid': reqid, # oid from ems side + + # NOTE: in the case of an edit request this is + # a new value! + 'txid': txid, + + 'descr': descr, # only on success? + # 'originaltxid': txid, # only on edits + # **rest, + }: + oid = ids.inverse[reqid] + msgs = emsflow[oid] + last = msgs[-1] + log.info( + f'Submitting order: {descr}\n' + f'ems oid: {oid}\n' + f're-mapped reqid: {reqid}\n' + f'txid: {txid}\n' + ) + resp = BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=last.account, # piker account + ) + msgs.append(resp) + await ems_stream.send(resp.dict()) + + case { + 'event': 'editOrderStatus', + 'status': status, + 'reqid': reqid, # oid from ems side + 'descr': descr, + + # NOTE: for edit request this is a new value + 'txid': txid, + 'originaltxid': origtxid, + }: + log.info( + f'Editting order {oid}[requid={reqid}]:\n' + f'txid: {origtxid} -> {txid}\n' + f'{descr}' + ) + # deliver another ack to update the ems-side `.reqid`. + oid = ids.inverse[reqid] + msgs = emsflow[oid] + last = msgs[-1] + resp = BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=last.account, # piker account + ) + msgs.append(resp) + await ems_stream.send(resp.dict()) + + # successful cancellation + case { + "event": "cancelOrderStatus", + "status": "ok", + 'txid': txids, + 'reqid': reqid, + }: + # TODO: should we support "batch" acking of + # multiple cancels thus avoiding the below loop? + oid = ids.inverse[reqid] + msgs = emsflow[oid] + last = msgs[-1] + + for txid in txids: + resp = BrokerdStatus( + reqid=txid, + account=last.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Cancel success: {oid}@{txid}', + broker_details=resp, + ) + msgs.append(resp) + await ems_stream.send(resp.dict()) + + # failed cancel + case { + "event": "cancelOrderStatus", + "status": "error", + "errorMessage": errmsg, + 'reqid': reqid, + }: + oid = ids.inverse[reqid] + msgs = emsflow[oid] + last = msgs[-1] + + resp = BrokerdError( + oid=oid, + reqid=last.reqid, + symbol=last.symbol, + reason=f'Failed order cancel {errmsg}', + broker_details=resp + ) + msgs.append(resp) + await ems_stream.send(resp.dict()) + + case _: + log.warning(f'Unhandled trades msg: {msg}') def norm_trade_records(