Factor status handling into a new `process_status()` helper

tractor_typed_msg_hackin
Tyler Goodlet 2022-07-05 12:58:08 -04:00
parent 4d19c0f910
commit e901547e9f
1 changed files with 136 additions and 133 deletions

View File

@ -26,7 +26,6 @@ import time
from typing import (
Any,
AsyncIterator,
# Optional,
Union,
)
@ -477,15 +476,15 @@ async def handle_order_updates(
)
txid, update_msg = list(order_msg.items())[0]
match update_msg:
# we ignore internal order updates triggered by
# kraken's "edit" endpoint.
case {
'cancel_reason': 'Order replaced',
'status': status,
'userref': reqid,
**rest,
}:
# we ignore internal order updates
# triggered by kraken's "edit"
# endpoint.
continue
case {
@ -569,143 +568,146 @@ async def handle_order_updates(
case {
'event': etype,
'status': status,
'errorMessage': errmsg,
'reqid': reqid,
} if (
etype in {'addOrderStatus', 'editOrderStatus'}
and status == 'error'
} as event if (
etype in {
'addOrderStatus',
'editOrderStatus',
'cancelOrderStatus',
}
):
log.error(
f'Failed to submit/edit order {reqid}:\n'
f'{errmsg}'
)
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
resp = BrokerdError(
oid=oid,
# use old reqid in case it changed?
reqid=last.reqid,
symbol=last.symbol,
reason=f'Failed submit:\n{errmsg}',
broker_details=resp
resps, errored = process_status(
event,
oid,
token,
msgs,
last,
)
msgs.append(resp)
await ems_stream.send(resp.dict())
# if errored:
# if we rx any error cancel the order again
# await ws.send_msg({
# 'event': 'cancelOrder',
# 'token': token,
# 'reqid': reqid,
# 'txid': [last.reqid], # txid from submission
# })
# if we rx any error cancel the order again
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [last.reqid], # txid from submission
})
case {
'event': 'addOrderStatus',
'status': status,
'reqid': reqid, # oid from ems side
# NOTE: in the case of an edit request this is
# a new value!
'txid': txid,
'descr': descr, # only on success?
# 'originaltxid': txid, # only on edits
# **rest,
}:
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
log.info(
f'Submitting order: {descr}\n'
f'ems oid: {oid}\n'
f're-mapped reqid: {reqid}\n'
f'txid: {txid}\n'
)
resp = BrokerdOrderAck(
oid=oid, # ems order request id
reqid=txid, # kraken unique order id
account=last.account, # piker account
)
msgs.append(resp)
await ems_stream.send(resp.dict())
case {
'event': 'editOrderStatus',
'status': status,
'reqid': reqid, # oid from ems side
'descr': descr,
# NOTE: for edit request this is a new value
'txid': txid,
'originaltxid': origtxid,
}:
log.info(
f'Editting order {oid}[requid={reqid}]:\n'
f'txid: {origtxid} -> {txid}\n'
f'{descr}'
)
# deliver another ack to update the ems-side `.reqid`.
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
resp = BrokerdOrderAck(
oid=oid, # ems order request id
reqid=txid, # kraken unique order id
account=last.account, # piker account
)
msgs.append(resp)
await ems_stream.send(resp.dict())
# successful cancellation
case {
"event": "cancelOrderStatus",
"status": "ok",
'txid': txids,
'reqid': reqid,
}:
# TODO: should we support "batch" acking of
# multiple cancels thus avoiding the below loop?
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
for txid in txids:
resp = BrokerdStatus(
reqid=txid,
account=last.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Cancel success: {oid}@{txid}',
broker_details=resp,
)
msgs.append(resp)
msgs.extend(resps)
for resp in resps:
await ems_stream.send(resp.dict())
# failed cancel
case {
"event": "cancelOrderStatus",
"status": "error",
"errorMessage": errmsg,
'reqid': reqid,
}:
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
resp = BrokerdError(
oid=oid,
reqid=last.reqid,
symbol=last.symbol,
reason=f'Failed order cancel {errmsg}',
broker_details=resp
)
msgs.append(resp)
await ems_stream.send(resp.dict())
case _:
log.warning(f'Unhandled trades msg: {msg}')
log.warning(f'Unhandled trades update msg: {msg}')
def process_status(
event: dict[str, str],
oid: str,
token: str,
msgs: list[MsgUnion],
last: MsgUnion,
) -> tuple[list[MsgUnion], bool]:
'''
Process `'[add/edit/cancel]OrderStatus'` events by translating to
and returning the equivalent EMS-msg responses.
'''
match event:
case {
'event': etype,
'status': 'error',
'reqid': reqid,
'errorMessage': errmsg,
}:
# any of ``{'add', 'edit', 'cancel'}``
action = etype.rstrip('OrderStatus')
log.error(
f'Failed to {action} order {reqid}:\n'
f'{errmsg}'
)
resp = BrokerdError(
oid=oid,
# XXX: use old reqid in case it changed?
reqid=last.reqid,
symbol=last.symbol,
reason=f'Failed {action}:\n{errmsg}',
broker_details=event
)
return [resp], True
# successful request cases
case {
'event': 'addOrderStatus',
'status': "ok",
'reqid': reqid, # oid from ems side
'txid': txid,
'descr': descr, # only on success?
}:
log.info(
f'Submitting order: {descr}\n'
f'ems oid: {oid}\n'
f're-mapped reqid: {reqid}\n'
f'txid: {txid}\n'
)
resp = BrokerdOrderAck(
oid=oid, # ems order request id
reqid=txid, # kraken unique order id
account=last.account, # piker account
)
return [resp], False
case {
'event': 'editOrderStatus',
'status': "ok",
'reqid': reqid, # oid from ems side
'descr': descr,
# NOTE: for edit request this is a new value
'txid': txid,
'originaltxid': origtxid,
}:
log.info(
f'Editting order {oid}[requid={reqid}]:\n'
f'txid: {origtxid} -> {txid}\n'
f'{descr}'
)
# deliver another ack to update the ems-side `.reqid`.
resp = BrokerdOrderAck(
oid=oid, # ems order request id
reqid=txid, # kraken unique order id
account=last.account, # piker account
)
return [resp], False
case {
"event": "cancelOrderStatus",
"status": "ok",
'reqid': reqid,
# XXX: sometimes this isn't provided!?
# 'txid': txids,
**rest,
}:
# TODO: should we support "batch" acking of
# multiple cancels thus avoiding the below loop?
resps: list[MsgUnion] = []
for txid in rest.get('txid', [last.reqid]):
resp = BrokerdStatus(
reqid=txid,
account=last.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Cancel success: {oid}@{txid}',
broker_details=event,
)
resps.append(resp)
return resps, False
def norm_trade_records(
@ -714,7 +716,6 @@ def norm_trade_records(
) -> list[pp.Transaction]:
records: list[pp.Transaction] = []
for tid, record in ledger.items():
size = record.get('vol') * {
@ -747,8 +748,10 @@ async def update_ledger(
trade_entries: list[dict[str, Any]],
) -> list[pp.Transaction]:
'''
Write recent session's trades to the user's (local) ledger file.
# write recent session's trades to the user's (local) ledger file.
'''
with pp.open_trade_ledger(
'kraken',
acctid,