First draft, working WS based order management

Move to using the websocket API for all order control ops and dropping
the sync rest api approach which resulted in a bunch of buggy races.
Further this gets us must faster (batch) order cancellation for free
and a simpler ems request handler loop. We now heavily leverage the new
py3.10 `match:` syntax for all kraken-side API msg parsing and
processing and handle both the `openOrders` and `ownTrades` subscription
streams.

We also block "order editing" (by immediate cancellation) for now since
the EMS isn't entirely yet equipped to handle brokerd side `.reqid`
changes (which is how kraken implements so called order "updates" or
"edits") for a given order-request dialog and we may want to even
consider just implementing "updates" ourselves via independent cancel
and submit requests? Definitely something to ponder. Alternatively we
can "masquerade" such updates behind the count-style `.oid` remapping we
had to implement anyway (kraken's limitation) and maybe everything will
just work?

Further details in this patch:
- create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s
  that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable
  local lookup of ems uids to piker-backend-client-side request ids and
  received order messages.
- add `openOrders` sub support which more or less directly relays to
  equivalent `BrokerdStatus` updates and calc the `.filled` and
  `.remaining` values based on cleared vlm updates.
- add handler blocks for `[add/edit/cancel]OrderStatus` events including
  error msg cases.
- don't do any order request response processing in
  `handle_order_requests()` since responses are always received via one
  (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus
  such msgs are now handled in the response relay loop.

Relates to #290
Resolves #310, #296
krakenwsbackup
Tyler Goodlet 2022-07-04 22:00:56 -04:00
parent f5236f658b
commit 4823f87422
2 changed files with 324 additions and 142 deletions

View File

@ -282,6 +282,7 @@ class Client:
"volume": str(size),
}
return await self.endpoint('AddOrder', data)
else:
# Edit order data for kraken api
data["txid"] = reqid

View File

@ -20,7 +20,7 @@ Order api and machinery
'''
from contextlib import asynccontextmanager as acm
from functools import partial
from itertools import chain
from itertools import chain, count
from pprint import pformat
import time
from typing import (
@ -31,8 +31,9 @@ from typing import (
# Union,
)
from bidict import bidict
import pendulum
from pydantic import BaseModel
# from pydantic import BaseModel
import trio
import tractor
import wsproto
@ -62,27 +63,25 @@ from .feed import (
)
class Trade(BaseModel):
'''
Trade class that helps parse and validate ownTrades stream
'''
reqid: str # kraken order transaction id
action: str # buy or sell
price: float # price of asset
size: float # vol of asset
broker_time: str # e.g GTC, GTD
async def handle_order_requests(
ws: NoBsWs,
client: Client,
ems_order_stream: tractor.MsgStream,
token: str,
requests: dict[str, BrokerdOrder],
ids: bidict[str, int],
) -> None:
'''
Process new order submission requests from the EMS
and deliver acks or errors.
'''
# XXX: UGH, let's unify this.. with ``msgspec``.
request_msg: dict
order: BrokerdOrder
counter = count()
async for request_msg in ems_order_stream:
log.info(
@ -90,151 +89,107 @@ async def handle_order_requests(
f'{pformat(request_msg)}'
)
action = request_msg['action']
if action in {'buy', 'sell'}:
account = request_msg['account']
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(BrokerdError(
await ems_order_stream.send(
BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
# reason=f'Kraken only, No account found: `{account}` ?',
reason=(
'Kraken only, order mode disabled due to '
'https://github.com/pikers/piker/issues/299'
),
).dict())
).dict()
)
continue
action = request_msg['action']
if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
resp = await client.submit_limit(
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
reqid=order.reqid,
)
msg = BrokerdOrder(**request_msg)
err = resp['error']
if err:
oid = order.oid
log.error(f'Failed to submit order: {oid}')
# logic from old `Client.submit_limit()`
if msg.oid in ids:
ep = 'editOrder'
reqid = ids[msg.oid] # integer not txid
order = requests[msg.oid]
assert order.oid == msg.oid
extra = {
'orderid': msg.reqid, # txid
}
# XXX: TODO: get this working, but currently the EMS
# doesn't support changing order `.reqid` (in this case
# kraken changes them via a cancel and a new
# submission). So for now cancel and report the error.
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [msg.reqid], # should be txid from submission
})
continue
await ems_order_stream.send(
BrokerdError(
oid=order.oid,
reqid=order.reqid,
symbol=order.symbol,
reason="Failed order submission",
broker_details=resp
).dict()
)
else:
# TODO: handle multiple orders (cancels?)
# txid is an array of strings
if order.reqid is None:
reqid = resp['result']['txid'][0]
else:
# update the internal pairing of oid to krakens
# txid with the new txid that is returned on edit
reqid = resp['result']['txid']
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
# account the made the order
account=order.account
).dict()
ep = 'addOrder'
reqid = next(counter)
ids[msg.oid] = reqid
log.debug(
f"GENERATED ORDER {reqid}\n"
f'{ids}'
)
extra = {
'ordertype': 'limit',
'type': msg.action,
}
psym = msg.symbol.upper()
pair = f'{psym[:3]}/{psym[3:]}'
# call ws api to submit the order:
# https://docs.kraken.com/websockets/#message-addOrder
await ws.send_msg({
'event': ep,
'token': token,
'reqid': reqid, # remapped-to-int uid from ems
'pair': pair,
'price': str(msg.price),
'volume': str(msg.size),
# only ensures request is valid, nothing more
# validate: 'true',
} | extra)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
assert msg.oid in requests
reqid = ids[msg.oid]
# Send order cancellation to kraken
resp = await client.submit_cancel(
reqid=msg.reqid
)
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [msg.reqid], # should be txid from submission
})
# Check to make sure there was no error returned by
# the kraken endpoint. Assert one order was cancelled.
try:
result = resp['result']
count = result['count']
# check for 'error' key if we received no 'result'
except KeyError:
error = resp.get('error')
await ems_order_stream.send(
BrokerdError(
oid=msg.oid,
reqid=msg.reqid,
symbol=msg.symbol,
reason="Failed order cancel",
broker_details=resp
).dict()
)
if not error:
raise BrokerError(f'Unknown order cancel response: {resp}')
else:
if not count: # no orders were cancelled?
# XXX: what exactly is this from and why would we care?
# there doesn't seem to be any docs here?
# https://docs.kraken.com/rest/#operation/cancelOrder
# Check to make sure the cancellation is NOT pending,
# then send the confirmation to the ems order stream
pending = result.get('pending')
if pending:
log.error(f'Order {oid} cancel was not yet successful')
await ems_order_stream.send(
BrokerdError(
oid=msg.oid,
reqid=msg.reqid,
symbol=msg.symbol,
# TODO: maybe figure out if pending
# cancels will eventually get cancelled
reason="Order cancel is still pending?",
broker_details=resp
).dict()
)
else: # order cancel success case.
await ems_order_stream.send(
BrokerdStatus(
reqid=msg.reqid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
else:
log.error(f'Unknown order command: {request_msg}')
# placehold for sanity checking in relay loop
requests[msg.oid] = msg
@acm
async def subscribe(
@ -358,8 +313,21 @@ async def trades_dialogue(
) as ws,
trio.open_nursery() as n,
):
reqmsgs: dict[str, BrokerdOrder] = {}
# 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict()
# task for processing inbound requests from ems
n.start_soon(handle_order_requests, client, ems_stream)
n.start_soon(
handle_order_requests,
ws,
client,
ems_stream,
token,
reqmsgs,
ids,
)
count: int = 0
@ -472,7 +440,7 @@ async def trades_dialogue(
await ems_stream.send(pp_msg.dict())
case [
trades_msgs,
order_msgs,
'openOrders',
{'sequence': seq},
]:
@ -481,11 +449,224 @@ async def trades_dialogue(
# above:
# https://github.com/pikers/piker/issues/293
# https://github.com/pikers/piker/issues/310
log.info(f'Order update {seq}:{trades_msgs}')
log.info(f'Orders update {seq}:{order_msgs}')
for order_msg in order_msgs:
log.info(
'Order msg update:\n'
f'{pformat(order_msg)}'
)
txid, update_msg = list(order_msg.items())[0]
match update_msg:
case {
'status': status,
'userref': reqid,
**rest,
# 'avg_price': _,
# 'cost': _,
# 'descr': {
# 'close': None,
# 'leverage': None,
# 'order': descr,
# 'ordertype': 'limit',
# 'pair': 'XMR/EUR',
# 'price': '74.94000000',
# 'price2': '0.00000000',
# 'type': 'buy'
# },
# 'expiretm': None,
# 'fee': '0.00000000',
# 'limitprice': '0.00000000',
# 'misc': '',
# 'oflags': 'fciq',
# 'opentm': '1656966131.337344',
# 'refid': None,
# 'starttm': None,
# 'stopprice': '0.00000000',
# 'timeinforce': 'GTC',
# 'vol': submit_vlm, # '13.34400854',
# 'vol_exec': exec_vlm, # 0.0000
}:
ems_status = {
'open': 'submitted',
'closed': 'cancelled',
'canceled': 'cancelled',
'pending': 'pending',
}[status]
submit_vlm = rest.get('vol', 0)
exec_vlm = rest.get('vol_exec', 0)
# send BrokerdStatus messages for all
# order state updates
msg = BrokerdStatus(
reqid=txid,
time_ns=time.time_ns(), # cuz why not
account=f'kraken.{acctid}',
# everyone doin camel case..
status=ems_status, # force lower case
filled=exec_vlm,
reason='', # why held?
remaining=(
float(submit_vlm)
-
float(exec_vlm)
),
broker_details=dict(
{'name': 'kraken'}, **update_msg
),
)
await ems_stream.send(msg.dict())
case _:
log.warning(
'Unknown orders msg:\n'
f'{txid}:{order_msg}'
)
case {
'event': etype,
'status': status,
'errorMessage': errmsg,
'reqid': reqid,
} if (
etype in {'addOrderStatus', 'editOrderStatus'}
and status == 'error'
):
log.error(
f'Failed to submit order {reqid}:\n'
f'{errmsg}'
)
oid = ids.inverse[reqid]
order = reqmsgs[oid]
await ems_stream.send(
BrokerdError(
oid=oid,
# use old reqid in case it changed?
reqid=order.reqid,
symbol=order.symbol,
reason=f'Failed submit:\n{errmsg}',
broker_details=resp
).dict()
)
# if we rx any error cancel the order again
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [order.reqid], # txid from submission
})
case {
'event': 'addOrderStatus',
'status': status,
'reqid': reqid, # oid from ems side
# NOTE: in the case of an edit request this is
# a new value!
'txid': txid,
'descr': descr, # only on success?
# 'originaltxid': txid, # only on edits
# **rest,
}:
oid = ids.inverse[reqid]
order = reqmsgs[oid]
log.info(
f'Submitting order {oid}[{reqid}]:\n'
f'txid: {txid}\n'
f'{descr}'
)
# deliver ack immediately
await ems_stream.send(
BrokerdOrderAck(
oid=oid, # ems order request id
reqid=txid, # kraken unique order id
account=order.account, # piker account
).dict()
)
case {
'event': 'editOrderStatus',
'status': status,
'errorMessage': errmsg,
'reqid': reqid, # oid from ems side
'descr': descr,
# NOTE: for edit request this is a new value
'txid': txid,
'originaltxid': origtxid,
# **rest,
}:
log.info(
f'Editting order {oid}[{reqid}]:\n'
f'txid: {origtxid} -> {txid}\n'
f'{descr}'
)
# deliver another ack to update the ems-side
# `.reqid`.
oid = ids.inverse[reqid]
await ems_stream.send(
BrokerdOrderAck(
oid=oid, # ems order request id
reqid=txid, # kraken unique order id
account=order.account, # piker account
).dict()
)
# successful cancellation
case {
"event": "cancelOrderStatus",
"status": "ok",
'txid': txids,
'reqid': reqid,
}:
# TODO: should we support "batch" acking of
# multiple cancels thus avoiding the below loop?
oid = ids.inverse[reqid]
msg = reqmsgs[oid]
for txid in txids:
await ems_stream.send(
BrokerdStatus(
reqid=txid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Cancel success: {oid}@{txid}',
broker_details=resp,
).dict()
)
# failed cancel
case {
"event": "cancelOrderStatus",
"status": "error",
"errorMessage": errmsg,
'reqid': reqid,
}:
oid = ids.inverse[reqid]
msg = reqmsgs[oid]
await ems_stream.send(
BrokerdError(
oid=oid,
reqid=msg.reqid,
symbol=msg.symbol,
reason=f'Failed order cancel {errmsg}',
broker_details=resp
).dict()
)
case _:
log.warning(f'Unhandled trades msg: {msg}')
await tractor.breakpoint()
def norm_trade_records(