Get order "editing" working fully
Turns out the EMS can support this as originally expected: you can update a `brokerd`-side `.reqid` through a `BrokerdAck` msg and the ems which update its cross-dialog (leg) tracking correctly! The issue was a bug in the `editOrderStatus` msg handling and appropriate tracking of the correct `.oid` (ems uid) on the kraken side. This unfortunately required adding a `emsflow: dict[str, list[BrokerdOrder]]` msg flow tracing table which means the broker daemon is tracking all the msg flow with the ems, though I'm wondering now if this is just good practise anyway and maybe we should offer a small primitive type from our msging utils to aid with this? I've used such constructs in event handling systems prior. There's a lot more factoring that can be done after these changes as well but the quick detailed summary is, - rework the `handle_order_requests()` loop to use `match:` syntax and update the new `emsflow` table on every new request from the ems. - fix the `editOrderStatus` case pattern to not include an error msg and thus actually be triggered to respond to the ems with a `BrokerdAck` containing the new `.reqid`, the new kraken side `txid`. - skip any `openOrders` msgs which are detected as being kraken's internal order "edits" by matching on the `cancel_reason` field. - update the `emsflow` table in all ws-stream msg handling blocks with responses sent to the ems. Relates to #290tractor_typed_msg_hackin
parent
d41d140265
commit
204d3b7214
|
@ -26,9 +26,8 @@ import time
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncIterator,
|
AsyncIterator,
|
||||||
# Callable,
|
|
||||||
# Optional,
|
# Optional,
|
||||||
# Union,
|
Union,
|
||||||
)
|
)
|
||||||
|
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
|
@ -62,6 +61,16 @@ from .feed import (
|
||||||
stream_messages,
|
stream_messages,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
MsgUnion = Union[
|
||||||
|
BrokerdCancel,
|
||||||
|
BrokerdError,
|
||||||
|
BrokerdFill,
|
||||||
|
BrokerdOrder,
|
||||||
|
BrokerdOrderAck,
|
||||||
|
BrokerdPosition,
|
||||||
|
BrokerdStatus,
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
async def handle_order_requests(
|
async def handle_order_requests(
|
||||||
|
|
||||||
|
@ -69,7 +78,7 @@ async def handle_order_requests(
|
||||||
client: Client,
|
client: Client,
|
||||||
ems_order_stream: tractor.MsgStream,
|
ems_order_stream: tractor.MsgStream,
|
||||||
token: str,
|
token: str,
|
||||||
requests: dict[str, BrokerdOrder],
|
emsflow: dict[str, list[MsgUnion]],
|
||||||
ids: bidict[str, int],
|
ids: bidict[str, int],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -79,101 +88,75 @@ async def handle_order_requests(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# XXX: UGH, let's unify this.. with ``msgspec``.
|
# XXX: UGH, let's unify this.. with ``msgspec``.
|
||||||
request_msg: dict
|
msg: dict[str, Any]
|
||||||
order: BrokerdOrder
|
order: BrokerdOrder
|
||||||
counter = count()
|
counter = count()
|
||||||
|
|
||||||
async for request_msg in ems_order_stream:
|
async for msg in ems_order_stream:
|
||||||
log.info(
|
log.info(f'Rx order msg:\n{pformat(msg)}')
|
||||||
'Received order request:\n'
|
match msg:
|
||||||
f'{pformat(request_msg)}'
|
case {
|
||||||
)
|
'account': 'kraken.spot',
|
||||||
|
'action': action,
|
||||||
account = request_msg['account']
|
} if action in {'buy', 'sell'}:
|
||||||
|
|
||||||
if account != 'kraken.spot':
|
|
||||||
log.error(
|
|
||||||
'This is a kraken account, \
|
|
||||||
only a `kraken.spot` selection is valid'
|
|
||||||
)
|
|
||||||
await ems_order_stream.send(
|
|
||||||
BrokerdError(
|
|
||||||
oid=request_msg['oid'],
|
|
||||||
symbol=request_msg['symbol'],
|
|
||||||
reason=(
|
|
||||||
'Kraken only, order mode disabled due to '
|
|
||||||
'https://github.com/pikers/piker/issues/299'
|
|
||||||
),
|
|
||||||
|
|
||||||
).dict()
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
action = request_msg['action']
|
|
||||||
if action in {'buy', 'sell'}:
|
|
||||||
|
|
||||||
# validate
|
# validate
|
||||||
msg = BrokerdOrder(**request_msg)
|
order = BrokerdOrder(**msg)
|
||||||
|
|
||||||
# logic from old `Client.submit_limit()`
|
# logic from old `Client.submit_limit()`
|
||||||
if msg.oid in ids:
|
if order.oid in ids:
|
||||||
ep = 'editOrder'
|
ep = 'editOrder'
|
||||||
reqid = ids[msg.oid] # integer not txid
|
reqid = ids[order.oid] # integer not txid
|
||||||
order = requests[msg.oid]
|
last = emsflow[order.oid][-1]
|
||||||
assert order.oid == msg.oid
|
assert last.reqid == order.reqid
|
||||||
extra = {
|
extra = {
|
||||||
'orderid': msg.reqid, # txid
|
'orderid': last.reqid, # txid
|
||||||
}
|
}
|
||||||
|
|
||||||
# XXX: TODO: get this working, but currently the EMS
|
|
||||||
# doesn't support changing order `.reqid` (in this case
|
|
||||||
# kraken changes them via a cancel and a new
|
|
||||||
# submission). So for now cancel and report the error.
|
|
||||||
await ws.send_msg({
|
|
||||||
'event': 'cancelOrder',
|
|
||||||
'token': token,
|
|
||||||
'reqid': reqid,
|
|
||||||
'txid': [msg.reqid], # should be txid from submission
|
|
||||||
})
|
|
||||||
continue
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
ep = 'addOrder'
|
ep = 'addOrder'
|
||||||
reqid = next(counter)
|
reqid = next(counter)
|
||||||
ids[msg.oid] = reqid
|
ids[order.oid] = reqid
|
||||||
log.debug(
|
log.debug(
|
||||||
f"GENERATED ORDER {reqid}\n"
|
f"GENERATED ORDER {reqid}\n"
|
||||||
f'{ids}'
|
f'{ids}'
|
||||||
)
|
)
|
||||||
extra = {
|
extra = {
|
||||||
'ordertype': 'limit',
|
'ordertype': 'limit',
|
||||||
'type': msg.action,
|
'type': order.action,
|
||||||
}
|
}
|
||||||
|
|
||||||
psym = msg.symbol.upper()
|
psym = order.symbol.upper()
|
||||||
pair = f'{psym[:3]}/{psym[3:]}'
|
pair = f'{psym[:3]}/{psym[3:]}'
|
||||||
|
|
||||||
# call ws api to submit the order:
|
# call ws api to submit the order:
|
||||||
# https://docs.kraken.com/websockets/#message-addOrder
|
# https://docs.kraken.com/websockets/#message-addOrder
|
||||||
await ws.send_msg({
|
req = {
|
||||||
'event': ep,
|
'event': ep,
|
||||||
'token': token,
|
'token': token,
|
||||||
|
|
||||||
'reqid': reqid, # remapped-to-int uid from ems
|
'reqid': reqid, # remapped-to-int uid from ems
|
||||||
'pair': pair,
|
'pair': pair,
|
||||||
'price': str(msg.price),
|
'price': str(order.price),
|
||||||
'volume': str(msg.size),
|
'volume': str(order.size),
|
||||||
|
|
||||||
# only ensures request is valid, nothing more
|
# only ensures request is valid, nothing more
|
||||||
# validate: 'true',
|
# validate: 'true',
|
||||||
|
|
||||||
} | extra)
|
} | extra
|
||||||
|
log.info(f'Submitting WS order request:\n{pformat(req)}')
|
||||||
|
await ws.send_msg(req)
|
||||||
|
|
||||||
elif action == 'cancel':
|
# placehold for sanity checking in relay loop
|
||||||
|
emsflow.setdefault(order.oid, []).append(order)
|
||||||
|
|
||||||
msg = BrokerdCancel(**request_msg)
|
case {
|
||||||
assert msg.oid in requests
|
'account': 'kraken.spot',
|
||||||
reqid = ids[msg.oid]
|
'action': 'cancel',
|
||||||
|
}:
|
||||||
|
cancel = BrokerdCancel(**msg)
|
||||||
|
assert cancel.oid in emsflow
|
||||||
|
reqid = ids[cancel.oid]
|
||||||
|
|
||||||
# call ws api to cancel:
|
# call ws api to cancel:
|
||||||
# https://docs.kraken.com/websockets/#message-cancelOrder
|
# https://docs.kraken.com/websockets/#message-cancelOrder
|
||||||
|
@ -181,14 +164,27 @@ async def handle_order_requests(
|
||||||
'event': 'cancelOrder',
|
'event': 'cancelOrder',
|
||||||
'token': token,
|
'token': token,
|
||||||
'reqid': reqid,
|
'reqid': reqid,
|
||||||
'txid': [msg.reqid], # should be txid from submission
|
'txid': [cancel.reqid], # should be txid from submission
|
||||||
})
|
})
|
||||||
|
|
||||||
else:
|
case _:
|
||||||
log.error(f'Unknown order command: {request_msg}')
|
account = msg.get('account')
|
||||||
|
if account != 'kraken.spot':
|
||||||
|
log.error(
|
||||||
|
'This is a kraken account, \
|
||||||
|
only a `kraken.spot` selection is valid'
|
||||||
|
)
|
||||||
|
|
||||||
# placehold for sanity checking in relay loop
|
await ems_order_stream.send(
|
||||||
requests[msg.oid] = msg
|
BrokerdError(
|
||||||
|
oid=msg['oid'],
|
||||||
|
symbol=msg['symbol'],
|
||||||
|
reason=(
|
||||||
|
'Invalid request msg:\n{msg}'
|
||||||
|
),
|
||||||
|
|
||||||
|
).dict()
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
@ -313,7 +309,11 @@ async def trades_dialogue(
|
||||||
) as ws,
|
) as ws,
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
):
|
):
|
||||||
reqmsgs: dict[str, BrokerdOrder] = {}
|
# task local msg dialog tracking
|
||||||
|
emsflow: dict[
|
||||||
|
str,
|
||||||
|
list[MsgUnion],
|
||||||
|
] = {}
|
||||||
|
|
||||||
# 2way map for ems ids to kraken int reqids..
|
# 2way map for ems ids to kraken int reqids..
|
||||||
ids: bidict[str, int] = bidict()
|
ids: bidict[str, int] = bidict()
|
||||||
|
@ -325,7 +325,7 @@ async def trades_dialogue(
|
||||||
client,
|
client,
|
||||||
ems_stream,
|
ems_stream,
|
||||||
token,
|
token,
|
||||||
reqmsgs,
|
emsflow,
|
||||||
ids,
|
ids,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -449,15 +449,24 @@ async def trades_dialogue(
|
||||||
# above:
|
# above:
|
||||||
# https://github.com/pikers/piker/issues/293
|
# https://github.com/pikers/piker/issues/293
|
||||||
# https://github.com/pikers/piker/issues/310
|
# https://github.com/pikers/piker/issues/310
|
||||||
log.info(f'Orders update {seq}:{order_msgs}')
|
|
||||||
|
|
||||||
for order_msg in order_msgs:
|
for order_msg in order_msgs:
|
||||||
log.info(
|
log.info(
|
||||||
'Order msg update:\n'
|
'Order msg update_{seq}:\n'
|
||||||
f'{pformat(order_msg)}'
|
f'{pformat(order_msg)}'
|
||||||
)
|
)
|
||||||
txid, update_msg = list(order_msg.items())[0]
|
txid, update_msg = list(order_msg.items())[0]
|
||||||
match update_msg:
|
match update_msg:
|
||||||
|
case {
|
||||||
|
'cancel_reason': 'Order replaced',
|
||||||
|
'status': status,
|
||||||
|
'userref': reqid,
|
||||||
|
**rest,
|
||||||
|
}:
|
||||||
|
# we ignore internal order updates
|
||||||
|
# triggered by kraken's "edit"
|
||||||
|
# endpoint.
|
||||||
|
continue
|
||||||
|
|
||||||
case {
|
case {
|
||||||
'status': status,
|
'status': status,
|
||||||
'userref': reqid,
|
'userref': reqid,
|
||||||
|
@ -492,15 +501,20 @@ async def trades_dialogue(
|
||||||
'open': 'submitted',
|
'open': 'submitted',
|
||||||
'closed': 'cancelled',
|
'closed': 'cancelled',
|
||||||
'canceled': 'cancelled',
|
'canceled': 'cancelled',
|
||||||
|
# do we even need to forward
|
||||||
|
# this state to the ems?
|
||||||
'pending': 'pending',
|
'pending': 'pending',
|
||||||
}[status]
|
}[status]
|
||||||
|
|
||||||
submit_vlm = rest.get('vol', 0)
|
submit_vlm = rest.get('vol', 0)
|
||||||
exec_vlm = rest.get('vol_exec', 0)
|
exec_vlm = rest.get('vol_exec', 0)
|
||||||
|
|
||||||
|
oid = ids.inverse[reqid]
|
||||||
|
msgs = emsflow[oid]
|
||||||
|
|
||||||
# send BrokerdStatus messages for all
|
# send BrokerdStatus messages for all
|
||||||
# order state updates
|
# order state updates
|
||||||
msg = BrokerdStatus(
|
resp = BrokerdStatus(
|
||||||
|
|
||||||
reqid=txid,
|
reqid=txid,
|
||||||
time_ns=time.time_ns(), # cuz why not
|
time_ns=time.time_ns(), # cuz why not
|
||||||
|
@ -521,7 +535,8 @@ async def trades_dialogue(
|
||||||
{'name': 'kraken'}, **update_msg
|
{'name': 'kraken'}, **update_msg
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
await ems_stream.send(msg.dict())
|
msgs.append(resp)
|
||||||
|
await ems_stream.send(resp.dict())
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
log.warning(
|
log.warning(
|
||||||
|
@ -539,28 +554,29 @@ async def trades_dialogue(
|
||||||
and status == 'error'
|
and status == 'error'
|
||||||
):
|
):
|
||||||
log.error(
|
log.error(
|
||||||
f'Failed to submit order {reqid}:\n'
|
f'Failed to submit/edit order {reqid}:\n'
|
||||||
f'{errmsg}'
|
f'{errmsg}'
|
||||||
)
|
)
|
||||||
oid = ids.inverse[reqid]
|
oid = ids.inverse[reqid]
|
||||||
order = reqmsgs[oid]
|
msgs = emsflow[oid]
|
||||||
await ems_stream.send(
|
last = msgs[-1]
|
||||||
BrokerdError(
|
resp = BrokerdError(
|
||||||
oid=oid,
|
oid=oid,
|
||||||
# use old reqid in case it changed?
|
# use old reqid in case it changed?
|
||||||
reqid=order.reqid,
|
reqid=last.reqid,
|
||||||
symbol=order.symbol,
|
symbol=last.symbol,
|
||||||
reason=f'Failed submit:\n{errmsg}',
|
reason=f'Failed submit:\n{errmsg}',
|
||||||
broker_details=resp
|
broker_details=resp
|
||||||
).dict()
|
|
||||||
)
|
)
|
||||||
|
msgs.append(resp)
|
||||||
|
await ems_stream.send(resp.dict())
|
||||||
|
|
||||||
# if we rx any error cancel the order again
|
# if we rx any error cancel the order again
|
||||||
await ws.send_msg({
|
await ws.send_msg({
|
||||||
'event': 'cancelOrder',
|
'event': 'cancelOrder',
|
||||||
'token': token,
|
'token': token,
|
||||||
'reqid': reqid,
|
'reqid': reqid,
|
||||||
'txid': [order.reqid], # txid from submission
|
'txid': [last.reqid], # txid from submission
|
||||||
})
|
})
|
||||||
|
|
||||||
case {
|
case {
|
||||||
|
@ -577,49 +593,48 @@ async def trades_dialogue(
|
||||||
# **rest,
|
# **rest,
|
||||||
}:
|
}:
|
||||||
oid = ids.inverse[reqid]
|
oid = ids.inverse[reqid]
|
||||||
order = reqmsgs[oid]
|
msgs = emsflow[oid]
|
||||||
|
last = msgs[-1]
|
||||||
log.info(
|
log.info(
|
||||||
f'Submitting order {oid}[{reqid}]:\n'
|
f'Submitting order: {descr}\n'
|
||||||
|
f'ems oid: {oid}\n'
|
||||||
|
f're-mapped reqid: {reqid}\n'
|
||||||
f'txid: {txid}\n'
|
f'txid: {txid}\n'
|
||||||
f'{descr}'
|
|
||||||
)
|
)
|
||||||
|
resp = BrokerdOrderAck(
|
||||||
# deliver ack immediately
|
|
||||||
await ems_stream.send(
|
|
||||||
BrokerdOrderAck(
|
|
||||||
oid=oid, # ems order request id
|
oid=oid, # ems order request id
|
||||||
reqid=txid, # kraken unique order id
|
reqid=txid, # kraken unique order id
|
||||||
account=order.account, # piker account
|
account=last.account, # piker account
|
||||||
).dict()
|
|
||||||
)
|
)
|
||||||
|
msgs.append(resp)
|
||||||
|
await ems_stream.send(resp.dict())
|
||||||
|
|
||||||
case {
|
case {
|
||||||
'event': 'editOrderStatus',
|
'event': 'editOrderStatus',
|
||||||
'status': status,
|
'status': status,
|
||||||
'errorMessage': errmsg,
|
|
||||||
'reqid': reqid, # oid from ems side
|
'reqid': reqid, # oid from ems side
|
||||||
'descr': descr,
|
'descr': descr,
|
||||||
|
|
||||||
# NOTE: for edit request this is a new value
|
# NOTE: for edit request this is a new value
|
||||||
'txid': txid,
|
'txid': txid,
|
||||||
'originaltxid': origtxid,
|
'originaltxid': origtxid,
|
||||||
# **rest,
|
|
||||||
}:
|
}:
|
||||||
log.info(
|
log.info(
|
||||||
f'Editting order {oid}[{reqid}]:\n'
|
f'Editting order {oid}[requid={reqid}]:\n'
|
||||||
f'txid: {origtxid} -> {txid}\n'
|
f'txid: {origtxid} -> {txid}\n'
|
||||||
f'{descr}'
|
f'{descr}'
|
||||||
)
|
)
|
||||||
# deliver another ack to update the ems-side
|
# deliver another ack to update the ems-side `.reqid`.
|
||||||
# `.reqid`.
|
|
||||||
oid = ids.inverse[reqid]
|
oid = ids.inverse[reqid]
|
||||||
await ems_stream.send(
|
msgs = emsflow[oid]
|
||||||
BrokerdOrderAck(
|
last = msgs[-1]
|
||||||
|
resp = BrokerdOrderAck(
|
||||||
oid=oid, # ems order request id
|
oid=oid, # ems order request id
|
||||||
reqid=txid, # kraken unique order id
|
reqid=txid, # kraken unique order id
|
||||||
account=order.account, # piker account
|
account=last.account, # piker account
|
||||||
).dict()
|
|
||||||
)
|
)
|
||||||
|
msgs.append(resp)
|
||||||
|
await ems_stream.send(resp.dict())
|
||||||
|
|
||||||
# successful cancellation
|
# successful cancellation
|
||||||
case {
|
case {
|
||||||
|
@ -631,19 +646,20 @@ async def trades_dialogue(
|
||||||
# TODO: should we support "batch" acking of
|
# TODO: should we support "batch" acking of
|
||||||
# multiple cancels thus avoiding the below loop?
|
# multiple cancels thus avoiding the below loop?
|
||||||
oid = ids.inverse[reqid]
|
oid = ids.inverse[reqid]
|
||||||
msg = reqmsgs[oid]
|
msgs = emsflow[oid]
|
||||||
|
last = msgs[-1]
|
||||||
|
|
||||||
for txid in txids:
|
for txid in txids:
|
||||||
await ems_stream.send(
|
resp = BrokerdStatus(
|
||||||
BrokerdStatus(
|
|
||||||
reqid=txid,
|
reqid=txid,
|
||||||
account=msg.account,
|
account=last.account,
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
status='cancelled',
|
status='cancelled',
|
||||||
reason='Cancel success: {oid}@{txid}',
|
reason='Cancel success: {oid}@{txid}',
|
||||||
broker_details=resp,
|
broker_details=resp,
|
||||||
).dict()
|
|
||||||
)
|
)
|
||||||
|
msgs.append(resp)
|
||||||
|
await ems_stream.send(resp.dict())
|
||||||
|
|
||||||
# failed cancel
|
# failed cancel
|
||||||
case {
|
case {
|
||||||
|
@ -653,17 +669,18 @@ async def trades_dialogue(
|
||||||
'reqid': reqid,
|
'reqid': reqid,
|
||||||
}:
|
}:
|
||||||
oid = ids.inverse[reqid]
|
oid = ids.inverse[reqid]
|
||||||
msg = reqmsgs[oid]
|
msgs = emsflow[oid]
|
||||||
|
last = msgs[-1]
|
||||||
|
|
||||||
await ems_stream.send(
|
resp = BrokerdError(
|
||||||
BrokerdError(
|
|
||||||
oid=oid,
|
oid=oid,
|
||||||
reqid=msg.reqid,
|
reqid=last.reqid,
|
||||||
symbol=msg.symbol,
|
symbol=last.symbol,
|
||||||
reason=f'Failed order cancel {errmsg}',
|
reason=f'Failed order cancel {errmsg}',
|
||||||
broker_details=resp
|
broker_details=resp
|
||||||
).dict()
|
|
||||||
)
|
)
|
||||||
|
msgs.append(resp)
|
||||||
|
await ems_stream.send(resp.dict())
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
log.warning(f'Unhandled trades msg: {msg}')
|
log.warning(f'Unhandled trades msg: {msg}')
|
||||||
|
|
Loading…
Reference in New Issue