Get order "editing" working fully

Turns out the EMS can support this as originally expected: you can
update a `brokerd`-side `.reqid` through a `BrokerdAck` msg and the ems
which update its cross-dialog (leg) tracking correctly! The issue was
a bug in the `editOrderStatus` msg handling and appropriate tracking
of the correct `.oid` (ems uid) on the kraken side. This unfortunately
required adding a `emsflow: dict[str, list[BrokerdOrder]]` msg flow
tracing table which means the broker daemon is tracking all the msg flow
with the ems, though I'm wondering now if this is just good practise
anyway and maybe we should offer a small primitive type from our msging
utils to aid with this? I've used such constructs in event handling
systems prior.

There's a lot more factoring that can be done after these changes as
well but the quick detailed summary is,
- rework the `handle_order_requests()` loop to use `match:` syntax and
  update the new `emsflow` table on every new request from the ems.
- fix the `editOrderStatus` case pattern to not include an error msg and
  thus actually be triggered to respond to the ems with a `BrokerdAck`
  containing the new `.reqid`, the new kraken side `txid`.
- skip any `openOrders` msgs which are detected as being kraken's
  internal order "edits" by matching on the `cancel_reason` field.
- update the `emsflow` table in all ws-stream msg handling blocks
  with responses sent to the ems.

Relates to #290
krakenwsbackup
Tyler Goodlet 2022-07-05 11:03:32 -04:00
parent f38eef2bf4
commit c74741228f
1 changed files with 166 additions and 149 deletions

View File

@ -26,9 +26,8 @@ import time
from typing import (
Any,
AsyncIterator,
# Callable,
# Optional,
# Union,
Union,
)
from bidict import bidict
@ -62,6 +61,16 @@ from .feed import (
stream_messages,
)
MsgUnion = Union[
BrokerdCancel,
BrokerdError,
BrokerdFill,
BrokerdOrder,
BrokerdOrderAck,
BrokerdPosition,
BrokerdStatus,
]
async def handle_order_requests(
@ -69,7 +78,7 @@ async def handle_order_requests(
client: Client,
ems_order_stream: tractor.MsgStream,
token: str,
requests: dict[str, BrokerdOrder],
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
) -> None:
@ -79,101 +88,75 @@ async def handle_order_requests(
'''
# XXX: UGH, let's unify this.. with ``msgspec``.
request_msg: dict
msg: dict[str, Any]
order: BrokerdOrder
counter = count()
async for request_msg in ems_order_stream:
log.info(
'Received order request:\n'
f'{pformat(request_msg)}'
)
account = request_msg['account']
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(
BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=(
'Kraken only, order mode disabled due to '
'https://github.com/pikers/piker/issues/299'
),
).dict()
)
continue
action = request_msg['action']
if action in {'buy', 'sell'}:
async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}')
match msg:
case {
'account': 'kraken.spot',
'action': action,
} if action in {'buy', 'sell'}:
# validate
msg = BrokerdOrder(**request_msg)
order = BrokerdOrder(**msg)
# logic from old `Client.submit_limit()`
if msg.oid in ids:
if order.oid in ids:
ep = 'editOrder'
reqid = ids[msg.oid] # integer not txid
order = requests[msg.oid]
assert order.oid == msg.oid
reqid = ids[order.oid] # integer not txid
last = emsflow[order.oid][-1]
assert last.reqid == order.reqid
extra = {
'orderid': msg.reqid, # txid
'orderid': last.reqid, # txid
}
# XXX: TODO: get this working, but currently the EMS
# doesn't support changing order `.reqid` (in this case
# kraken changes them via a cancel and a new
# submission). So for now cancel and report the error.
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [msg.reqid], # should be txid from submission
})
continue
else:
ep = 'addOrder'
reqid = next(counter)
ids[msg.oid] = reqid
ids[order.oid] = reqid
log.debug(
f"GENERATED ORDER {reqid}\n"
f'{ids}'
)
extra = {
'ordertype': 'limit',
'type': msg.action,
'type': order.action,
}
psym = msg.symbol.upper()
psym = order.symbol.upper()
pair = f'{psym[:3]}/{psym[3:]}'
# call ws api to submit the order:
# https://docs.kraken.com/websockets/#message-addOrder
await ws.send_msg({
req = {
'event': ep,
'token': token,
'reqid': reqid, # remapped-to-int uid from ems
'pair': pair,
'price': str(msg.price),
'volume': str(msg.size),
'price': str(order.price),
'volume': str(order.size),
# only ensures request is valid, nothing more
# validate: 'true',
} | extra)
} | extra
log.info(f'Submitting WS order request:\n{pformat(req)}')
await ws.send_msg(req)
elif action == 'cancel':
# placehold for sanity checking in relay loop
emsflow.setdefault(order.oid, []).append(order)
msg = BrokerdCancel(**request_msg)
assert msg.oid in requests
reqid = ids[msg.oid]
case {
'account': 'kraken.spot',
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
assert cancel.oid in emsflow
reqid = ids[cancel.oid]
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
@ -181,14 +164,27 @@ async def handle_order_requests(
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [msg.reqid], # should be txid from submission
'txid': [cancel.reqid], # should be txid from submission
})
else:
log.error(f'Unknown order command: {request_msg}')
case _:
account = msg.get('account')
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
# placehold for sanity checking in relay loop
requests[msg.oid] = msg
await ems_order_stream.send(
BrokerdError(
oid=msg['oid'],
symbol=msg['symbol'],
reason=(
'Invalid request msg:\n{msg}'
),
).dict()
)
@acm
@ -313,7 +309,11 @@ async def trades_dialogue(
) as ws,
trio.open_nursery() as n,
):
reqmsgs: dict[str, BrokerdOrder] = {}
# task local msg dialog tracking
emsflow: dict[
str,
list[MsgUnion],
] = {}
# 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict()
@ -325,7 +325,7 @@ async def trades_dialogue(
client,
ems_stream,
token,
reqmsgs,
emsflow,
ids,
)
@ -449,15 +449,24 @@ async def trades_dialogue(
# above:
# https://github.com/pikers/piker/issues/293
# https://github.com/pikers/piker/issues/310
log.info(f'Orders update {seq}:{order_msgs}')
for order_msg in order_msgs:
log.info(
'Order msg update:\n'
'Order msg update_{seq}:\n'
f'{pformat(order_msg)}'
)
txid, update_msg = list(order_msg.items())[0]
match update_msg:
case {
'cancel_reason': 'Order replaced',
'status': status,
'userref': reqid,
**rest,
}:
# we ignore internal order updates
# triggered by kraken's "edit"
# endpoint.
continue
case {
'status': status,
'userref': reqid,
@ -492,15 +501,20 @@ async def trades_dialogue(
'open': 'submitted',
'closed': 'cancelled',
'canceled': 'cancelled',
# do we even need to forward
# this state to the ems?
'pending': 'pending',
}[status]
submit_vlm = rest.get('vol', 0)
exec_vlm = rest.get('vol_exec', 0)
oid = ids.inverse[reqid]
msgs = emsflow[oid]
# send BrokerdStatus messages for all
# order state updates
msg = BrokerdStatus(
resp = BrokerdStatus(
reqid=txid,
time_ns=time.time_ns(), # cuz why not
@ -521,7 +535,8 @@ async def trades_dialogue(
{'name': 'kraken'}, **update_msg
),
)
await ems_stream.send(msg.dict())
msgs.append(resp)
await ems_stream.send(resp.dict())
case _:
log.warning(
@ -539,28 +554,29 @@ async def trades_dialogue(
and status == 'error'
):
log.error(
f'Failed to submit order {reqid}:\n'
f'Failed to submit/edit order {reqid}:\n'
f'{errmsg}'
)
oid = ids.inverse[reqid]
order = reqmsgs[oid]
await ems_stream.send(
BrokerdError(
msgs = emsflow[oid]
last = msgs[-1]
resp = BrokerdError(
oid=oid,
# use old reqid in case it changed?
reqid=order.reqid,
symbol=order.symbol,
reqid=last.reqid,
symbol=last.symbol,
reason=f'Failed submit:\n{errmsg}',
broker_details=resp
).dict()
)
msgs.append(resp)
await ems_stream.send(resp.dict())
# if we rx any error cancel the order again
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [order.reqid], # txid from submission
'txid': [last.reqid], # txid from submission
})
case {
@ -577,49 +593,48 @@ async def trades_dialogue(
# **rest,
}:
oid = ids.inverse[reqid]
order = reqmsgs[oid]
msgs = emsflow[oid]
last = msgs[-1]
log.info(
f'Submitting order {oid}[{reqid}]:\n'
f'Submitting order: {descr}\n'
f'ems oid: {oid}\n'
f're-mapped reqid: {reqid}\n'
f'txid: {txid}\n'
f'{descr}'
)
# deliver ack immediately
await ems_stream.send(
BrokerdOrderAck(
resp = BrokerdOrderAck(
oid=oid, # ems order request id
reqid=txid, # kraken unique order id
account=order.account, # piker account
).dict()
account=last.account, # piker account
)
msgs.append(resp)
await ems_stream.send(resp.dict())
case {
'event': 'editOrderStatus',
'status': status,
'errorMessage': errmsg,
'reqid': reqid, # oid from ems side
'descr': descr,
# NOTE: for edit request this is a new value
'txid': txid,
'originaltxid': origtxid,
# **rest,
}:
log.info(
f'Editting order {oid}[{reqid}]:\n'
f'Editting order {oid}[requid={reqid}]:\n'
f'txid: {origtxid} -> {txid}\n'
f'{descr}'
)
# deliver another ack to update the ems-side
# `.reqid`.
# deliver another ack to update the ems-side `.reqid`.
oid = ids.inverse[reqid]
await ems_stream.send(
BrokerdOrderAck(
msgs = emsflow[oid]
last = msgs[-1]
resp = BrokerdOrderAck(
oid=oid, # ems order request id
reqid=txid, # kraken unique order id
account=order.account, # piker account
).dict()
account=last.account, # piker account
)
msgs.append(resp)
await ems_stream.send(resp.dict())
# successful cancellation
case {
@ -631,19 +646,20 @@ async def trades_dialogue(
# TODO: should we support "batch" acking of
# multiple cancels thus avoiding the below loop?
oid = ids.inverse[reqid]
msg = reqmsgs[oid]
msgs = emsflow[oid]
last = msgs[-1]
for txid in txids:
await ems_stream.send(
BrokerdStatus(
resp = BrokerdStatus(
reqid=txid,
account=msg.account,
account=last.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Cancel success: {oid}@{txid}',
broker_details=resp,
).dict()
)
msgs.append(resp)
await ems_stream.send(resp.dict())
# failed cancel
case {
@ -653,17 +669,18 @@ async def trades_dialogue(
'reqid': reqid,
}:
oid = ids.inverse[reqid]
msg = reqmsgs[oid]
msgs = emsflow[oid]
last = msgs[-1]
await ems_stream.send(
BrokerdError(
resp = BrokerdError(
oid=oid,
reqid=msg.reqid,
symbol=msg.symbol,
reqid=last.reqid,
symbol=last.symbol,
reason=f'Failed order cancel {errmsg}',
broker_details=resp
).dict()
)
msgs.append(resp)
await ems_stream.send(resp.dict())
case _:
log.warning(f'Unhandled trades msg: {msg}')