Add initial `kraken` live order loading

open_order_loading
Tyler Goodlet 2022-08-11 21:30:32 -04:00
parent c8bff81220
commit e2cd8c4aef
1 changed files with 53 additions and 37 deletions

View File

@ -39,7 +39,6 @@ from bidict import bidict
import pendulum
import trio
import tractor
import wsproto
from piker.pp import (
Position,
@ -49,6 +48,8 @@ from piker.pp import (
open_pps,
)
from piker.clearing._messages import (
Order,
Status,
BrokerdCancel,
BrokerdError,
BrokerdFill,
@ -126,7 +127,7 @@ async def handle_order_requests(
oid=msg['oid'],
symbol=msg['symbol'],
reason=(
f'TooFastEdit reqid:{reqid}, could not cancelling..'
f'Edit too fast:{reqid}, cancelling..'
),
)
@ -249,7 +250,7 @@ async def handle_order_requests(
@acm
async def subscribe(
ws: wsproto.WSConnection,
ws: NoBsWs,
token: str,
subs: list[tuple[str, dict]] = [
('ownTrades', {
@ -632,8 +633,6 @@ async def handle_order_updates(
# to do all fill/status/pp updates in that sub and just use
# this one for ledger syncs?
# XXX: ASK SUPPORT ABOUT THIS!
# For eg. we could take the "last 50 trades" and do a diff
# with the ledger and then only do a re-sync if something
# seems amiss?
@ -696,7 +695,6 @@ async def handle_order_updates(
status_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
account=acc_name,
status='filled',
filled=size,
@ -870,38 +868,56 @@ async def handle_order_updates(
f'{update_msg}\n'
'Cancelling order for now!..'
)
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid or 0,
'txid': [txid],
})
continue
elif noid: # a non-ems-active order
# TODO: handle these and relay them
# through the EMS to the client / UI
# side!
log.cancel(
f'Rx unknown active order {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
# a non-ems-active order, emit live
# order embedded in status msg.
elif noid:
# parse out existing live order
descr = rest['descr']
fqsn = descr['pair'].replace(
'/', '').lower()
price = float(descr['price'])
size = float(rest['vol'])
action = descr['type']
# register the userref value from
# kraken (usually an `int` staring
# at 1?) as our reqid.
reqids2txids[reqid] = txid
oid = str(reqid)
ids[oid] = reqid # NOTE!: str -> int
# fill out ``Status`` + boxed ``Order``
status_msg = Status(
time_ns=time.time_ns(),
resp='open',
oid=oid,
reqid=reqid,
# embedded order info
req=Order(
action=action,
exec_mode='live',
oid=oid,
symbol=fqsn,
account=acc_name,
price=price,
size=size,
),
src='kraken',
)
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid or 0,
'txid': [txid],
})
continue
# remap statuses to ems set.
ems_status = {
'open': 'submitted',
'closed': 'filled',
'canceled': 'cancelled',
# do we even need to forward
# this state to the ems?
'pending': 'pending',
}[status]
# TODO: i like the open / closed semantics
# more we should consider them for internals
apiflows[reqid].maps.append(status_msg)
await ems_stream.send(status_msg)
continue
# send BrokerdStatus messages for all
# order state updates
@ -912,7 +928,7 @@ async def handle_order_updates(
account=f'kraken.{acctid}',
# everyone doin camel case..
status=ems_status, # force lower case
status=status, # force lower case
filled=vlm,
reason='', # why held?