Fixes for state updates and clears

Turns out the `openOrders` and `ownTrades` subs always return a `reqid`
value (the one brokerd sends to the kraken api in order requests) is
always set to zero, which seems to be a bug? So this includes patches to
work around that as well reliance on the `openOrders` sub to do most
`BrokerdStatus` updates since `XOrderStatus` events don't seem to have
much data in them at all (they almost look like pure ack events so maybe
they aren't affirmative of final state changes anyway..).

Other fixes:
- respond with a `BrokerdOrderAck` immediately after `requid` generation
  not after order submission to ensure the ems has a valid `requid`
  *before* kraken api events are relayed through.
- add a `reqids2txids: bidict[int, str]` which maps brokerd genned
  `requid`s to kraken-side `txid`s since (as mentioned above) the
  clearing and state endpoints don't relay back this value (it's always
  0...)
- add log messages for each sub so that (at least for now) we can see
  exact msg contents coming from kraken.
- drop `.remaining` calcs for now since we need to keep record of the
  order states manually in order to retreive the original submission
  vlm..
- fix the `openOrders` case for fills, in this case the message includes
  no `status` field and thus we must catch it in a block *after* the
  normal state handler to avoid masking.
- drop response msg generation from the cancel status case since we
  can do it again from the `openOrders` handler and sending a double
  status causes issues on the client side.
- add a shite ton of notes around all this missing `requid` stuff.
kraken_ws_orders
Tyler Goodlet 2022-07-10 16:16:23 -04:00
parent 5dc9a61ec4
commit 57f2478dc7
1 changed files with 124 additions and 49 deletions

View File

@ -102,7 +102,7 @@ async def handle_order_requests(
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
last = emsflow[cancel.oid]
# last = emsflow[cancel.oid]
reqid = ids[cancel.oid]
txid = reqids2txids[reqid]
@ -148,6 +148,16 @@ async def handle_order_requests(
psym = order.symbol.upper()
pair = f'{psym[:3]}/{psym[3:]}'
# XXX: ACK the request **immediately** before sending
# the api side request to ensure the ems maps the oid ->
# reqid correctly!
resp = BrokerdOrderAck(
oid=order.oid, # ems order request id
reqid=reqid, # our custom int mapping
account=account, # piker account
)
await ems_order_stream.send(resp)
# call ws api to submit the order:
# https://docs.kraken.com/websockets/#message-addOrder
req = {
@ -166,13 +176,6 @@ async def handle_order_requests(
log.info(f'Submitting WS order request:\n{pformat(req)}')
await ws.send_msg(req)
resp = BrokerdOrderAck(
oid=order.oid, # ems order request id
reqid=reqid, # our custom int mapping
account=account, # piker account
)
await ems_order_stream.send(resp)
# placehold for sanity checking in relay loop
emsflow.setdefault(order.oid, []).append(order)
@ -327,7 +330,7 @@ async def trades_dialogue(
# 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict()
reqids2txids: dict[int, str] = {}
reqids2txids: bidict[int, str] = bidict()
# task for processing inbound requests from ems
n.start_soon(
@ -362,7 +365,7 @@ async def handle_order_updates(
ems_stream: tractor.MsgStream,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
reqids2txids: dict[int, str],
reqids2txids: bidict[int, str],
trans: list[pp.Transaction],
acctid: str,
acc_name: str,
@ -384,13 +387,22 @@ async def handle_order_updates(
match msg:
# process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades
# TODO: turns out you get the fill events from the
# `openOrders` before you get this, so it might be better
# to do all fill/status/pp updates in that sub and just use
# this one for ledger syncs?
# For eg. we could take the "last 50 trades" and do a diff
# with the ledger and then only do a re-sync if something
# seems amiss?
case [
trades_msgs,
'ownTrades',
# won't exist for historical values?
# 'userref': reqid,
{'sequence': seq},
]:
log.info(
f'ownTrades update_{seq}:\n'
f'{pformat(trades_msgs)}'
)
# flatten msgs to an {id -> data} table for processing
trades = {
tid: trade
@ -402,11 +414,13 @@ async def handle_order_updates(
}
for tid, trade in trades.items():
# NOTE: try to get the requid sent in the order
# request message if posssible; it may not be
# provided since this sub also returns generic
# historical trade events.
reqid = trade.get('userref', trade['ordertxid'])
txid = trade['ordertxid']
# NOTE: yet again, here we don't have any ref to the
# reqid that's generated by us (as the client) and
# sent in the order request, so we have to look it
# up from our own registry...
reqid = reqids2txids.inverse[txid]
action = trade['type']
price = float(trade['price'])
@ -415,16 +429,16 @@ async def handle_order_updates(
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=reqid,
time_ns=time.time_ns(),
reqid=reqid,
action=action,
size=size,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'},
broker_details={'name': 'kraken'} | trade,
broker_time=broker_time
)
await ems_stream.send(fill_msg)
@ -455,6 +469,10 @@ async def handle_order_updates(
# update ledger and position tracking
with open_ledger(acctid, trades) as trans:
# TODO: ideally we can pass in an existingn
# pps state to this right? such that we
# don't have to do a ledger reload all the
# time..
active, closed = pp.update_pps_conf(
'kraken',
acctid,
@ -499,7 +517,7 @@ async def handle_order_updates(
]:
for order_msg in order_msgs:
log.info(
f'Order msg update_{seq}:\n'
f'`openOrders` msg update_{seq}:\n'
f'{pformat(order_msg)}'
)
txid, update_msg = list(order_msg.items())[0]
@ -510,14 +528,22 @@ async def handle_order_updates(
case {
'cancel_reason': 'Order replaced',
'status': status,
'userref': reqid,
**rest,
# 'userref': reqid, # XXX: always zero bug XD
# **rest,
}:
log.info(
f'Order {txid} was replaced'
)
continue
case {
# XXX: lol, ws bug, this is always 0!
'userref': _,
# during a fill this field is **not**
# provided! but, it is always avail on
# actual status updates.. see case above.
'status': status,
'userref': reqid,
**rest,
# XXX: eg. of remaining msg schema:
@ -548,17 +574,23 @@ async def handle_order_updates(
}:
ems_status = {
'open': 'submitted',
'closed': 'cancelled',
'closed': 'filled',
'canceled': 'cancelled',
# do we even need to forward
# this state to the ems?
'pending': 'pending',
}[status]
submit_vlm = rest.get('vol', 0)
exec_vlm = rest.get('vol_exec', 0)
# TODO: store this in a ChainMap instance
# per order dialog.
# submit_vlm = rest.get('vol', 0)
# fee = rest.get('fee', 0)
if status == 'closed':
vlm = 0
else:
vlm = rest.get('vol_exec', 0)
reqids2txids[reqid] = txid
reqid = reqids2txids.inverse[txid]
oid = ids.inverse.get(reqid)
if not oid:
@ -594,13 +626,17 @@ async def handle_order_updates(
# everyone doin camel case..
status=ems_status, # force lower case
filled=exec_vlm,
filled=vlm,
reason='', # why held?
remaining=(
float(submit_vlm)
-
float(exec_vlm)
),
remaining=vlm,
# TODO: need to extract the submit vlm
# from a prior msg update..
# (
# float(submit_vlm)
# -
# float(exec_vlm)
# ),
broker_details=dict(
{'name': 'kraken'}, **update_msg
@ -609,12 +645,49 @@ async def handle_order_updates(
msgs.append(resp)
await ems_stream.send(resp)
# fill event.
# there is no `status` field
case {
'vol_exec': vlm,
**rest,
}:
# eg. fill msg contents (in total):
# {
# 'vol_exec': '0.84709869',
# 'cost': '101.25370642',
# 'fee': '0.26325964',
# 'avg_price': '119.53000001',
# 'userref': 0,
# }
# TODO: emit fill msg from here
reqid = reqids2txids.inverse[txid]
log.info(
f'openOrders vlm={vlm} Fill for {reqid}:\n'
f'{update_msg}'
)
continue
case _:
log.warning(
'Unknown orders msg:\n'
f'{txid}:{order_msg}'
)
# TODO: given the 'openOrders' sub , pretty
# much all the msgs we get for this sub are duplicate
# of the (incremental) updates in that one though we still
# need them because that sub seems to have a bug where the
# `userref` field is always 0 instead of our generated reqid
# value...
# Not sure why kraken devs decided to repeat themselves but
# it almost seems as though we could drop this entire sub
# and get everything we need by just parsing msgs correctly
# above? The only reason for this seems to be remapping
# underlying `txid` values on order "edits" which the
# `openOrders` sub doesn't seem to have any knowledge of.
# I'd also like to ask them which event guarantees that the
# the live order is now in the book, since these status ones
# almost seem more like request-acks then state guarantees.
case {
'event': etype,
'status': status,
@ -627,7 +700,13 @@ async def handle_order_updates(
'cancelOrderStatus',
}
):
log.info(
f'{etype}:\n'
f'{pformat(msg)}'
)
oid = ids.inverse.get(reqid)
# TODO: relay these to EMS once it supports
# open order loading.
if not oid:
log.warning(
'Unknown order status update?:\n'
@ -637,6 +716,10 @@ async def handle_order_updates(
txid = rest.get('txid')
if txid:
# XXX: we **must** do this mapping for edit order
# status updates since the `openOrders` sub above
# never relays back the correct client-side `reqid`
# that is put in the order request..
reqids2txids[reqid] = txid
msgs = emsflow[oid]
@ -703,9 +786,9 @@ def process_status(
'descr': descr, # only on success?
}:
log.info(
f'Submitting order: {descr}\n'
f'Submitted order: {descr}\n'
f'ems oid: {oid}\n'
f're-mapped reqid: {reqid}\n'
f'brokerd reqid: {reqid}\n'
f'txid: {txid}\n'
)
return [], False
@ -722,6 +805,7 @@ def process_status(
}:
log.info(
f'Editting order {oid}[requid={reqid}]:\n'
f'brokerd reqid: {reqid}\n'
f'txid: {origtxid} -> {txid}\n'
f'{descr}'
)
@ -737,21 +821,12 @@ def process_status(
# 'txid': txids,
**rest,
}:
# TODO: should we support "batch" acking of
# multiple cancels thus avoiding the below loop?
resps: list[MsgUnion] = []
for txid in rest.get('txid', [last.reqid]):
resp = BrokerdStatus(
reqid=reqid,
account=last.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Cancel success: {oid}@{txid}',
broker_details=event,
log.info(
f'Cancelling order {oid}[requid={reqid}]:\n'
f'brokerd reqid: {reqid}\n'
)
resps.append(resp)
return resps, False
return [], False
def norm_trade_records(