Factor status handling into a new `process_status()` helper

kraken_ws_orders
Tyler Goodlet 2022-07-05 12:58:08 -04:00
parent d9b4c4a413
commit 9fa9c27e4d
1 changed files with 136 additions and 133 deletions

View File

@ -26,7 +26,6 @@ import time
from typing import ( from typing import (
Any, Any,
AsyncIterator, AsyncIterator,
# Optional,
Union, Union,
) )
@ -476,15 +475,15 @@ async def handle_order_updates(
) )
txid, update_msg = list(order_msg.items())[0] txid, update_msg = list(order_msg.items())[0]
match update_msg: match update_msg:
# we ignore internal order updates triggered by
# kraken's "edit" endpoint.
case { case {
'cancel_reason': 'Order replaced', 'cancel_reason': 'Order replaced',
'status': status, 'status': status,
'userref': reqid, 'userref': reqid,
**rest, **rest,
}: }:
# we ignore internal order updates
# triggered by kraken's "edit"
# endpoint.
continue continue
case { case {
@ -568,54 +567,86 @@ async def handle_order_updates(
case { case {
'event': etype, 'event': etype,
'status': status, 'status': status,
'errorMessage': errmsg,
'reqid': reqid, 'reqid': reqid,
} if ( } as event if (
etype in {'addOrderStatus', 'editOrderStatus'} etype in {
and status == 'error' 'addOrderStatus',
'editOrderStatus',
'cancelOrderStatus',
}
): ):
log.error(
f'Failed to submit/edit order {reqid}:\n'
f'{errmsg}'
)
oid = ids.inverse[reqid] oid = ids.inverse[reqid]
msgs = emsflow[oid] msgs = emsflow[oid]
last = msgs[-1] last = msgs[-1]
resp = BrokerdError( resps, errored = process_status(
oid=oid, event,
# use old reqid in case it changed? oid,
reqid=last.reqid, token,
symbol=last.symbol, msgs,
reason=f'Failed submit:\n{errmsg}', last,
broker_details=resp
) )
msgs.append(resp) # if errored:
# if we rx any error cancel the order again
# await ws.send_msg({
# 'event': 'cancelOrder',
# 'token': token,
# 'reqid': reqid,
# 'txid': [last.reqid], # txid from submission
# })
msgs.extend(resps)
for resp in resps:
await ems_stream.send(resp.dict()) await ems_stream.send(resp.dict())
# if we rx any error cancel the order again case _:
await ws.send_msg({ log.warning(f'Unhandled trades update msg: {msg}')
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [last.reqid], # txid from submission
})
def process_status(
event: dict[str, str],
oid: str,
token: str,
msgs: list[MsgUnion],
last: MsgUnion,
) -> tuple[list[MsgUnion], bool]:
'''
Process `'[add/edit/cancel]OrderStatus'` events by translating to
and returning the equivalent EMS-msg responses.
'''
match event:
case {
'event': etype,
'status': 'error',
'reqid': reqid,
'errorMessage': errmsg,
}:
# any of ``{'add', 'edit', 'cancel'}``
action = etype.rstrip('OrderStatus')
log.error(
f'Failed to {action} order {reqid}:\n'
f'{errmsg}'
)
resp = BrokerdError(
oid=oid,
# XXX: use old reqid in case it changed?
reqid=last.reqid,
symbol=last.symbol,
reason=f'Failed {action}:\n{errmsg}',
broker_details=event
)
return [resp], True
# successful request cases
case { case {
'event': 'addOrderStatus', 'event': 'addOrderStatus',
'status': status, 'status': "ok",
'reqid': reqid, # oid from ems side 'reqid': reqid, # oid from ems side
# NOTE: in the case of an edit request this is
# a new value!
'txid': txid, 'txid': txid,
'descr': descr, # only on success? 'descr': descr, # only on success?
# 'originaltxid': txid, # only on edits
# **rest,
}: }:
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
log.info( log.info(
f'Submitting order: {descr}\n' f'Submitting order: {descr}\n'
f'ems oid: {oid}\n' f'ems oid: {oid}\n'
@ -627,12 +658,11 @@ async def handle_order_updates(
reqid=txid, # kraken unique order id reqid=txid, # kraken unique order id
account=last.account, # piker account account=last.account, # piker account
) )
msgs.append(resp) return [resp], False
await ems_stream.send(resp.dict())
case { case {
'event': 'editOrderStatus', 'event': 'editOrderStatus',
'status': status, 'status': "ok",
'reqid': reqid, # oid from ems side 'reqid': reqid, # oid from ems side
'descr': descr, 'descr': descr,
@ -646,65 +676,37 @@ async def handle_order_updates(
f'{descr}' f'{descr}'
) )
# deliver another ack to update the ems-side `.reqid`. # deliver another ack to update the ems-side `.reqid`.
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
resp = BrokerdOrderAck( resp = BrokerdOrderAck(
oid=oid, # ems order request id oid=oid, # ems order request id
reqid=txid, # kraken unique order id reqid=txid, # kraken unique order id
account=last.account, # piker account account=last.account, # piker account
) )
msgs.append(resp) return [resp], False
await ems_stream.send(resp.dict())
# successful cancellation
case { case {
"event": "cancelOrderStatus", "event": "cancelOrderStatus",
"status": "ok", "status": "ok",
'txid': txids,
'reqid': reqid, 'reqid': reqid,
# XXX: sometimes this isn't provided!?
# 'txid': txids,
**rest,
}: }:
# TODO: should we support "batch" acking of # TODO: should we support "batch" acking of
# multiple cancels thus avoiding the below loop? # multiple cancels thus avoiding the below loop?
oid = ids.inverse[reqid] resps: list[MsgUnion] = []
msgs = emsflow[oid] for txid in rest.get('txid', [last.reqid]):
last = msgs[-1]
for txid in txids:
resp = BrokerdStatus( resp = BrokerdStatus(
reqid=txid, reqid=txid,
account=last.account, account=last.account,
time_ns=time.time_ns(), time_ns=time.time_ns(),
status='cancelled', status='cancelled',
reason='Cancel success: {oid}@{txid}', reason='Cancel success: {oid}@{txid}',
broker_details=resp, broker_details=event,
) )
msgs.append(resp) resps.append(resp)
await ems_stream.send(resp.dict())
# failed cancel return resps, False
case {
"event": "cancelOrderStatus",
"status": "error",
"errorMessage": errmsg,
'reqid': reqid,
}:
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
resp = BrokerdError(
oid=oid,
reqid=last.reqid,
symbol=last.symbol,
reason=f'Failed order cancel {errmsg}',
broker_details=resp
)
msgs.append(resp)
await ems_stream.send(resp.dict())
case _:
log.warning(f'Unhandled trades msg: {msg}')
def norm_trade_records( def norm_trade_records(
@ -713,7 +715,6 @@ def norm_trade_records(
) -> list[pp.Transaction]: ) -> list[pp.Transaction]:
records: list[pp.Transaction] = [] records: list[pp.Transaction] = []
for tid, record in ledger.items(): for tid, record in ledger.items():
size = record.get('vol') * { size = record.get('vol') * {
@ -746,8 +747,10 @@ async def update_ledger(
trade_entries: list[dict[str, Any]], trade_entries: list[dict[str, Any]],
) -> list[pp.Transaction]: ) -> list[pp.Transaction]:
'''
Write recent session's trades to the user's (local) ledger file.
# write recent session's trades to the user's (local) ledger file. '''
with pp.open_trade_ledger( with pp.open_trade_ledger(
'kraken', 'kraken',
acctid, acctid,