Factor status handling into a new `process_status()` helper
parent
f1192dff09
commit
cb7a9b9449
|
@ -26,7 +26,6 @@ import time
|
|||
from typing import (
|
||||
Any,
|
||||
AsyncIterator,
|
||||
# Optional,
|
||||
Union,
|
||||
)
|
||||
|
||||
|
@ -477,15 +476,15 @@ async def handle_order_updates(
|
|||
)
|
||||
txid, update_msg = list(order_msg.items())[0]
|
||||
match update_msg:
|
||||
|
||||
# we ignore internal order updates triggered by
|
||||
# kraken's "edit" endpoint.
|
||||
case {
|
||||
'cancel_reason': 'Order replaced',
|
||||
'status': status,
|
||||
'userref': reqid,
|
||||
**rest,
|
||||
}:
|
||||
# we ignore internal order updates
|
||||
# triggered by kraken's "edit"
|
||||
# endpoint.
|
||||
continue
|
||||
|
||||
case {
|
||||
|
@ -569,54 +568,86 @@ async def handle_order_updates(
|
|||
case {
|
||||
'event': etype,
|
||||
'status': status,
|
||||
'errorMessage': errmsg,
|
||||
'reqid': reqid,
|
||||
} if (
|
||||
etype in {'addOrderStatus', 'editOrderStatus'}
|
||||
and status == 'error'
|
||||
} as event if (
|
||||
etype in {
|
||||
'addOrderStatus',
|
||||
'editOrderStatus',
|
||||
'cancelOrderStatus',
|
||||
}
|
||||
):
|
||||
log.error(
|
||||
f'Failed to submit/edit order {reqid}:\n'
|
||||
f'{errmsg}'
|
||||
)
|
||||
oid = ids.inverse[reqid]
|
||||
msgs = emsflow[oid]
|
||||
last = msgs[-1]
|
||||
resp = BrokerdError(
|
||||
oid=oid,
|
||||
# use old reqid in case it changed?
|
||||
reqid=last.reqid,
|
||||
symbol=last.symbol,
|
||||
reason=f'Failed submit:\n{errmsg}',
|
||||
broker_details=resp
|
||||
resps, errored = process_status(
|
||||
event,
|
||||
oid,
|
||||
token,
|
||||
msgs,
|
||||
last,
|
||||
)
|
||||
msgs.append(resp)
|
||||
# if errored:
|
||||
# if we rx any error cancel the order again
|
||||
# await ws.send_msg({
|
||||
# 'event': 'cancelOrder',
|
||||
# 'token': token,
|
||||
# 'reqid': reqid,
|
||||
# 'txid': [last.reqid], # txid from submission
|
||||
# })
|
||||
|
||||
msgs.extend(resps)
|
||||
for resp in resps:
|
||||
await ems_stream.send(resp.dict())
|
||||
|
||||
# if we rx any error cancel the order again
|
||||
await ws.send_msg({
|
||||
'event': 'cancelOrder',
|
||||
'token': token,
|
||||
'reqid': reqid,
|
||||
'txid': [last.reqid], # txid from submission
|
||||
})
|
||||
case _:
|
||||
log.warning(f'Unhandled trades update msg: {msg}')
|
||||
|
||||
|
||||
def process_status(
|
||||
event: dict[str, str],
|
||||
oid: str,
|
||||
token: str,
|
||||
msgs: list[MsgUnion],
|
||||
last: MsgUnion,
|
||||
|
||||
) -> tuple[list[MsgUnion], bool]:
|
||||
'''
|
||||
Process `'[add/edit/cancel]OrderStatus'` events by translating to
|
||||
and returning the equivalent EMS-msg responses.
|
||||
|
||||
'''
|
||||
match event:
|
||||
case {
|
||||
'event': etype,
|
||||
'status': 'error',
|
||||
'reqid': reqid,
|
||||
'errorMessage': errmsg,
|
||||
}:
|
||||
# any of ``{'add', 'edit', 'cancel'}``
|
||||
action = etype.rstrip('OrderStatus')
|
||||
log.error(
|
||||
f'Failed to {action} order {reqid}:\n'
|
||||
f'{errmsg}'
|
||||
)
|
||||
resp = BrokerdError(
|
||||
oid=oid,
|
||||
# XXX: use old reqid in case it changed?
|
||||
reqid=last.reqid,
|
||||
symbol=last.symbol,
|
||||
|
||||
reason=f'Failed {action}:\n{errmsg}',
|
||||
broker_details=event
|
||||
)
|
||||
return [resp], True
|
||||
|
||||
# successful request cases
|
||||
case {
|
||||
'event': 'addOrderStatus',
|
||||
'status': status,
|
||||
'status': "ok",
|
||||
'reqid': reqid, # oid from ems side
|
||||
|
||||
# NOTE: in the case of an edit request this is
|
||||
# a new value!
|
||||
'txid': txid,
|
||||
|
||||
'descr': descr, # only on success?
|
||||
# 'originaltxid': txid, # only on edits
|
||||
# **rest,
|
||||
}:
|
||||
oid = ids.inverse[reqid]
|
||||
msgs = emsflow[oid]
|
||||
last = msgs[-1]
|
||||
log.info(
|
||||
f'Submitting order: {descr}\n'
|
||||
f'ems oid: {oid}\n'
|
||||
|
@ -628,12 +659,11 @@ async def handle_order_updates(
|
|||
reqid=txid, # kraken unique order id
|
||||
account=last.account, # piker account
|
||||
)
|
||||
msgs.append(resp)
|
||||
await ems_stream.send(resp.dict())
|
||||
return [resp], False
|
||||
|
||||
case {
|
||||
'event': 'editOrderStatus',
|
||||
'status': status,
|
||||
'status': "ok",
|
||||
'reqid': reqid, # oid from ems side
|
||||
'descr': descr,
|
||||
|
||||
|
@ -647,65 +677,37 @@ async def handle_order_updates(
|
|||
f'{descr}'
|
||||
)
|
||||
# deliver another ack to update the ems-side `.reqid`.
|
||||
oid = ids.inverse[reqid]
|
||||
msgs = emsflow[oid]
|
||||
last = msgs[-1]
|
||||
resp = BrokerdOrderAck(
|
||||
oid=oid, # ems order request id
|
||||
reqid=txid, # kraken unique order id
|
||||
account=last.account, # piker account
|
||||
)
|
||||
msgs.append(resp)
|
||||
await ems_stream.send(resp.dict())
|
||||
return [resp], False
|
||||
|
||||
# successful cancellation
|
||||
case {
|
||||
"event": "cancelOrderStatus",
|
||||
"status": "ok",
|
||||
'txid': txids,
|
||||
'reqid': reqid,
|
||||
|
||||
# XXX: sometimes this isn't provided!?
|
||||
# 'txid': txids,
|
||||
**rest,
|
||||
}:
|
||||
# TODO: should we support "batch" acking of
|
||||
# multiple cancels thus avoiding the below loop?
|
||||
oid = ids.inverse[reqid]
|
||||
msgs = emsflow[oid]
|
||||
last = msgs[-1]
|
||||
|
||||
for txid in txids:
|
||||
resps: list[MsgUnion] = []
|
||||
for txid in rest.get('txid', [last.reqid]):
|
||||
resp = BrokerdStatus(
|
||||
reqid=txid,
|
||||
account=last.account,
|
||||
time_ns=time.time_ns(),
|
||||
status='cancelled',
|
||||
reason='Cancel success: {oid}@{txid}',
|
||||
broker_details=resp,
|
||||
broker_details=event,
|
||||
)
|
||||
msgs.append(resp)
|
||||
await ems_stream.send(resp.dict())
|
||||
resps.append(resp)
|
||||
|
||||
# failed cancel
|
||||
case {
|
||||
"event": "cancelOrderStatus",
|
||||
"status": "error",
|
||||
"errorMessage": errmsg,
|
||||
'reqid': reqid,
|
||||
}:
|
||||
oid = ids.inverse[reqid]
|
||||
msgs = emsflow[oid]
|
||||
last = msgs[-1]
|
||||
|
||||
resp = BrokerdError(
|
||||
oid=oid,
|
||||
reqid=last.reqid,
|
||||
symbol=last.symbol,
|
||||
reason=f'Failed order cancel {errmsg}',
|
||||
broker_details=resp
|
||||
)
|
||||
msgs.append(resp)
|
||||
await ems_stream.send(resp.dict())
|
||||
|
||||
case _:
|
||||
log.warning(f'Unhandled trades msg: {msg}')
|
||||
return resps, False
|
||||
|
||||
|
||||
def norm_trade_records(
|
||||
|
@ -714,7 +716,6 @@ def norm_trade_records(
|
|||
) -> list[pp.Transaction]:
|
||||
|
||||
records: list[pp.Transaction] = []
|
||||
|
||||
for tid, record in ledger.items():
|
||||
|
||||
size = record.get('vol') * {
|
||||
|
@ -747,8 +748,10 @@ async def update_ledger(
|
|||
trade_entries: list[dict[str, Any]],
|
||||
|
||||
) -> list[pp.Transaction]:
|
||||
'''
|
||||
Write recent session's trades to the user's (local) ledger file.
|
||||
|
||||
# write recent session's trades to the user's (local) ledger file.
|
||||
'''
|
||||
with pp.open_trade_ledger(
|
||||
'kraken',
|
||||
acctid,
|
||||
|
|
Loading…
Reference in New Issue