Cancel any live orders found on connect
More or less just to avoid orders the user wasn't aware of from persisting until we get "open order relaying" through the ems working. Some further fixes which required a new `reqids2txids` map which keeps track of which `kraken` "txid" is mapped to our `reqid: int`; mainly this was needed for cancel requests which require knowing the underlying `txid`s (since apparently kraken doesn't keep track of the "reqid" we pass it). Pass the ws instance into `handle_order_updates()` to enable the cancelling orders on startup. Don't key error on unknown `reqid` values (for eg. when receiving historical trade events on startup). Handle cancel requests first in the ems side loop.kraken_ws_orders
parent
a20a8d95d5
commit
caecbaa231
piker/brokers/kraken
|
@ -82,6 +82,7 @@ async def handle_order_requests(
|
||||||
token: str,
|
token: str,
|
||||||
emsflow: dict[str, list[MsgUnion]],
|
emsflow: dict[str, list[MsgUnion]],
|
||||||
ids: bidict[str, int],
|
ids: bidict[str, int],
|
||||||
|
reqids2txids: dict[int, str],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -97,6 +98,23 @@ async def handle_order_requests(
|
||||||
async for msg in ems_order_stream:
|
async for msg in ems_order_stream:
|
||||||
log.info(f'Rx order msg:\n{pformat(msg)}')
|
log.info(f'Rx order msg:\n{pformat(msg)}')
|
||||||
match msg:
|
match msg:
|
||||||
|
case {
|
||||||
|
'action': 'cancel',
|
||||||
|
}:
|
||||||
|
cancel = BrokerdCancel(**msg)
|
||||||
|
last = emsflow[cancel.oid]
|
||||||
|
reqid = ids[cancel.oid]
|
||||||
|
txid = reqids2txids[reqid]
|
||||||
|
|
||||||
|
# call ws api to cancel:
|
||||||
|
# https://docs.kraken.com/websockets/#message-cancelOrder
|
||||||
|
await ws.send_msg({
|
||||||
|
'event': 'cancelOrder',
|
||||||
|
'token': token,
|
||||||
|
'reqid': reqid,
|
||||||
|
'txid': [txid], # should be txid from submission
|
||||||
|
})
|
||||||
|
|
||||||
case {
|
case {
|
||||||
'account': 'kraken.spot' as account,
|
'account': 'kraken.spot' as account,
|
||||||
'action': action,
|
'action': action,
|
||||||
|
@ -109,10 +127,9 @@ async def handle_order_requests(
|
||||||
if order.oid in ids:
|
if order.oid in ids:
|
||||||
ep = 'editOrder'
|
ep = 'editOrder'
|
||||||
reqid = ids[order.oid] # integer not txid
|
reqid = ids[order.oid] # integer not txid
|
||||||
last = emsflow[order.oid][-1]
|
txid = reqids2txids[reqid]
|
||||||
assert last.reqid == order.reqid
|
|
||||||
extra = {
|
extra = {
|
||||||
'orderid': last.reqid, # txid
|
'orderid': txid, # txid
|
||||||
}
|
}
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -159,23 +176,6 @@ async def handle_order_requests(
|
||||||
# placehold for sanity checking in relay loop
|
# placehold for sanity checking in relay loop
|
||||||
emsflow.setdefault(order.oid, []).append(order)
|
emsflow.setdefault(order.oid, []).append(order)
|
||||||
|
|
||||||
case {
|
|
||||||
'account': 'kraken.spot' as account,
|
|
||||||
'action': 'cancel',
|
|
||||||
}:
|
|
||||||
cancel = BrokerdCancel(**msg)
|
|
||||||
assert cancel.oid in emsflow
|
|
||||||
reqid = ids[cancel.oid]
|
|
||||||
|
|
||||||
# call ws api to cancel:
|
|
||||||
# https://docs.kraken.com/websockets/#message-cancelOrder
|
|
||||||
await ws.send_msg({
|
|
||||||
'event': 'cancelOrder',
|
|
||||||
'token': token,
|
|
||||||
'reqid': reqid,
|
|
||||||
'txid': [cancel.reqid], # should be txid from submission
|
|
||||||
})
|
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
account = msg.get('account')
|
account = msg.get('account')
|
||||||
if account != 'kraken.spot':
|
if account != 'kraken.spot':
|
||||||
|
@ -327,6 +327,7 @@ async def trades_dialogue(
|
||||||
|
|
||||||
# 2way map for ems ids to kraken int reqids..
|
# 2way map for ems ids to kraken int reqids..
|
||||||
ids: bidict[str, int] = bidict()
|
ids: bidict[str, int] = bidict()
|
||||||
|
reqids2txids: dict[int, str] = {}
|
||||||
|
|
||||||
# task for processing inbound requests from ems
|
# task for processing inbound requests from ems
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
|
@ -337,14 +338,17 @@ async def trades_dialogue(
|
||||||
token,
|
token,
|
||||||
emsflow,
|
emsflow,
|
||||||
ids,
|
ids,
|
||||||
|
reqids2txids,
|
||||||
)
|
)
|
||||||
|
|
||||||
# enter relay loop
|
# enter relay loop
|
||||||
await handle_order_updates(
|
await handle_order_updates(
|
||||||
|
ws,
|
||||||
stream,
|
stream,
|
||||||
ems_stream,
|
ems_stream,
|
||||||
emsflow,
|
emsflow,
|
||||||
ids,
|
ids,
|
||||||
|
reqids2txids,
|
||||||
trans,
|
trans,
|
||||||
acctid,
|
acctid,
|
||||||
acc_name,
|
acc_name,
|
||||||
|
@ -353,10 +357,12 @@ async def trades_dialogue(
|
||||||
|
|
||||||
|
|
||||||
async def handle_order_updates(
|
async def handle_order_updates(
|
||||||
ws_stream: NoBsWs,
|
ws: NoBsWs,
|
||||||
|
ws_stream: AsyncIterator,
|
||||||
ems_stream: tractor.MsgStream,
|
ems_stream: tractor.MsgStream,
|
||||||
emsflow: dict[str, list[MsgUnion]],
|
emsflow: dict[str, list[MsgUnion]],
|
||||||
ids: bidict[str, int],
|
ids: bidict[str, int],
|
||||||
|
reqids2txids: dict[int, str],
|
||||||
trans: list[pp.Transaction],
|
trans: list[pp.Transaction],
|
||||||
acctid: str,
|
acctid: str,
|
||||||
acc_name: str,
|
acc_name: str,
|
||||||
|
@ -552,7 +558,29 @@ async def handle_order_updates(
|
||||||
submit_vlm = rest.get('vol', 0)
|
submit_vlm = rest.get('vol', 0)
|
||||||
exec_vlm = rest.get('vol_exec', 0)
|
exec_vlm = rest.get('vol_exec', 0)
|
||||||
|
|
||||||
oid = ids.inverse[reqid]
|
reqids2txids[reqid] = txid
|
||||||
|
|
||||||
|
oid = ids.inverse.get(reqid)
|
||||||
|
if not oid:
|
||||||
|
# TODO: handle these and relay them
|
||||||
|
# through the EMS to the client / UI
|
||||||
|
# side!
|
||||||
|
log.warning(
|
||||||
|
f'Received active order {txid}:\n'
|
||||||
|
f'{update_msg}\n'
|
||||||
|
'Cancelling order for now!..'
|
||||||
|
)
|
||||||
|
|
||||||
|
# call ws api to cancel:
|
||||||
|
# https://docs.kraken.com/websockets/#message-cancelOrder
|
||||||
|
await ws.send_msg({
|
||||||
|
'event': 'cancelOrder',
|
||||||
|
'token': token,
|
||||||
|
'reqid': reqid,
|
||||||
|
'txid': [txid],
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
|
||||||
msgs = emsflow[oid]
|
msgs = emsflow[oid]
|
||||||
|
|
||||||
# send BrokerdStatus messages for all
|
# send BrokerdStatus messages for all
|
||||||
|
@ -591,6 +619,7 @@ async def handle_order_updates(
|
||||||
'event': etype,
|
'event': etype,
|
||||||
'status': status,
|
'status': status,
|
||||||
'reqid': reqid,
|
'reqid': reqid,
|
||||||
|
**rest,
|
||||||
} as event if (
|
} as event if (
|
||||||
etype in {
|
etype in {
|
||||||
'addOrderStatus',
|
'addOrderStatus',
|
||||||
|
@ -598,7 +627,18 @@ async def handle_order_updates(
|
||||||
'cancelOrderStatus',
|
'cancelOrderStatus',
|
||||||
}
|
}
|
||||||
):
|
):
|
||||||
oid = ids.inverse[reqid]
|
oid = ids.inverse.get(reqid)
|
||||||
|
if not oid:
|
||||||
|
log.warning(
|
||||||
|
'Unknown order status update?:\n'
|
||||||
|
f'{event}'
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
txid = rest.get('txid')
|
||||||
|
if txid:
|
||||||
|
reqids2txids[reqid] = txid
|
||||||
|
|
||||||
msgs = emsflow[oid]
|
msgs = emsflow[oid]
|
||||||
last = msgs[-1]
|
last = msgs[-1]
|
||||||
resps, errored = process_status(
|
resps, errored = process_status(
|
||||||
|
@ -608,19 +648,10 @@ async def handle_order_updates(
|
||||||
msgs,
|
msgs,
|
||||||
last,
|
last,
|
||||||
)
|
)
|
||||||
# if errored:
|
|
||||||
# if we rx any error cancel the order again
|
|
||||||
# await ws.send_msg({
|
|
||||||
# 'event': 'cancelOrder',
|
|
||||||
# 'token': token,
|
|
||||||
# 'reqid': reqid,
|
|
||||||
# 'txid': [last.reqid], # txid from submission
|
|
||||||
# })
|
|
||||||
|
|
||||||
if resps:
|
if resps:
|
||||||
msgs.extend(resps)
|
msgs.extend(resps)
|
||||||
for resp in resps:
|
for resp in resps:
|
||||||
await ems_stream.send(resp.dict())
|
await ems_stream.send(resp)
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
log.warning(f'Unhandled trades update msg: {msg}')
|
log.warning(f'Unhandled trades update msg: {msg}')
|
||||||
|
|
Loading…
Reference in New Issue