Merge pull request #368 from pikers/kraken_userref_hackzin

`kraken`: use `userref` field AND `reqid`, utilize `openOrders` sub for most msging
kraken_ws_orders
goodboy 2022-08-03 09:11:42 -04:00 committed by GitHub
commit ef5829a6b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 311 additions and 309 deletions

View File

@ -104,7 +104,7 @@ async def handle_order_requests(
# XXX: UGH, let's unify this.. with ``msgspec``.
msg: dict[str, Any]
order: BrokerdOrder
counter = count()
counter = count(1)
async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}')
@ -114,16 +114,32 @@ async def handle_order_requests(
}:
cancel = BrokerdCancel(**msg)
reqid = ids[cancel.oid]
txid = reqids2txids[reqid]
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
try:
txid = reqids2txids[reqid]
except KeyError:
# XXX: not sure if this block ever gets hit now?
log.error('TOO FAST CANCEL/EDIT')
reqids2txids[reqid] = TooFastEdit(reqid)
await ems_order_stream.send(
BrokerdError(
oid=msg['oid'],
symbol=msg['symbol'],
reason=(
f'TooFastEdit reqid:{reqid}, could not cancelling..'
),
)
)
else:
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
case {
'account': 'kraken.spot' as account,
@ -138,8 +154,10 @@ async def handle_order_requests(
ep = 'editOrder'
reqid = ids[order.oid] # integer not txid
try:
txid = reqids2txids.pop(reqid)
txid = reqids2txids[reqid]
except KeyError:
# XXX: not sure if this block ever gets hit now?
log.error('TOO FAST EDIT')
reqids2txids[reqid] = TooFastEdit(reqid)
await ems_order_stream.send(
BrokerdError(
@ -151,10 +169,11 @@ async def handle_order_requests(
)
)
extra = {
'orderid': txid, # txid
}
else:
extra = {
'orderid': txid, # txid
'newuserref': str(reqid),
}
else:
ep = 'addOrder'
@ -188,18 +207,22 @@ async def handle_order_requests(
'event': ep,
'token': token,
# XXX: this seems to always get an error response?
# 'userref': f"'{reqid}'",
'reqid': reqid, # remapped-to-int uid from ems
# XXX: we set these to the same value since for us
# a request dialog and an order's state-liftime are
# treated the same. Also this used to not work, the
# values used to be mutex for some odd reason until
# we dealt with support about it, and then they
# fixed it and pretended like we were crazy and the
# issue was never there lmao... coorps bro.
# 'userref': str(reqid),
'userref': str(reqid),
'pair': pair,
'price': str(order.price),
'volume': str(order.size),
# only ensures request is valid, nothing more
# validate: 'true',
# validate: 'true', # validity check, nothing more
} | extra
log.info(f'Submitting WS order request:\n{pformat(req)}')
await ws.send_msg(req)
@ -220,9 +243,7 @@ async def handle_order_requests(
symbol=msg['symbol'],
reason=(
'Invalid request msg:\n{msg}'
),
)
))
)
@ -230,9 +251,17 @@ async def handle_order_requests(
async def subscribe(
ws: wsproto.WSConnection,
token: str,
subs: list[str] = [
'ownTrades',
'openOrders',
subs: list[tuple[str, dict]] = [
('ownTrades', {
# don't send first 50 trades on startup,
# we already pull this manually from the rest endpoint.
'snapshot': False,
},),
('openOrders', {
# include rate limit counters
'ratecounter': True,
},),
],
):
'''
@ -244,14 +273,16 @@ async def subscribe(
'''
# more specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
assert token
for sub in subs:
subnames: set[str] = set()
for name, sub_opts in subs:
msg = {
'event': 'subscribe',
'subscription': {
'name': sub,
'name': name,
'token': token,
**sub_opts,
}
}
@ -260,7 +291,34 @@ async def subscribe(
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_msg(msg)
subnames.add(name)
# wait on subscriptionn acks
with trio.move_on_after(5):
while True:
match (msg := await ws.recv_msg()):
case {
'event': 'subscriptionStatus',
'status': 'subscribed',
'subscription': sub_opts,
} as msg:
log.info(
f'Sucessful subscribe for {sub_opts}:\n'
f'{pformat(msg)}'
)
subnames.remove(sub_opts['name'])
if not subnames:
break
case {
'event': 'subscriptionStatus',
'status': 'error',
'errorMessage': errmsg,
} as msg:
raise RuntimeError(
f'{errmsg}\n\n'
f'{pformat(msg)}'
)
yield
for sub in subs:
@ -490,16 +548,9 @@ async def trades_dialogue(
)
await ctx.started((ppmsgs, [acc_name]))
# XXX: not fucking clue but putting this finally block
# will suppress errors inside the direct await below!?!
# likely something to do with the exist stack inside
# the nobsws stuff...
# try:
# Get websocket token for authenticated data stream
# Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {})
err = resp.get('error')
if err:
raise BrokerError(err)
@ -573,18 +624,37 @@ async def handle_order_updates(
defined in the signature clear to the reader.
'''
async for msg in ws_stream:
match msg:
# process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades
# TODO: turns out you get the fill events from the
# `openOrders` before you get this, so it might be better
# to do all fill/status/pp updates in that sub and just use
# this one for ledger syncs?
# XXX: ASK SUPPORT ABOUT THIS!
# For eg. we could take the "last 50 trades" and do a diff
# with the ledger and then only do a re-sync if something
# seems amiss?
# process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades
# format as tid -> trade event map
# eg. received msg format,
# [{'TOKWHY-SMTUB-G5DOI6': {
# 'cost': '95.29047',
# 'fee': '0.24776',
# 'margin': '0.00000',
# 'ordertxid': 'OKSUXM-3OLSB-L7TN72',
# 'ordertype': 'limit',
# 'pair': 'XBT/EUR',
# 'postxid': 'TKH2SE-M7IF5-CFI7LT',
# 'price': '21268.20000',
# 'time': '1657990947.640891',
# 'type': 'buy',
# 'vol': '0.00448042'
# }}]
case [
trades_msgs,
'ownTrades',
@ -594,63 +664,36 @@ async def handle_order_updates(
f'ownTrades update_{seq}:\n'
f'{pformat(trades_msgs)}'
)
# XXX: a fix / todo
# see the comment in the caller about weird error
# suppression around a commented `try:`
# assert 0
# format as tid -> trade event map
# eg. received msg format,
# [{'TOKWHY-SMTUB-G5DOI6': {'cost': '95.29047',
# 'fee': '0.24776',
# 'margin': '0.00000',
# 'ordertxid': 'OKSUXM-3OLSB-L7TN72',
# 'ordertype': 'limit',
# 'pair': 'XBT/EUR',
# 'postxid': 'TKH2SE-M7IF5-CFI7LT',
# 'price': '21268.20000',
# 'time': '1657990947.640891',
# 'type': 'buy',
# 'vol': '0.00448042'}}]
trades = {
tid: trade
for entry in trades_msgs
for (tid, trade) in entry.items()
# don't re-process datums we've already seen
if tid not in ledger_trans
# if tid not in ledger_trans
}
for tid, trade in trades.items():
assert tid not in ledger_trans
txid = trade['ordertxid']
reqid = trade.get('userref')
# NOTE: yet again, here we don't have any ref to the
# reqid that's generated by us (as the client) and
# sent in the order request, so we have to look it
# up from our own registry...
reqid = reqids2txids.inverse[txid]
if not reqid:
# NOTE: yet again, here we don't have any ref to the
# reqid that's generated by us (as the client) and
# sent in the order request, so we have to look it
# up from our own registry...
reqid = reqids2txids.inverse[txid]
if not reqid:
log.warning(f'Unknown trade dialog: {txid}')
action = trade['type']
price = float(trade['price'])
size = float(trade['vol'])
broker_time = float(trade['time'])
# send a fill msg for gui update
fill_msg = BrokerdFill(
time_ns=time.time_ns(),
reqid=reqid,
action=action,
size=size,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'} | trade,
broker_time=broker_time
)
await ems_stream.send(fill_msg)
filled_msg = BrokerdStatus(
# TODO: we can emit this on the "closed" state in
# the `openOrders` sub-block below.
status_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
@ -672,7 +715,7 @@ async def handle_order_updates(
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg)
await ems_stream.send(status_msg)
new_trans = norm_trade_records(trades)
ppmsgs = trades2pps(
@ -700,19 +743,31 @@ async def handle_order_updates(
txid, update_msg = list(order_msg.items())[0]
match update_msg:
# we ignore internal order updates triggered by
# kraken's "edit" endpoint.
case {
'cancel_reason': 'Order replaced',
'status': status,
# 'userref': reqid, # XXX: always zero bug XD
# **rest,
}:
log.info(
f'Order {txid} was replaced'
)
continue
# XXX: eg. of full msg schema:
# {'avg_price': _,
# 'cost': _,
# 'descr': {
# 'close': None,
# 'leverage': None,
# 'order': descr,
# 'ordertype': 'limit',
# 'pair': 'XMR/EUR',
# 'price': '74.94000000',
# 'price2': '0.00000000',
# 'type': 'buy'
# },
# 'expiretm': None,
# 'fee': '0.00000000',
# 'limitprice': '0.00000000',
# 'misc': '',
# 'oflags': 'fciq',
# 'opentm': '1656966131.337344',
# 'refid': None,
# 'starttm': None,
# 'stopprice': '0.00000000',
# 'timeinforce': 'GTC',
# 'vol': submit_vlm, # '13.34400854',
# 'vol_exec': exec_vlm} # 0.0000
case {
'userref': reqid,
@ -721,42 +776,7 @@ async def handle_order_updates(
# actual status updates.. see case above.
'status': status,
**rest,
# XXX: eg. of remaining msg schema:
# 'avg_price': _,
# 'cost': _,
# 'descr': {
# 'close': None,
# 'leverage': None,
# 'order': descr,
# 'ordertype': 'limit',
# 'pair': 'XMR/EUR',
# 'price': '74.94000000',
# 'price2': '0.00000000',
# 'type': 'buy'
# },
# 'expiretm': None,
# 'fee': '0.00000000',
# 'limitprice': '0.00000000',
# 'misc': '',
# 'oflags': 'fciq',
# 'opentm': '1656966131.337344',
# 'refid': None,
# 'starttm': None,
# 'stopprice': '0.00000000',
# 'timeinforce': 'GTC',
# 'vol': submit_vlm, # '13.34400854',
# 'vol_exec': exec_vlm, # 0.0000
}:
ems_status = {
'open': 'submitted',
'closed': 'filled',
'canceled': 'cancelled',
# do we even need to forward
# this state to the ems?
'pending': 'pending',
}[status]
# TODO: store this in a ChainMap instance
# per order dialog.
# submit_vlm = rest.get('vol', 0)
@ -766,23 +786,70 @@ async def handle_order_updates(
else:
vlm = rest.get('vol_exec', 0)
ourreqid = reqids2txids.inverse.get(txid)
if status == 'canceled':
reqids2txids.pop(reqid)
if ourreqid != reqid:
log.warning(
'REQID MISMATCH due to kraken api bugs..\n'
f'msg:{reqid}, ours:{ourreqid}'
)
reqid = ourreqid
# we specially ignore internal order
# updates triggered by kraken's "edit"
# endpoint.
if rest['cancel_reason'] == 'Order replaced':
# TODO:
# - put the edit order status update
# code here?
# - send open order status msg.
log.info(
f'Order replaced: {txid}@reqid={reqid}'
)
# we don't do normal msg emission on
# a replacement cancel since it's
# the result of an "edited order"
# and thus we mask the kraken
# backend cancel then create details
# from the ems side.
continue
else:
# XXX: keep kraken engine's ``txid`` synced
# with the ems dialog's ``reqid``.
reqids2txids[reqid] = txid
ourreqid = reqids2txids.inverse.get(txid)
if ourreqid is None:
log.info(
'Mapping new txid to our reqid:\n'
f'{reqid} -> {txid}'
)
oid = ids.inverse.get(reqid)
if (
status == 'open'
and (
# XXX: too fast edit handled by the
# request handler task: this
# scenario occurs when ems side
# requests are coming in too quickly
# such that there is no known txid
# yet established for the ems
# dialog's last reqid when the
# request handler task is already
# receceiving a new update for that
# reqid. In this case we simply mark
# the reqid as being "too fast" and
# then when we get the next txid
# update from kraken's backend, and
# thus the new txid, we simply
# cancel the order for now.
# TOO fast edit handled by the
# request handler task.
# TODO: Ideally we eventually
# instead make the client side of
# the ems block until a submission
# is confirmed by the backend
# instead of this hacky throttle
# style approach and avoid requests
# coming in too quickly on the other
# side of the ems, aka the client
# <-> ems dialog.
(toofast := isinstance(
reqids2txids.get(reqid),
TooFastEdit
@ -798,7 +865,7 @@ async def handle_order_updates(
# by not moving the client side line
# until an edit confirmation
# arrives...
log.warning(
log.cancel(
f'Received too fast edit {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
@ -808,7 +875,7 @@ async def handle_order_updates(
# TODO: handle these and relay them
# through the EMS to the client / UI
# side!
log.warning(
log.cancel(
f'Rx unknown active order {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
@ -824,6 +891,18 @@ async def handle_order_updates(
})
continue
# remap statuses to ems set.
ems_status = {
'open': 'submitted',
'closed': 'filled',
'canceled': 'cancelled',
# do we even need to forward
# this state to the ems?
'pending': 'pending',
}[status]
# TODO: i like the open / closed semantics
# more we should consider them for internals
# send BrokerdStatus messages for all
# order state updates
resp = BrokerdStatus(
@ -855,27 +934,45 @@ async def handle_order_updates(
apiflows[reqid].maps.append(update_msg)
await ems_stream.send(resp)
# fill event.
# there is no `status` field
# fill msg.
# eg. contents (in total):
# {
# 'vol_exec': '0.84709869',
# 'cost': '101.25370642',
# 'fee': '0.26325964',
# 'avg_price': '119.53000001',
# 'userref': 0,
# }
# NOTE: there is no `status` field
case {
'vol_exec': vlm,
'avg_price': price,
'userref': reqid,
**rest,
}:
# eg. fill msg contents (in total):
# {
# 'vol_exec': '0.84709869',
# 'cost': '101.25370642',
# 'fee': '0.26325964',
# 'avg_price': '119.53000001',
# 'userref': 0,
# }
# TODO: emit fill msg from here
reqid = reqids2txids.inverse[txid]
} as msg:
ourreqid = reqids2txids.inverse[txid]
assert reqid == ourreqid
log.info(
f'openOrders vlm={vlm} Fill for {reqid}:\n'
f'{update_msg}'
)
continue
fill_msg = BrokerdFill(
time_ns=time.time_ns(),
reqid=reqid,
# action=action, # just use size value
# for now?
size=vlm,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'} | trade,
broker_time=broker_time
)
await ems_stream.send(fill_msg)
case _:
log.warning(
@ -883,21 +980,7 @@ async def handle_order_updates(
f'{txid}:{order_msg}'
)
# TODO: given the 'openOrders' sub , pretty
# much all the msgs we get for this sub are duplicate
# of the (incremental) updates in that one though we still
# need them because that sub seems to have a bug where the
# `userref` field is always 0 instead of our generated reqid
# value...
# Not sure why kraken devs decided to repeat themselves but
# it almost seems as though we could drop this entire sub
# and get everything we need by just parsing msgs correctly
# above? The only reason for this seems to be remapping
# underlying `txid` values on order "edits" which the
# `openOrders` sub doesn't seem to have any knowledge of.
# I'd also like to ask them which event guarantees that the
# the live order is now in the book, since these status ones
# almost seem more like request-acks then state guarantees.
# order request status updates
case {
'event': etype,
'status': status,
@ -914,9 +997,13 @@ async def handle_order_updates(
f'{etype}:\n'
f'{pformat(msg)}'
)
oid = ids.inverse.get(reqid)
txid = rest.get('txid')
lasttxid = reqids2txids.get(reqid)
# TODO: relay these to EMS once it supports
# open order loading.
oid = ids.inverse.get(reqid)
if not oid:
log.warning(
'Unknown order status update?:\n'
@ -924,136 +1011,50 @@ async def handle_order_updates(
)
continue
lasttxid = reqids2txids.get(reqid)
txid = rest.get('txid')
# update the msg chain
chain = apiflows[reqid]
chain.maps.append(event)
resps, errored = process_status(
event,
oid,
token,
chain,
)
if resps:
for resp in resps:
await ems_stream.send(resp)
if status == 'error':
# any of ``{'add', 'edit', 'cancel'}``
action = etype.removesuffix('OrderStatus')
errmsg = rest['errorMessage']
log.error(
f'Failed to {action} order {reqid}:\n'
f'{errmsg}'
)
await ems_stream.send(BrokerdError(
oid=oid,
# XXX: use old reqid in case it changed?
reqid=reqid,
symbol=chain.get('symbol', 'N/A'),
if txid:
reason=f'Failed {action}:\n{errmsg}',
broker_details=event
))
txid = txid or lasttxid
if (
isinstance(lasttxid, TooFastEdit)
or errored
txid
# we throttle too-fast-requests on the ems side
or (txid and isinstance(txid, TooFastEdit))
):
# client was editting too quickly
# so we instead cancel this order
log.cancel(f'Cancelling order for {reqid}@{txid}')
log.cancel(
f'Cancelling {reqid}@{txid} due to:\n {event}')
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid or 0,
'txid': [txid],
})
else:
# XXX: we **must** do this mapping for edit order
# status updates since the `openOrders` sub above
# never relays back the correct client-side `reqid`
# that is put in the order request..
reqids2txids[reqid] = txid
case _:
log.warning(f'Unhandled trades update msg: {msg}')
def process_status(
event: dict[str, str],
oid: str,
token: str,
chain: ChainMap,
) -> tuple[list[MsgUnion], bool]:
'''
Process `'[add/edit/cancel]OrderStatus'` events by translating to
and returning the equivalent EMS-msg responses.
'''
match event:
case {
'event': etype,
'status': 'error',
'reqid': reqid,
'errorMessage': errmsg,
}:
# any of ``{'add', 'edit', 'cancel'}``
action = etype.removesuffix('OrderStatus')
log.error(
f'Failed to {action} order {reqid}:\n'
f'{errmsg}'
)
resp = BrokerdError(
oid=oid,
# XXX: use old reqid in case it changed?
reqid=reqid,
symbol=chain.get('symbol', 'N/A'),
reason=f'Failed {action}:\n{errmsg}',
broker_details=event
)
return [resp], True
# successful request cases
case {
'event': 'addOrderStatus',
'status': "ok",
'reqid': reqid, # oid from ems side
'txid': txid,
'descr': descr, # only on success?
}:
log.info(
f'Submitted order: {descr}\n'
f'ems oid: {oid}\n'
f'brokerd reqid: {reqid}\n'
f'txid: {txid}\n'
)
return [], False
case {
'event': 'editOrderStatus',
'status': "ok",
'reqid': reqid, # oid from ems side
'descr': descr,
# NOTE: for edit request this is a new value
'txid': txid,
'originaltxid': origtxid,
}:
log.info(
f'Editting order {oid}[requid={reqid}]:\n'
f'brokerd reqid: {reqid}\n'
f'txid: {origtxid} -> {txid}\n'
f'{descr}'
)
# deliver another ack to update the ems-side `.reqid`.
return [], False
case {
"event": "cancelOrderStatus",
"status": "ok",
'reqid': reqid,
# XXX: sometimes this isn't provided!?
# 'txid': txids,
**rest,
}:
for txid in rest.get('txid', [chain['reqid']]):
log.info(
f'Cancelling order {oid}[requid={reqid}]:\n'
f'brokerd reqid: {reqid}\n'
)
return [], False
def norm_trade_records(
ledger: dict[str, Any],

View File

@ -152,19 +152,8 @@ async def stream_messages(
continue
case {
'connectionID': _,
'event': 'systemStatus',
'status': 'online',
'version': _,
} as msg:
log.info(
'WS connection is up:\n'
f'{msg}'
)
continue
case _:
# passthrough sub msgs
yield msg

View File

@ -39,7 +39,11 @@ from docker.errors import (
APIError,
# ContainerError,
)
from requests.exceptions import ConnectionError, ReadTimeout
import requests
from requests.exceptions import (
ConnectionError,
ReadTimeout,
)
from ..log import get_logger, get_console_log
from .. import config
@ -188,13 +192,12 @@ class Container:
def hard_kill(self, start: float) -> None:
delay = time.time() - start
log.error(
f'Failed to kill container {self.cntr.id} after {delay}s\n'
'sending SIGKILL..'
)
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
log.error(
f'SIGKILL-ing: {self.cntr.id} after {delay}s\n'
)
self.try_signal('SIGKILL')
self.cntr.wait(
timeout=3,
@ -218,20 +221,25 @@ class Container:
self.try_signal('SIGINT')
start = time.time()
for _ in range(30):
for _ in range(6):
with trio.move_on_after(0.5) as cs:
cs.shield = True
log.cancel('polling for CNTR logs...')
try:
await self.process_logs_until(stop_msg)
except ApplicationLogError:
hard_kill = True
else:
# if we aren't cancelled on above checkpoint then we
# assume we read the expected stop msg and
# terminated.
break
# if we aren't cancelled on above checkpoint then we
# assume we read the expected stop msg and terminated.
break
if cs.cancelled_caught:
# on timeout just try a hard kill after
# a quick container sync-wait.
hard_kill = True
try:
log.info(f'Polling for container shutdown:\n{cid}')
@ -254,9 +262,16 @@ class Container:
except (
docker.errors.APIError,
ConnectionError,
requests.exceptions.ConnectionError,
trio.Cancelled,
):
log.exception('Docker connection failure')
self.hard_kill(start)
raise
except trio.Cancelled:
log.exception('trio cancelled...')
self.hard_kill(start)
else:
hard_kill = True
@ -305,16 +320,13 @@ async def open_ahabd(
))
try:
# TODO: we might eventually want a proxy-style msg-prot here
# to allow remote control of containers without needing
# callers to have root perms?
await trio.sleep_forever()
finally:
# needed?
with trio.CancelScope(shield=True):
await cntr.cancel(stop_msg)
await cntr.cancel(stop_msg)
async def start_ahab(