diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index ece37c67..128cd46a 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -26,7 +26,6 @@ import time from typing import ( Any, AsyncIterator, - # Optional, Union, ) @@ -476,15 +475,15 @@ async def handle_order_updates( ) txid, update_msg = list(order_msg.items())[0] match update_msg: + + # we ignore internal order updates triggered by + # kraken's "edit" endpoint. case { 'cancel_reason': 'Order replaced', 'status': status, 'userref': reqid, **rest, }: - # we ignore internal order updates - # triggered by kraken's "edit" - # endpoint. continue case { @@ -568,143 +567,146 @@ async def handle_order_updates( case { 'event': etype, 'status': status, - 'errorMessage': errmsg, 'reqid': reqid, - } if ( - etype in {'addOrderStatus', 'editOrderStatus'} - and status == 'error' + } as event if ( + etype in { + 'addOrderStatus', + 'editOrderStatus', + 'cancelOrderStatus', + } ): - log.error( - f'Failed to submit/edit order {reqid}:\n' - f'{errmsg}' - ) oid = ids.inverse[reqid] msgs = emsflow[oid] last = msgs[-1] - resp = BrokerdError( - oid=oid, - # use old reqid in case it changed? - reqid=last.reqid, - symbol=last.symbol, - reason=f'Failed submit:\n{errmsg}', - broker_details=resp + resps, errored = process_status( + event, + oid, + token, + msgs, + last, ) - msgs.append(resp) - await ems_stream.send(resp.dict()) + # if errored: + # if we rx any error cancel the order again + # await ws.send_msg({ + # 'event': 'cancelOrder', + # 'token': token, + # 'reqid': reqid, + # 'txid': [last.reqid], # txid from submission + # }) - # if we rx any error cancel the order again - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid, - 'txid': [last.reqid], # txid from submission - }) - - case { - 'event': 'addOrderStatus', - 'status': status, - 'reqid': reqid, # oid from ems side - - # NOTE: in the case of an edit request this is - # a new value! - 'txid': txid, - - 'descr': descr, # only on success? - # 'originaltxid': txid, # only on edits - # **rest, - }: - oid = ids.inverse[reqid] - msgs = emsflow[oid] - last = msgs[-1] - log.info( - f'Submitting order: {descr}\n' - f'ems oid: {oid}\n' - f're-mapped reqid: {reqid}\n' - f'txid: {txid}\n' - ) - resp = BrokerdOrderAck( - oid=oid, # ems order request id - reqid=txid, # kraken unique order id - account=last.account, # piker account - ) - msgs.append(resp) - await ems_stream.send(resp.dict()) - - case { - 'event': 'editOrderStatus', - 'status': status, - 'reqid': reqid, # oid from ems side - 'descr': descr, - - # NOTE: for edit request this is a new value - 'txid': txid, - 'originaltxid': origtxid, - }: - log.info( - f'Editting order {oid}[requid={reqid}]:\n' - f'txid: {origtxid} -> {txid}\n' - f'{descr}' - ) - # deliver another ack to update the ems-side `.reqid`. - oid = ids.inverse[reqid] - msgs = emsflow[oid] - last = msgs[-1] - resp = BrokerdOrderAck( - oid=oid, # ems order request id - reqid=txid, # kraken unique order id - account=last.account, # piker account - ) - msgs.append(resp) - await ems_stream.send(resp.dict()) - - # successful cancellation - case { - "event": "cancelOrderStatus", - "status": "ok", - 'txid': txids, - 'reqid': reqid, - }: - # TODO: should we support "batch" acking of - # multiple cancels thus avoiding the below loop? - oid = ids.inverse[reqid] - msgs = emsflow[oid] - last = msgs[-1] - - for txid in txids: - resp = BrokerdStatus( - reqid=txid, - account=last.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Cancel success: {oid}@{txid}', - broker_details=resp, - ) - msgs.append(resp) + msgs.extend(resps) + for resp in resps: await ems_stream.send(resp.dict()) - # failed cancel - case { - "event": "cancelOrderStatus", - "status": "error", - "errorMessage": errmsg, - 'reqid': reqid, - }: - oid = ids.inverse[reqid] - msgs = emsflow[oid] - last = msgs[-1] - - resp = BrokerdError( - oid=oid, - reqid=last.reqid, - symbol=last.symbol, - reason=f'Failed order cancel {errmsg}', - broker_details=resp - ) - msgs.append(resp) - await ems_stream.send(resp.dict()) - case _: - log.warning(f'Unhandled trades msg: {msg}') + log.warning(f'Unhandled trades update msg: {msg}') + + +def process_status( + event: dict[str, str], + oid: str, + token: str, + msgs: list[MsgUnion], + last: MsgUnion, + +) -> tuple[list[MsgUnion], bool]: + ''' + Process `'[add/edit/cancel]OrderStatus'` events by translating to + and returning the equivalent EMS-msg responses. + + ''' + match event: + case { + 'event': etype, + 'status': 'error', + 'reqid': reqid, + 'errorMessage': errmsg, + }: + # any of ``{'add', 'edit', 'cancel'}`` + action = etype.rstrip('OrderStatus') + log.error( + f'Failed to {action} order {reqid}:\n' + f'{errmsg}' + ) + resp = BrokerdError( + oid=oid, + # XXX: use old reqid in case it changed? + reqid=last.reqid, + symbol=last.symbol, + + reason=f'Failed {action}:\n{errmsg}', + broker_details=event + ) + return [resp], True + + # successful request cases + case { + 'event': 'addOrderStatus', + 'status': "ok", + 'reqid': reqid, # oid from ems side + 'txid': txid, + 'descr': descr, # only on success? + }: + log.info( + f'Submitting order: {descr}\n' + f'ems oid: {oid}\n' + f're-mapped reqid: {reqid}\n' + f'txid: {txid}\n' + ) + resp = BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=last.account, # piker account + ) + return [resp], False + + case { + 'event': 'editOrderStatus', + 'status': "ok", + 'reqid': reqid, # oid from ems side + 'descr': descr, + + # NOTE: for edit request this is a new value + 'txid': txid, + 'originaltxid': origtxid, + }: + log.info( + f'Editting order {oid}[requid={reqid}]:\n' + f'txid: {origtxid} -> {txid}\n' + f'{descr}' + ) + # deliver another ack to update the ems-side `.reqid`. + resp = BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=last.account, # piker account + ) + return [resp], False + + case { + "event": "cancelOrderStatus", + "status": "ok", + 'reqid': reqid, + + # XXX: sometimes this isn't provided!? + # 'txid': txids, + **rest, + }: + # TODO: should we support "batch" acking of + # multiple cancels thus avoiding the below loop? + resps: list[MsgUnion] = [] + for txid in rest.get('txid', [last.reqid]): + resp = BrokerdStatus( + reqid=txid, + account=last.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Cancel success: {oid}@{txid}', + broker_details=event, + ) + resps.append(resp) + + return resps, False def norm_trade_records( @@ -713,7 +715,6 @@ def norm_trade_records( ) -> list[pp.Transaction]: records: list[pp.Transaction] = [] - for tid, record in ledger.items(): size = record.get('vol') * { @@ -746,8 +747,10 @@ async def update_ledger( trade_entries: list[dict[str, Any]], ) -> list[pp.Transaction]: + ''' + Write recent session's trades to the user's (local) ledger file. - # write recent session's trades to the user's (local) ledger file. + ''' with pp.open_trade_ledger( 'kraken', acctid,