Use the ``newuserref`` field on order edits
Why we need so many fields to accomplish passing through a dialog key to orders is beyond me but this is how they do it with edits.. Allows not having to handle `editOrderStatus` msgs to update the dialog key table and instead just do it in the `openOrders` sub by checking the canceled msg for a 'cancel_reason' of 'Order replaced', in which case we just pop the txid and wait for the new order the kraken backend engine will submit automatically, which will now have the correct 'userref' value we passed in via the `newuserref`, and then we add that new `txid` to our table.kraken_userref_hackzin
parent
227a80469e
commit
1cbf45b4c4
|
@ -138,9 +138,13 @@ async def handle_order_requests(
|
||||||
ep = 'editOrder'
|
ep = 'editOrder'
|
||||||
reqid = ids[order.oid] # integer not txid
|
reqid = ids[order.oid] # integer not txid
|
||||||
try:
|
try:
|
||||||
# txid = reqids2txids.pop(reqid)
|
|
||||||
txid = reqids2txids[reqid]
|
txid = reqids2txids[reqid]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
assert 0
|
||||||
|
|
||||||
|
# XXX: not sure if this block ever gets hit now?
|
||||||
|
log.error('TOO FAST EDIT')
|
||||||
|
|
||||||
reqids2txids[reqid] = TooFastEdit(reqid)
|
reqids2txids[reqid] = TooFastEdit(reqid)
|
||||||
await ems_order_stream.send(
|
await ems_order_stream.send(
|
||||||
BrokerdError(
|
BrokerdError(
|
||||||
|
@ -155,7 +159,7 @@ async def handle_order_requests(
|
||||||
else:
|
else:
|
||||||
extra = {
|
extra = {
|
||||||
'orderid': txid, # txid
|
'orderid': txid, # txid
|
||||||
# 'newuserref': reqid,
|
'newuserref': str(reqid),
|
||||||
}
|
}
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -200,15 +204,12 @@ async def handle_order_requests(
|
||||||
# issue was never there lmao... coorps bro.
|
# issue was never there lmao... coorps bro.
|
||||||
# 'userref': str(reqid),
|
# 'userref': str(reqid),
|
||||||
'userref': str(reqid),
|
'userref': str(reqid),
|
||||||
|
|
||||||
'pair': pair,
|
'pair': pair,
|
||||||
'price': str(order.price),
|
'price': str(order.price),
|
||||||
'volume': str(order.size),
|
'volume': str(order.size),
|
||||||
|
# validate: 'true', # validity check, nothing more
|
||||||
# only ensures request is valid, nothing more
|
|
||||||
# validate: 'true',
|
|
||||||
|
|
||||||
} | extra
|
} | extra
|
||||||
|
|
||||||
log.info(f'Submitting WS order request:\n{pformat(req)}')
|
log.info(f'Submitting WS order request:\n{pformat(req)}')
|
||||||
await ws.send_msg(req)
|
await ws.send_msg(req)
|
||||||
|
|
||||||
|
@ -229,9 +230,7 @@ async def handle_order_requests(
|
||||||
symbol=msg['symbol'],
|
symbol=msg['symbol'],
|
||||||
reason=(
|
reason=(
|
||||||
'Invalid request msg:\n{msg}'
|
'Invalid request msg:\n{msg}'
|
||||||
),
|
))
|
||||||
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -253,7 +252,6 @@ async def subscribe(
|
||||||
'''
|
'''
|
||||||
# more specific logic for this in kraken's sync client:
|
# more specific logic for this in kraken's sync client:
|
||||||
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
|
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
|
||||||
|
|
||||||
assert token
|
assert token
|
||||||
for sub in subs:
|
for sub in subs:
|
||||||
msg = {
|
msg = {
|
||||||
|
@ -499,16 +497,9 @@ async def trades_dialogue(
|
||||||
)
|
)
|
||||||
await ctx.started((ppmsgs, [acc_name]))
|
await ctx.started((ppmsgs, [acc_name]))
|
||||||
|
|
||||||
# XXX: not fucking clue but putting this finally block
|
|
||||||
# will suppress errors inside the direct await below!?!
|
|
||||||
# likely something to do with the exist stack inside
|
|
||||||
# the nobsws stuff...
|
|
||||||
# try:
|
|
||||||
|
|
||||||
# Get websocket token for authenticated data stream
|
# Get websocket token for authenticated data stream
|
||||||
# Assert that a token was actually received.
|
# Assert that a token was actually received.
|
||||||
resp = await client.endpoint('GetWebSocketsToken', {})
|
resp = await client.endpoint('GetWebSocketsToken', {})
|
||||||
|
|
||||||
err = resp.get('error')
|
err = resp.get('error')
|
||||||
if err:
|
if err:
|
||||||
raise BrokerError(err)
|
raise BrokerError(err)
|
||||||
|
@ -582,32 +573,22 @@ async def handle_order_updates(
|
||||||
defined in the signature clear to the reader.
|
defined in the signature clear to the reader.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
async for msg in ws_stream:
|
async for msg in ws_stream:
|
||||||
match msg:
|
match msg:
|
||||||
# process and relay clearing trade events to ems
|
|
||||||
# https://docs.kraken.com/websockets/#message-ownTrades
|
|
||||||
# TODO: turns out you get the fill events from the
|
# TODO: turns out you get the fill events from the
|
||||||
# `openOrders` before you get this, so it might be better
|
# `openOrders` before you get this, so it might be better
|
||||||
# to do all fill/status/pp updates in that sub and just use
|
# to do all fill/status/pp updates in that sub and just use
|
||||||
# this one for ledger syncs?
|
# this one for ledger syncs?
|
||||||
|
|
||||||
|
# XXX: ASK SUPPORT ABOUT THIS!
|
||||||
|
|
||||||
# For eg. we could take the "last 50 trades" and do a diff
|
# For eg. we could take the "last 50 trades" and do a diff
|
||||||
# with the ledger and then only do a re-sync if something
|
# with the ledger and then only do a re-sync if something
|
||||||
# seems amiss?
|
# seems amiss?
|
||||||
case [
|
|
||||||
trades_msgs,
|
|
||||||
'ownTrades',
|
|
||||||
{'sequence': seq},
|
|
||||||
]:
|
|
||||||
log.info(
|
|
||||||
f'ownTrades update_{seq}:\n'
|
|
||||||
f'{pformat(trades_msgs)}'
|
|
||||||
)
|
|
||||||
# XXX: a fix / todo
|
|
||||||
# see the comment in the caller about weird error
|
|
||||||
# suppression around a commented `try:`
|
|
||||||
# assert 0
|
|
||||||
|
|
||||||
|
# process and relay clearing trade events to ems
|
||||||
|
# https://docs.kraken.com/websockets/#message-ownTrades
|
||||||
# format as tid -> trade event map
|
# format as tid -> trade event map
|
||||||
# eg. received msg format,
|
# eg. received msg format,
|
||||||
# [{'TOKWHY-SMTUB-G5DOI6': {
|
# [{'TOKWHY-SMTUB-G5DOI6': {
|
||||||
|
@ -623,6 +604,15 @@ async def handle_order_updates(
|
||||||
# 'type': 'buy',
|
# 'type': 'buy',
|
||||||
# 'vol': '0.00448042'
|
# 'vol': '0.00448042'
|
||||||
# }}]
|
# }}]
|
||||||
|
case [
|
||||||
|
trades_msgs,
|
||||||
|
'ownTrades',
|
||||||
|
{'sequence': seq},
|
||||||
|
]:
|
||||||
|
log.info(
|
||||||
|
f'ownTrades update_{seq}:\n'
|
||||||
|
f'{pformat(trades_msgs)}'
|
||||||
|
)
|
||||||
trades = {
|
trades = {
|
||||||
tid: trade
|
tid: trade
|
||||||
for entry in trades_msgs
|
for entry in trades_msgs
|
||||||
|
@ -665,7 +655,7 @@ async def handle_order_updates(
|
||||||
)
|
)
|
||||||
await ems_stream.send(fill_msg)
|
await ems_stream.send(fill_msg)
|
||||||
|
|
||||||
filled_msg = BrokerdStatus(
|
status_msg = BrokerdStatus(
|
||||||
reqid=reqid,
|
reqid=reqid,
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
|
|
||||||
|
@ -687,7 +677,7 @@ async def handle_order_updates(
|
||||||
# https://github.com/pikers/piker/issues/296
|
# https://github.com/pikers/piker/issues/296
|
||||||
remaining=0,
|
remaining=0,
|
||||||
)
|
)
|
||||||
await ems_stream.send(filled_msg)
|
await ems_stream.send(status_msg)
|
||||||
|
|
||||||
new_trans = norm_trade_records(trades)
|
new_trans = norm_trade_records(trades)
|
||||||
ppmsgs = trades2pps(
|
ppmsgs = trades2pps(
|
||||||
|
@ -715,33 +705,8 @@ async def handle_order_updates(
|
||||||
txid, update_msg = list(order_msg.items())[0]
|
txid, update_msg = list(order_msg.items())[0]
|
||||||
match update_msg:
|
match update_msg:
|
||||||
|
|
||||||
# we ignore internal order updates triggered by
|
# XXX: eg. of full msg schema:
|
||||||
# kraken's "edit" endpoint.
|
# {'avg_price': _,
|
||||||
case {
|
|
||||||
'cancel_reason': 'Order replaced',
|
|
||||||
'status': status,
|
|
||||||
'userref': reqid, # XXX: always zero bug XD
|
|
||||||
# **rest,
|
|
||||||
}:
|
|
||||||
# TODO:
|
|
||||||
# - put the edit order status update code here.
|
|
||||||
# - send open order status msg.
|
|
||||||
log.info(
|
|
||||||
f'Order {txid}@reqid={reqid} was replaced'
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
case {
|
|
||||||
'userref': reqid,
|
|
||||||
|
|
||||||
# during a fill this field is **not**
|
|
||||||
# provided! but, it is always avail on
|
|
||||||
# actual status updates.. see case above.
|
|
||||||
'status': status,
|
|
||||||
**rest,
|
|
||||||
|
|
||||||
# XXX: eg. of remaining msg schema:
|
|
||||||
# 'avg_price': _,
|
|
||||||
# 'cost': _,
|
# 'cost': _,
|
||||||
# 'descr': {
|
# 'descr': {
|
||||||
# 'close': None,
|
# 'close': None,
|
||||||
|
@ -764,17 +729,16 @@ async def handle_order_updates(
|
||||||
# 'stopprice': '0.00000000',
|
# 'stopprice': '0.00000000',
|
||||||
# 'timeinforce': 'GTC',
|
# 'timeinforce': 'GTC',
|
||||||
# 'vol': submit_vlm, # '13.34400854',
|
# 'vol': submit_vlm, # '13.34400854',
|
||||||
# 'vol_exec': exec_vlm, # 0.0000
|
# 'vol_exec': exec_vlm} # 0.0000
|
||||||
}:
|
case {
|
||||||
ems_status = {
|
'userref': reqid,
|
||||||
'open': 'submitted',
|
|
||||||
'closed': 'filled',
|
|
||||||
'canceled': 'cancelled',
|
|
||||||
# do we even need to forward
|
|
||||||
# this state to the ems?
|
|
||||||
'pending': 'pending',
|
|
||||||
}[status]
|
|
||||||
|
|
||||||
|
# during a fill this field is **not**
|
||||||
|
# provided! but, it is always avail on
|
||||||
|
# actual status updates.. see case above.
|
||||||
|
'status': status,
|
||||||
|
**rest,
|
||||||
|
}:
|
||||||
# TODO: store this in a ChainMap instance
|
# TODO: store this in a ChainMap instance
|
||||||
# per order dialog.
|
# per order dialog.
|
||||||
# submit_vlm = rest.get('vol', 0)
|
# submit_vlm = rest.get('vol', 0)
|
||||||
|
@ -784,21 +748,39 @@ async def handle_order_updates(
|
||||||
else:
|
else:
|
||||||
vlm = rest.get('vol_exec', 0)
|
vlm = rest.get('vol_exec', 0)
|
||||||
|
|
||||||
|
if status == 'canceled':
|
||||||
|
reqids2txids.pop(reqid)
|
||||||
|
|
||||||
|
# we specially ignore internal order
|
||||||
|
# updates triggered by kraken's "edit"
|
||||||
|
# endpoint.
|
||||||
|
if rest['cancel_reason'] == 'Order replaced':
|
||||||
|
# TODO:
|
||||||
|
# - put the edit order status update
|
||||||
|
# code here?
|
||||||
|
# - send open order status msg.
|
||||||
|
log.info(
|
||||||
|
f'Order replaced: {txid}@reqid={reqid}'
|
||||||
|
)
|
||||||
|
|
||||||
|
# we don't do normal msg emission on
|
||||||
|
# a replacement cancel since it's
|
||||||
|
# the result of an "edited order"
|
||||||
|
# and thus we mask the kraken
|
||||||
|
# backend cancel then create details
|
||||||
|
# from the ems side.
|
||||||
|
continue
|
||||||
|
else:
|
||||||
# XXX: keep kraken engine's ``txid`` synced
|
# XXX: keep kraken engine's ``txid`` synced
|
||||||
# with the ems dialog's ``reqid``.
|
# with the ems dialog's ``reqid``.
|
||||||
|
reqids2txids[reqid] = txid
|
||||||
|
|
||||||
ourreqid = reqids2txids.inverse.get(txid)
|
ourreqid = reqids2txids.inverse.get(txid)
|
||||||
if reqid > 0:
|
|
||||||
if ourreqid is None:
|
if ourreqid is None:
|
||||||
log.info(
|
log.info(
|
||||||
'Mapping new txid to our reqid:\n'
|
'Mapping new txid to our reqid:\n'
|
||||||
f'{reqid} -> {txid}'
|
f'{reqid} -> {txid}'
|
||||||
)
|
)
|
||||||
reqids2txids[reqid] = txid
|
|
||||||
|
|
||||||
else:
|
|
||||||
# NOTE: if is to hack around edit order not
|
|
||||||
# realying userref field
|
|
||||||
reqid = ourreqid
|
|
||||||
|
|
||||||
oid = ids.inverse.get(reqid)
|
oid = ids.inverse.get(reqid)
|
||||||
|
|
||||||
|
@ -845,7 +827,7 @@ async def handle_order_updates(
|
||||||
# by not moving the client side line
|
# by not moving the client side line
|
||||||
# until an edit confirmation
|
# until an edit confirmation
|
||||||
# arrives...
|
# arrives...
|
||||||
log.warning(
|
log.cancel(
|
||||||
f'Received too fast edit {txid}:\n'
|
f'Received too fast edit {txid}:\n'
|
||||||
f'{update_msg}\n'
|
f'{update_msg}\n'
|
||||||
'Cancelling order for now!..'
|
'Cancelling order for now!..'
|
||||||
|
@ -855,7 +837,7 @@ async def handle_order_updates(
|
||||||
# TODO: handle these and relay them
|
# TODO: handle these and relay them
|
||||||
# through the EMS to the client / UI
|
# through the EMS to the client / UI
|
||||||
# side!
|
# side!
|
||||||
log.warning(
|
log.cancel(
|
||||||
f'Rx unknown active order {txid}:\n'
|
f'Rx unknown active order {txid}:\n'
|
||||||
f'{update_msg}\n'
|
f'{update_msg}\n'
|
||||||
'Cancelling order for now!..'
|
'Cancelling order for now!..'
|
||||||
|
@ -871,6 +853,18 @@ async def handle_order_updates(
|
||||||
})
|
})
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# remap statuses to ems set.
|
||||||
|
ems_status = {
|
||||||
|
'open': 'submitted',
|
||||||
|
'closed': 'filled',
|
||||||
|
'canceled': 'cancelled',
|
||||||
|
# do we even need to forward
|
||||||
|
# this state to the ems?
|
||||||
|
'pending': 'pending',
|
||||||
|
}[status]
|
||||||
|
# TODO: i like the open / closed semantics
|
||||||
|
# more we should consider them for internals
|
||||||
|
|
||||||
# send BrokerdStatus messages for all
|
# send BrokerdStatus messages for all
|
||||||
# order state updates
|
# order state updates
|
||||||
resp = BrokerdStatus(
|
resp = BrokerdStatus(
|
||||||
|
@ -902,14 +896,8 @@ async def handle_order_updates(
|
||||||
apiflows[reqid].maps.append(update_msg)
|
apiflows[reqid].maps.append(update_msg)
|
||||||
await ems_stream.send(resp)
|
await ems_stream.send(resp)
|
||||||
|
|
||||||
# fill event.
|
# fill msg.
|
||||||
# there is no `status` field
|
# eg. contents (in total):
|
||||||
case {
|
|
||||||
'vol_exec': vlm,
|
|
||||||
'userref': reqid,
|
|
||||||
**rest,
|
|
||||||
}:
|
|
||||||
# eg. fill msg contents (in total):
|
|
||||||
# {
|
# {
|
||||||
# 'vol_exec': '0.84709869',
|
# 'vol_exec': '0.84709869',
|
||||||
# 'cost': '101.25370642',
|
# 'cost': '101.25370642',
|
||||||
|
@ -917,7 +905,13 @@ async def handle_order_updates(
|
||||||
# 'avg_price': '119.53000001',
|
# 'avg_price': '119.53000001',
|
||||||
# 'userref': 0,
|
# 'userref': 0,
|
||||||
# }
|
# }
|
||||||
# TODO: emit fill msg from here
|
# NOTE: there is no `status` field
|
||||||
|
case {
|
||||||
|
'vol_exec': vlm,
|
||||||
|
'userref': reqid,
|
||||||
|
**rest,
|
||||||
|
}:
|
||||||
|
# TODO: emit fill msg from here?
|
||||||
ourreqid = reqids2txids.inverse[txid]
|
ourreqid = reqids2txids.inverse[txid]
|
||||||
assert reqid == ourreqid
|
assert reqid == ourreqid
|
||||||
log.info(
|
log.info(
|
||||||
|
@ -932,18 +926,12 @@ async def handle_order_updates(
|
||||||
f'{txid}:{order_msg}'
|
f'{txid}:{order_msg}'
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: given the 'openOrders' sub , pretty
|
|
||||||
# much all the msgs we get for this sub are duplicate
|
|
||||||
# of the (incremental) updates in that one though we still
|
|
||||||
# need them because that sub seems to have a bug where the
|
|
||||||
# `userref` field is always 0 instead of our generated reqid
|
|
||||||
# value...
|
|
||||||
# SOLVED: pass both a reqid and a userref in the init
|
|
||||||
# request msg.
|
|
||||||
|
|
||||||
# NOTE: The only reason for this seems to be remapping
|
# NOTE: The only reason for this seems to be remapping
|
||||||
# underlying `txid` values on order "edits" which the
|
# underlying `txid` values on order "edits" which the
|
||||||
# `openOrders` sub doesn't seem to have any knowledge of.
|
# `openOrders` sub doesn't seem to have any knowledge of.
|
||||||
|
# UPDATE: seems like we don't need this any more thanks to
|
||||||
|
# passing through the dialog key / reqid in the `newuserref`
|
||||||
|
# field on edit requests.
|
||||||
|
|
||||||
# I'd also like to ask them which event guarantees that the
|
# I'd also like to ask them which event guarantees that the
|
||||||
# the live order is now in the book, since these status ones
|
# the live order is now in the book, since these status ones
|
||||||
|
@ -968,7 +956,6 @@ async def handle_order_updates(
|
||||||
|
|
||||||
txid = rest.get('txid')
|
txid = rest.get('txid')
|
||||||
lasttxid = reqids2txids.get(reqid)
|
lasttxid = reqids2txids.get(reqid)
|
||||||
print(f'txids: {(txid, lasttxid)}')
|
|
||||||
|
|
||||||
# TODO: relay these to EMS once it supports
|
# TODO: relay these to EMS once it supports
|
||||||
# open order loading.
|
# open order loading.
|
||||||
|
@ -980,7 +967,6 @@ async def handle_order_updates(
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# if reqid is not None:
|
|
||||||
# update the msg chain
|
# update the msg chain
|
||||||
chain = apiflows[reqid]
|
chain = apiflows[reqid]
|
||||||
chain.maps.append(event)
|
chain.maps.append(event)
|
||||||
|
@ -992,18 +978,24 @@ async def handle_order_updates(
|
||||||
chain,
|
chain,
|
||||||
reqids2txids,
|
reqids2txids,
|
||||||
)
|
)
|
||||||
|
|
||||||
if resps:
|
if resps:
|
||||||
for resp in resps:
|
for resp in resps:
|
||||||
await ems_stream.send(resp)
|
await ems_stream.send(resp)
|
||||||
|
|
||||||
if txid or lasttxid:
|
txid = txid or lasttxid
|
||||||
if (
|
if (
|
||||||
isinstance(lasttxid, TooFastEdit)
|
# errored likely on a rate limit or bad input
|
||||||
or errored
|
errored
|
||||||
|
and txid
|
||||||
|
|
||||||
|
# we throttle too-fast-requests on the ems side
|
||||||
|
or (txid and isinstance(txid, TooFastEdit))
|
||||||
):
|
):
|
||||||
# client was editting too quickly
|
# client was editting too quickly
|
||||||
# so we instead cancel this order
|
# so we instead cancel this order
|
||||||
log.cancel(f'Cancelling order for {reqid}@{txid}')
|
log.cancel(
|
||||||
|
f'Cancelling {reqid}@{txid} due to error:\n {event}')
|
||||||
await ws.send_msg({
|
await ws.send_msg({
|
||||||
'event': 'cancelOrder',
|
'event': 'cancelOrder',
|
||||||
'token': token,
|
'token': token,
|
||||||
|
@ -1088,7 +1080,7 @@ def process_status(
|
||||||
# doesn't relay through the ``userref`` value..
|
# doesn't relay through the ``userref`` value..
|
||||||
# (hopefully kraken will fix this so we don't need this
|
# (hopefully kraken will fix this so we don't need this
|
||||||
# line.)
|
# line.)
|
||||||
reqids2txids[reqid] = txid
|
# reqids2txids[reqid] = txid
|
||||||
# deliver another ack to update the ems-side `.reqid`.
|
# deliver another ack to update the ems-side `.reqid`.
|
||||||
return [], False
|
return [], False
|
||||||
|
|
||||||
|
@ -1106,8 +1098,8 @@ def process_status(
|
||||||
f'Cancelling order {oid}[requid={reqid}]:\n'
|
f'Cancelling order {oid}[requid={reqid}]:\n'
|
||||||
f'brokerd reqid: {reqid}\n'
|
f'brokerd reqid: {reqid}\n'
|
||||||
)
|
)
|
||||||
if txid == reqids2txids[reqid]:
|
# if txid == reqids2txids[reqid]:
|
||||||
reqids2txids.pop(reqid)
|
# reqids2txids.pop(reqid)
|
||||||
|
|
||||||
return [], False
|
return [], False
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue