repurpose ws code for ownTrades stream, get trade authentication going

kraken_orders
Konstantine Tsafatinos 2022-02-19 00:14:58 -05:00
parent b1bff1be85
commit ee0be13af1
1 changed files with 61 additions and 87 deletions

View File

@ -119,18 +119,13 @@ class Pair(BaseModel):
ordermin: float # minimum order volume for pair
class Order(BaseModel):
class Trade(BaseModel):
"""Order class that helps parse and validate order stream"""
txid: str # kraken order transaction id
reqid: str # kraken order transaction id
action: str # buy or sell
ordertype: str # limit order ##TODO: Do I need this?
pair: str # order pair
price: str # price of asset
vol: str # vol of asset
status: str # order status
opentm: str # time of order
timeinforce: str # e.g GTC, GTD
userref: str # for a mapping to oids
size: str # vol of asset
broker_time: str # e.g GTC, GTD
@dataclass
@ -540,28 +535,6 @@ async def handle_order_requests(
# validate
temp_id = next(userref_counter)
order = BrokerdOrder(**request_msg)
#def slashinsert(str):
# midPoint = len(str)//2
# return str[:midPoint] + '/' + str[midPoint:]
## Send order via websocket
#order_msg = {
# "event": "addOrder",
# "ordertype": "limit",
# "pair": slashinsert(order.symbol.upper()),
# "price": str(order.price),
# "token": token,
# "type": order.action,
# "volume": str(order.size),
# "userref": str(temp_id)
#}
## add oid userref mapping
#userref_oid_map[str(temp_id)] = {
# 'oid': order.oid, 'account': order.account
#}
#await ws.send_msg(order_msg)
# call our client api to submit the order
resp = await client.submit_limit(
@ -608,14 +581,6 @@ async def handle_order_requests(
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
#cancel_msg = {
# "event": "cancelOrder",
# "token": token,
# "txid": [msg.reqid]
#}
#await ws.send_msg(cancel_msg)
# Send order cancellation to kraken
resp = await client.submit_cancel(
reqid=msg.reqid
@ -669,7 +634,7 @@ async def trades_dialogue(
# specific logic for this in kraken's shitty sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
trades_sub = make_auth_sub(
{'name': 'openOrders', 'token': token}
{'name': 'ownTrades', 'token': token}
)
# TODO: we want to eventually allow unsubs which should
@ -692,7 +657,7 @@ async def trades_dialogue(
# unsub from all pairs on teardown
await ws.send_msg({
'event': 'unsubscribe',
'subscription': ['openOrders'],
'subscription': ['ownTrades'],
})
# XXX: do we need to ack the unsub?
@ -732,14 +697,50 @@ async def trades_dialogue(
## TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream)
# Process trades msg stream of ws
async with open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=subscribe,
token=token,
) as ws:
from pprint import pprint
async for msg in process_order_msgs(ws):
async for msg in process_trade_msgs(ws):
pprint(msg)
for trade in msg:
# check the type of packaged message
assert type(trade) == Trade
# prepare and send a status update for line update
trade_msg = BrokerdStatus(
reqid=trade.reqid,
time_ns=time.time_ns(),
account='kraken.spot',
status='executed',
filled=float(trade.size),
reason='Order filled by kraken',
# remaining='' ## TODO: not sure what to do here.
broker_details={
'name': 'kraken',
'broker_time': trade.broker_time
}
)
await ems_stream.send(trade_msg.dict())
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=trade.reqid,
time_ns=time.time_ns(),
action=trade.action,
size=float(trade.size),
price=float(trade.price),
## TODO: maybe capture more msg data i.e fees?
broker_details={'name': 'kraken'},
broker_time=float(trade.broker_time)
)
await ems_stream.send(fill_msg.dict())
async def stream_messages(
@ -833,7 +834,7 @@ async def process_data_feed_msgs(
yield msg
async def process_order_msgs(
async def process_trade_msgs(
ws: NoBsWs,
):
'''
@ -848,62 +849,35 @@ async def process_order_msgs(
# and then in the caller recast to our native ``BrokerdX`` msg types.
try:
# check that we are on openOrders stream and that msgs are arriving
# in sequence with kraken
assert msg[1] == 'openOrders'
# check that we are on the ownTrades stream and that msgs are
# arriving in sequence with kraken
assert msg[1] == 'ownTrades'
assert msg[2]['sequence'] > sequence_counter
sequence_counter += 1
raw_msgs = msg[0]
# TODO: get length and start list
order_msgs = []
trade_msgs = []
try:
# Check that we are only processing new trades
if msg[2]['sequence'] != 1:
# check if its a new order or an update msg
for order in raw_msgs:
txid = list(order.keys())[0]
order_msg = Order(
txid=txid,
action=order[txid]['descr']['type'],
ordertype=order[txid]['descr']['ordertype'],
pair=order[txid]['descr']['pair'],
price=order[txid]['descr']['price'],
vol=order[txid]['vol'],
status=order[txid]['status'],
opentm=order[txid]['opentm'],
timeinforce=order[txid]['timeinforce'],
userref=order[txid]['userref']
for trade_msg in raw_msgs:
trade = list(trade_msg.values())[0]
order_msg = Trade(
reqid=trade['ordertxid'],
action=trade['type'],
price=trade['price'],
size=trade['vol'],
broker_time=trade['time']
)
order_msgs.append(order_msg)
yield order_msgs
except KeyError:
for order in raw_msgs:
txid = list(order.keys())[0]
## TODO: maybe use a pydantic.BaseModel
order_msg = {
'txid': txid,
'status': order[txid]['status'],
'userref': order[txid]['userref']
}
order_msgs.append(order_msg)
yield order_msgs
trade_msgs.append(order_msg)
yield trade_msgs
except AssertionError:
print(f'UNHANDLED MSG: {msg}')
yield msg
# form of order msgs:
# [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544',
# 'status': 'canceled', 'vol_exec': '0.00000000', 'cost':
# '0.00000000', 'fee': '0.00000000', 'avg_price':
# '0.00000000', 'userref': 1, 'cancel_reason': 'User
# requested'}}], 'openOrders', {'sequence': 4}]
# yield msg
def normalize(
ohlc: OHLC,