Emit fills from `openOrders` block

The (partial) fills from this sub are most indicative of clears (also
says support) whereas the msgs in the `ownTrades` sub are only emitted
after the entire order request has completed - there is no size-vlm
remaining.

Further enhancements:
- this also includes proper subscription-syncing inside `subscribe()` with
  a small pre-msg-loop which waits on ack-msgs for each sub and raises any
  errors. This approach should probably be implemented for the data feed
  streams as well.
- configure the `ownTrades` sub to not bother sending historical data on
  startup.
- make the `openOrders` sub include rate limit counters.
- handle the rare case where the ems is trying to cancel an order which
  was just edited and hasn't yet had it's new `txid` registered.
kraken_ws_orders
Tyler Goodlet 2022-08-01 19:22:31 -04:00
parent 1a291939c3
commit 30bcfdcc83
1 changed files with 93 additions and 46 deletions

View File

@ -114,16 +114,32 @@ async def handle_order_requests(
}: }:
cancel = BrokerdCancel(**msg) cancel = BrokerdCancel(**msg)
reqid = ids[cancel.oid] reqid = ids[cancel.oid]
txid = reqids2txids[reqid]
# call ws api to cancel: try:
# https://docs.kraken.com/websockets/#message-cancelOrder txid = reqids2txids[reqid]
await ws.send_msg({ except KeyError:
'event': 'cancelOrder', # XXX: not sure if this block ever gets hit now?
'token': token, log.error('TOO FAST CANCEL/EDIT')
'reqid': reqid, reqids2txids[reqid] = TooFastEdit(reqid)
'txid': [txid], # should be txid from submission await ems_order_stream.send(
}) BrokerdError(
oid=msg['oid'],
symbol=msg['symbol'],
reason=(
f'TooFastEdit reqid:{reqid}, could not cancelling..'
),
)
)
else:
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
case { case {
'account': 'kraken.spot' as account, 'account': 'kraken.spot' as account,
@ -235,9 +251,17 @@ async def handle_order_requests(
async def subscribe( async def subscribe(
ws: wsproto.WSConnection, ws: wsproto.WSConnection,
token: str, token: str,
subs: list[str] = [ subs: list[tuple[str, dict]] = [
'ownTrades', ('ownTrades', {
'openOrders', # don't send first 50 trades on startup,
# we already pull this manually from the rest endpoint.
'snapshot': False,
},),
('openOrders', {
# include rate limit counters
'ratecounter': True,
},),
], ],
): ):
''' '''
@ -250,12 +274,15 @@ async def subscribe(
# more specific logic for this in kraken's sync client: # more specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
assert token assert token
for sub in subs: subnames: set[str] = set()
for name, sub_opts in subs:
msg = { msg = {
'event': 'subscribe', 'event': 'subscribe',
'subscription': { 'subscription': {
'name': sub, 'name': name,
'token': token, 'token': token,
**sub_opts,
} }
} }
@ -264,7 +291,34 @@ async def subscribe(
# since internally the ws methods appear to be FIFO # since internally the ws methods appear to be FIFO
# locked. # locked.
await ws.send_msg(msg) await ws.send_msg(msg)
subnames.add(name)
# wait on subscriptionn acks
with trio.move_on_after(5):
while True:
match (msg := await ws.recv_msg()):
case {
'event': 'subscriptionStatus',
'status': 'subscribed',
'subscription': sub_opts,
} as msg:
log.info(
f'Sucessful subscribe for {sub_opts}:\n'
f'{pformat(msg)}'
)
subnames.remove(sub_opts['name'])
if not subnames:
break
case {
'event': 'subscriptionStatus',
'status': 'error',
'errorMessage': errmsg,
} as msg:
raise RuntimeError(
f'{errmsg}\n\n'
f'{pformat(msg)}'
)
yield yield
for sub in subs: for sub in subs:
@ -616,9 +670,10 @@ async def handle_order_updates(
for (tid, trade) in entry.items() for (tid, trade) in entry.items()
# don't re-process datums we've already seen # don't re-process datums we've already seen
if tid not in ledger_trans # if tid not in ledger_trans
} }
for tid, trade in trades.items(): for tid, trade in trades.items():
assert tid not in ledger_trans
txid = trade['ordertxid'] txid = trade['ordertxid']
reqid = trade.get('userref') reqid = trade.get('userref')
@ -636,22 +691,8 @@ async def handle_order_updates(
size = float(trade['vol']) size = float(trade['vol'])
broker_time = float(trade['time']) broker_time = float(trade['time'])
# send a fill msg for gui update # TODO: we can emit this on the "closed" state in
fill_msg = BrokerdFill( # the `openOrders` sub-block below.
time_ns=time.time_ns(),
reqid=reqid,
action=action,
size=size,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'} | trade,
broker_time=broker_time
)
await ems_stream.send(fill_msg)
status_msg = BrokerdStatus( status_msg = BrokerdStatus(
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
@ -905,17 +946,33 @@ async def handle_order_updates(
# NOTE: there is no `status` field # NOTE: there is no `status` field
case { case {
'vol_exec': vlm, 'vol_exec': vlm,
'avg_price': price,
'userref': reqid, 'userref': reqid,
**rest, **rest,
}: } as msg:
# TODO: emit fill msg from here?
ourreqid = reqids2txids.inverse[txid] ourreqid = reqids2txids.inverse[txid]
assert reqid == ourreqid assert reqid == ourreqid
log.info( log.info(
f'openOrders vlm={vlm} Fill for {reqid}:\n' f'openOrders vlm={vlm} Fill for {reqid}:\n'
f'{update_msg}' f'{update_msg}'
) )
continue
fill_msg = BrokerdFill(
time_ns=time.time_ns(),
reqid=reqid,
# action=action, # just use size value
# for now?
size=vlm,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'} | trade,
broker_time=broker_time
)
await ems_stream.send(fill_msg)
case _: case _:
log.warning( log.warning(
@ -923,17 +980,7 @@ async def handle_order_updates(
f'{txid}:{order_msg}' f'{txid}:{order_msg}'
) )
# NOTE: The only reason for this seems to be remapping # order request status updates
# underlying `txid` values on order "edits" which the
# `openOrders` sub doesn't seem to have any knowledge of.
# UPDATE: seems like we don't need this any more thanks to
# passing through the dialog key / reqid in the `newuserref`
# field on edit requests.
# I'd also like to ask them which event guarantees that the
# the live order is now in the book, since these status ones
# almost seem more like request-acks then state guarantees.
# ANSWER the `openOrders` is more indicative of "liveness".
case { case {
'event': etype, 'event': etype,
'status': status, 'status': status,
@ -1003,8 +1050,8 @@ async def handle_order_updates(
'reqid': reqid or 0, 'reqid': reqid or 0,
'txid': [txid], 'txid': [txid],
}) })
case _: case _:
log.warning(f'Unhandled trades update msg: {msg}') log.warning(f'Unhandled trades update msg: {msg}')