Cancel any live orders found on connect

More or less just to avoid orders the user wasn't aware of from
persisting until we get "open order relaying" through the ems working.

Some further fixes which required a new `reqids2txids` map which keeps
track of which `kraken` "txid" is mapped to our `reqid: int`; mainly
this was needed for cancel requests which require knowing the underlying
`txid`s (since apparently kraken doesn't keep track of the "reqid"  we
pass it). Pass the ws instance into `handle_order_updates()` to enable
the cancelling orders on startup. Don't key error on unknown `reqid`
values (for eg. when receiving historical trade events on startup).
Handle cancel requests first in the ems side loop.
dpbackup
Tyler Goodlet 2022-07-08 23:10:25 -04:00
parent 8984c1b60b
commit 381b3121d6
1 changed files with 64 additions and 33 deletions

View File

@ -82,6 +82,7 @@ async def handle_order_requests(
token: str, token: str,
emsflow: dict[str, list[MsgUnion]], emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: dict[int, str],
) -> None: ) -> None:
''' '''
@ -97,6 +98,23 @@ async def handle_order_requests(
async for msg in ems_order_stream: async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}') log.info(f'Rx order msg:\n{pformat(msg)}')
match msg: match msg:
case {
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
last = emsflow[cancel.oid]
reqid = ids[cancel.oid]
txid = reqids2txids[reqid]
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
case { case {
'account': 'kraken.spot' as account, 'account': 'kraken.spot' as account,
'action': action, 'action': action,
@ -109,10 +127,9 @@ async def handle_order_requests(
if order.oid in ids: if order.oid in ids:
ep = 'editOrder' ep = 'editOrder'
reqid = ids[order.oid] # integer not txid reqid = ids[order.oid] # integer not txid
last = emsflow[order.oid][-1] txid = reqids2txids[reqid]
assert last.reqid == order.reqid
extra = { extra = {
'orderid': last.reqid, # txid 'orderid': txid, # txid
} }
else: else:
@ -159,23 +176,6 @@ async def handle_order_requests(
# placehold for sanity checking in relay loop # placehold for sanity checking in relay loop
emsflow.setdefault(order.oid, []).append(order) emsflow.setdefault(order.oid, []).append(order)
case {
'account': 'kraken.spot' as account,
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
assert cancel.oid in emsflow
reqid = ids[cancel.oid]
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [cancel.reqid], # should be txid from submission
})
case _: case _:
account = msg.get('account') account = msg.get('account')
if account != 'kraken.spot': if account != 'kraken.spot':
@ -327,6 +327,7 @@ async def trades_dialogue(
# 2way map for ems ids to kraken int reqids.. # 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict() ids: bidict[str, int] = bidict()
reqids2txids: dict[int, str] = {}
# task for processing inbound requests from ems # task for processing inbound requests from ems
n.start_soon( n.start_soon(
@ -337,14 +338,17 @@ async def trades_dialogue(
token, token,
emsflow, emsflow,
ids, ids,
reqids2txids,
) )
# enter relay loop # enter relay loop
await handle_order_updates( await handle_order_updates(
ws,
stream, stream,
ems_stream, ems_stream,
emsflow, emsflow,
ids, ids,
reqids2txids,
trans, trans,
acctid, acctid,
acc_name, acc_name,
@ -353,10 +357,12 @@ async def trades_dialogue(
async def handle_order_updates( async def handle_order_updates(
ws_stream: NoBsWs, ws: NoBsWs,
ws_stream: AsyncIterator,
ems_stream: tractor.MsgStream, ems_stream: tractor.MsgStream,
emsflow: dict[str, list[MsgUnion]], emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: dict[int, str],
trans: list[pp.Transaction], trans: list[pp.Transaction],
acctid: str, acctid: str,
acc_name: str, acc_name: str,
@ -552,7 +558,29 @@ async def handle_order_updates(
submit_vlm = rest.get('vol', 0) submit_vlm = rest.get('vol', 0)
exec_vlm = rest.get('vol_exec', 0) exec_vlm = rest.get('vol_exec', 0)
oid = ids.inverse[reqid] reqids2txids[reqid] = txid
oid = ids.inverse.get(reqid)
if not oid:
# TODO: handle these and relay them
# through the EMS to the client / UI
# side!
log.warning(
f'Received active order {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
)
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid],
})
continue
msgs = emsflow[oid] msgs = emsflow[oid]
# send BrokerdStatus messages for all # send BrokerdStatus messages for all
@ -591,6 +619,7 @@ async def handle_order_updates(
'event': etype, 'event': etype,
'status': status, 'status': status,
'reqid': reqid, 'reqid': reqid,
**rest,
} as event if ( } as event if (
etype in { etype in {
'addOrderStatus', 'addOrderStatus',
@ -598,7 +627,18 @@ async def handle_order_updates(
'cancelOrderStatus', 'cancelOrderStatus',
} }
): ):
oid = ids.inverse[reqid] oid = ids.inverse.get(reqid)
if not oid:
log.warning(
'Unknown order status update?:\n'
f'{event}'
)
continue
txid = rest.get('txid')
if txid:
reqids2txids[reqid] = txid
msgs = emsflow[oid] msgs = emsflow[oid]
last = msgs[-1] last = msgs[-1]
resps, errored = process_status( resps, errored = process_status(
@ -608,19 +648,10 @@ async def handle_order_updates(
msgs, msgs,
last, last,
) )
# if errored:
# if we rx any error cancel the order again
# await ws.send_msg({
# 'event': 'cancelOrder',
# 'token': token,
# 'reqid': reqid,
# 'txid': [last.reqid], # txid from submission
# })
if resps: if resps:
msgs.extend(resps) msgs.extend(resps)
for resp in resps: for resp in resps:
await ems_stream.send(resp.dict()) await ems_stream.send(resp)
case _: case _:
log.warning(f'Unhandled trades update msg: {msg}') log.warning(f'Unhandled trades update msg: {msg}')