Use both `reqid` and `userref` in order requests

Turns out you can pass both thus making mapping an ems `oid` to
a brokerd-side `reqid` much more simple. This allows us to avoid keeping
as much local dialog state but with still the following caveats:

- ok `editOrder` msgs must update the reqid<->txid map
- only pop `reqids2txids` entries inside the `cancelOrderStatus` handler
kraken_userref_hackzin
Tyler Goodlet 2022-07-30 16:32:03 -04:00
parent dc8072c6db
commit 227a80469e
1 changed files with 90 additions and 48 deletions

View File

@ -31,7 +31,6 @@ import time
from typing import ( from typing import (
Any, Any,
AsyncIterator, AsyncIterator,
Optional,
Union, Union,
) )
@ -105,7 +104,7 @@ async def handle_order_requests(
# XXX: UGH, let's unify this.. with ``msgspec``. # XXX: UGH, let's unify this.. with ``msgspec``.
msg: dict[str, Any] msg: dict[str, Any]
order: BrokerdOrder order: BrokerdOrder
counter = count() counter = count(1)
async for msg in ems_order_stream: async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}') log.info(f'Rx order msg:\n{pformat(msg)}')
@ -139,7 +138,8 @@ async def handle_order_requests(
ep = 'editOrder' ep = 'editOrder'
reqid = ids[order.oid] # integer not txid reqid = ids[order.oid] # integer not txid
try: try:
txid = reqids2txids.pop(reqid) # txid = reqids2txids.pop(reqid)
txid = reqids2txids[reqid]
except KeyError: except KeyError:
reqids2txids[reqid] = TooFastEdit(reqid) reqids2txids[reqid] = TooFastEdit(reqid)
await ems_order_stream.send( await ems_order_stream.send(
@ -152,10 +152,11 @@ async def handle_order_requests(
) )
) )
else:
extra = { extra = {
'orderid': txid, # txid 'orderid': txid, # txid
} # 'newuserref': reqid,
}
else: else:
ep = 'addOrder' ep = 'addOrder'
@ -189,9 +190,16 @@ async def handle_order_requests(
'event': ep, 'event': ep,
'token': token, 'token': token,
# XXX: Lol, you can only send one of these.. 'reqid': reqid, # remapped-to-int uid from ems
# XXX: we set these to the same value since for us
# a request dialog and an order's state-liftime are
# treated the same. Also this used to not work, the
# values used to be mutex for some odd reason until
# we dealt with support about it, and then they
# fixed it and pretended like we were crazy and the
# issue was never there lmao... coorps bro.
# 'userref': str(reqid),
'userref': str(reqid), 'userref': str(reqid),
# 'reqid': reqid, # remapped-to-int uid from ems
'pair': pair, 'pair': pair,
'price': str(order.price), 'price': str(order.price),
@ -633,6 +641,8 @@ async def handle_order_updates(
# sent in the order request, so we have to look it # sent in the order request, so we have to look it
# up from our own registry... # up from our own registry...
reqid = reqids2txids.inverse[txid] reqid = reqids2txids.inverse[txid]
if not reqid:
log.warning(f'Unknown trade dialog: {txid}')
action = trade['type'] action = trade['type']
price = float(trade['price']) price = float(trade['price'])
@ -713,6 +723,9 @@ async def handle_order_updates(
'userref': reqid, # XXX: always zero bug XD 'userref': reqid, # XXX: always zero bug XD
# **rest, # **rest,
}: }:
# TODO:
# - put the edit order status update code here.
# - send open order status msg.
log.info( log.info(
f'Order {txid}@reqid={reqid} was replaced' f'Order {txid}@reqid={reqid} was replaced'
) )
@ -771,28 +784,52 @@ async def handle_order_updates(
else: else:
vlm = rest.get('vol_exec', 0) vlm = rest.get('vol_exec', 0)
# XXX: keep kraken engine's ``txid`` synced
# with the ems dialog's ``reqid``.
ourreqid = reqids2txids.inverse.get(txid) ourreqid = reqids2txids.inverse.get(txid)
if reqid > 0:
if ourreqid is None:
log.info(
'Mapping new txid to our reqid:\n'
f'{reqid} -> {txid}'
)
reqids2txids[reqid] = txid
# XXX: abs necessary in order to enable else:
# mapping status response messages to the # NOTE: if is to hack around edit order not
# reqid-dialog.. # realying userref field
reqids2txids[reqid] = txid reqid = ourreqid
if ourreqid != reqid:
log.warning(
'REQID MISMATCH due to design mess..\n'
f'msg:{reqid}, ours:{ourreqid}'
)
# reqid = ourreqid
oid = ids.inverse.get(reqid) oid = ids.inverse.get(reqid)
if ( if (
status == 'open' status == 'open'
and ( and (
# XXX: too fast edit handled by the
# request handler task: this
# scenario occurs when ems side
# requests are coming in too quickly
# such that there is no known txid
# yet established for the ems
# dialog's last reqid when the
# request handler task is already
# receceiving a new update for that
# reqid. In this case we simply mark
# the reqid as being "too fast" and
# then when we get the next txid
# update from kraken's backend, and
# thus the new txid, we simply
# cancel the order for now.
# TOO fast edit handled by the # TODO: Ideally we eventually
# request handler task. # instead make the client side of
# the ems block until a submission
# is confirmed by the backend
# instead of this hacky throttle
# style approach and avoid requests
# coming in too quickly on the other
# side of the ems, aka the client
# <-> ems dialog.
(toofast := isinstance( (toofast := isinstance(
reqids2txids.get(reqid), reqids2txids.get(reqid),
TooFastEdit TooFastEdit
@ -869,6 +906,7 @@ async def handle_order_updates(
# there is no `status` field # there is no `status` field
case { case {
'vol_exec': vlm, 'vol_exec': vlm,
'userref': reqid,
**rest, **rest,
}: }:
# eg. fill msg contents (in total): # eg. fill msg contents (in total):
@ -880,7 +918,8 @@ async def handle_order_updates(
# 'userref': 0, # 'userref': 0,
# } # }
# TODO: emit fill msg from here # TODO: emit fill msg from here
reqid = reqids2txids.inverse[txid] ourreqid = reqids2txids.inverse[txid]
assert reqid == ourreqid
log.info( log.info(
f'openOrders vlm={vlm} Fill for {reqid}:\n' f'openOrders vlm={vlm} Fill for {reqid}:\n'
f'{update_msg}' f'{update_msg}'
@ -899,19 +938,21 @@ async def handle_order_updates(
# need them because that sub seems to have a bug where the # need them because that sub seems to have a bug where the
# `userref` field is always 0 instead of our generated reqid # `userref` field is always 0 instead of our generated reqid
# value... # value...
# Not sure why kraken devs decided to repeat themselves but # SOLVED: pass both a reqid and a userref in the init
# it almost seems as though we could drop this entire sub # request msg.
# and get everything we need by just parsing msgs correctly
# above? The only reason for this seems to be remapping # NOTE: The only reason for this seems to be remapping
# underlying `txid` values on order "edits" which the # underlying `txid` values on order "edits" which the
# `openOrders` sub doesn't seem to have any knowledge of. # `openOrders` sub doesn't seem to have any knowledge of.
# I'd also like to ask them which event guarantees that the # I'd also like to ask them which event guarantees that the
# the live order is now in the book, since these status ones # the live order is now in the book, since these status ones
# almost seem more like request-acks then state guarantees. # almost seem more like request-acks then state guarantees.
# ANSWER the `openOrders` is more indicative of "liveness".
case { case {
'event': etype, 'event': etype,
'status': status, 'status': status,
# 'reqid': reqid, 'reqid': reqid,
**rest, **rest,
} as event if ( } as event if (
etype in { etype in {
@ -926,8 +967,8 @@ async def handle_order_updates(
) )
txid = rest.get('txid') txid = rest.get('txid')
reqid = reqids2txids.inverse.get(txid)
lasttxid = reqids2txids.get(reqid) lasttxid = reqids2txids.get(reqid)
print(f'txids: {(txid, lasttxid)}')
# TODO: relay these to EMS once it supports # TODO: relay these to EMS once it supports
# open order loading. # open order loading.
@ -939,23 +980,23 @@ async def handle_order_updates(
) )
continue continue
if reqid is not None: # if reqid is not None:
# update the msg chain # update the msg chain
chain = apiflows[reqid] chain = apiflows[reqid]
chain.maps.append(event) chain.maps.append(event)
resps, errored = process_status( resps, errored = process_status(
event, event,
oid, oid,
token, token,
chain, chain,
reqid=reqid, reqids2txids,
) )
if resps: if resps:
for resp in resps: for resp in resps:
await ems_stream.send(resp) await ems_stream.send(resp)
if txid: if txid or lasttxid:
if ( if (
isinstance(lasttxid, TooFastEdit) isinstance(lasttxid, TooFastEdit)
or errored or errored
@ -969,14 +1010,6 @@ async def handle_order_updates(
'reqid': reqid or 0, 'reqid': reqid or 0,
'txid': [txid], 'txid': [txid],
}) })
# else:
# XXX: we **must** do this mapping for edit order
# status updates since the `openOrders` sub above
# never relays back the correct client-side `reqid`
# that is put in the order request..
# reqids2txids[reqid] = txid
case _: case _:
log.warning(f'Unhandled trades update msg: {msg}') log.warning(f'Unhandled trades update msg: {msg}')
@ -986,7 +1019,7 @@ def process_status(
oid: str, oid: str,
token: str, token: str,
chain: ChainMap, chain: ChainMap,
reqid: Optional[int] = None, reqids2txids: dict[int, str],
) -> tuple[list[MsgUnion], bool]: ) -> tuple[list[MsgUnion], bool]:
''' '''
@ -998,7 +1031,7 @@ def process_status(
case { case {
'event': etype, 'event': etype,
'status': 'error', 'status': 'error',
# 'reqid': reqid, 'reqid': reqid,
'errorMessage': errmsg, 'errorMessage': errmsg,
}: }:
# any of ``{'add', 'edit', 'cancel'}`` # any of ``{'add', 'edit', 'cancel'}``
@ -1022,7 +1055,7 @@ def process_status(
case { case {
'event': 'addOrderStatus', 'event': 'addOrderStatus',
'status': "ok", 'status': "ok",
# 'reqid': reqid, # oid from ems side 'reqid': reqid, # oid from ems side
'txid': txid, 'txid': txid,
'descr': descr, # only on success? 'descr': descr, # only on success?
}: }:
@ -1037,7 +1070,7 @@ def process_status(
case { case {
'event': 'editOrderStatus', 'event': 'editOrderStatus',
'status': "ok", 'status': "ok",
# 'reqid': reqid, # oid from ems side 'reqid': reqid, # oid from ems side
'descr': descr, 'descr': descr,
# NOTE: for edit request this is a new value # NOTE: for edit request this is a new value
@ -1050,13 +1083,19 @@ def process_status(
f'txid: {origtxid} -> {txid}\n' f'txid: {origtxid} -> {txid}\n'
f'{descr}' f'{descr}'
) )
# XXX: update the expected txid since the ``openOrders`` sub
# doesn't relay through the ``userref`` value..
# (hopefully kraken will fix this so we don't need this
# line.)
reqids2txids[reqid] = txid
# deliver another ack to update the ems-side `.reqid`. # deliver another ack to update the ems-side `.reqid`.
return [], False return [], False
case { case {
"event": "cancelOrderStatus", "event": "cancelOrderStatus",
"status": "ok", "status": "ok",
# 'reqid': reqid, 'reqid': reqid,
# XXX: sometimes this isn't provided!? # XXX: sometimes this isn't provided!?
# 'txid': txids, # 'txid': txids,
@ -1067,6 +1106,9 @@ def process_status(
f'Cancelling order {oid}[requid={reqid}]:\n' f'Cancelling order {oid}[requid={reqid}]:\n'
f'brokerd reqid: {reqid}\n' f'brokerd reqid: {reqid}\n'
) )
if txid == reqids2txids[reqid]:
reqids2txids.pop(reqid)
return [], False return [], False