remove ws support for orders, use rest api instead for easy oid association

kraken_orders
Konstantine Tsafatinos 2022-02-18 22:24:33 -05:00
parent 46948e0a8b
commit b1bff1be85
1 changed files with 93 additions and 174 deletions

View File

@ -508,9 +508,9 @@ async def handle_order_requests(
client: Client, client: Client,
ems_order_stream: tractor.MsgStream, ems_order_stream: tractor.MsgStream,
ws: NoBsWs, #ws: NoBsWs,
token: str, #token: str,
userref_oid_map: dict, #userref_oid_map: dict,
) -> None: ) -> None:
@ -540,112 +540,111 @@ async def handle_order_requests(
# validate # validate
temp_id = next(userref_counter) temp_id = next(userref_counter)
order = BrokerdOrder(**request_msg) order = BrokerdOrder(**request_msg)
def slashinsert(str): #def slashinsert(str):
midPoint = len(str)//2 # midPoint = len(str)//2
return str[:midPoint] + '/' + str[midPoint:] # return str[:midPoint] + '/' + str[midPoint:]
# Send order via websocket ## Send order via websocket
order_msg = { #order_msg = {
"event": "addOrder", # "event": "addOrder",
"ordertype": "limit", # "ordertype": "limit",
"pair": slashinsert(order.symbol.upper()), # "pair": slashinsert(order.symbol.upper()),
"price": str(order.price), # "price": str(order.price),
"token": token, # "token": token,
"type": order.action, # "type": order.action,
"volume": str(order.size), # "volume": str(order.size),
"userref": str(temp_id) # "userref": str(temp_id)
} #}
# add oid userref mapping ## add oid userref mapping
userref_oid_map[str(temp_id)] = { #userref_oid_map[str(temp_id)] = {
'oid': order.oid, 'account': order.account # 'oid': order.oid, 'account': order.account
} #}
await ws.send_msg(order_msg) #await ws.send_msg(order_msg)
# call our client api to submit the order # call our client api to submit the order
#resp = await client.submit_limit( resp = await client.submit_limit(
# oid=order.oid, oid=order.oid,
# symbol=order.symbol, symbol=order.symbol,
# price=order.price, price=order.price,
# action=order.action, action=order.action,
# size=order.size, size=order.size,
# ## XXX: how do I handle new orders reqid=temp_id,
# reqid=temp_id, )
#)
#err = resp['error'] err = resp['error']
#if err: if err:
# log.error(f'Failed to submit order') log.error(f'Failed to submit order')
# await ems_order_stream.send( await ems_order_stream.send(
# BrokerdError( BrokerdError(
# oid=order.oid, oid=order.oid,
# reqid=temp_id, reqid=temp_id,
# symbol=order.symbol, symbol=order.symbol,
# reason="Failed order submission", reason="Failed order submission",
# broker_details=resp broker_details=resp
# ).dict() ).dict()
# ) )
#else: else:
# ## TODO: handle multiple cancels ## TODO: handle multiple cancels
# ## txid is an array of strings ## txid is an array of strings
# reqid = resp['result']['txid'][0] reqid = resp['result']['txid'][0]
# # deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
# await ems_order_stream.send( await ems_order_stream.send(
# BrokerdOrderAck( BrokerdOrderAck(
# # ems order request id # ems order request id
# oid=order.oid, oid=order.oid,
# # broker specific request id # broker specific request id
# reqid=reqid, reqid=reqid,
# # account the made the order # account the made the order
# account=order.account account=order.account
# ).dict() ).dict()
# ) )
elif action == 'cancel': elif action == 'cancel':
msg = BrokerdCancel(**request_msg) msg = BrokerdCancel(**request_msg)
cancel_msg = { #cancel_msg = {
"event": "cancelOrder", # "event": "cancelOrder",
"token": token, # "token": token,
"txid": [msg.reqid] # "txid": [msg.reqid]
} #}
await ws.send_msg(cancel_msg) #await ws.send_msg(cancel_msg)
## Send order cancellation to kraken # Send order cancellation to kraken
#resp = await client.submit_cancel( resp = await client.submit_cancel(
# reqid=msg.reqid reqid=msg.reqid
#) )
#try: try:
# # Check to make sure there was no error returned by # Check to make sure there was no error returned by
# # the kraken endpoint. Assert one order was cancelled # the kraken endpoint. Assert one order was cancelled
# assert resp['error'] == [] assert resp['error'] == []
# assert resp['result']['count'] == 1 assert resp['result']['count'] == 1
# ## TODO: Change this code using .get ## TODO: Change this code using .get
# try: try:
# pending = resp['result']['pending'] pending = resp['result']['pending']
# # Check to make sure the cancellation is NOT pending, # Check to make sure the cancellation is NOT pending,
# # then send the confirmation to the ems order stream # then send the confirmation to the ems order stream
# except KeyError: except KeyError:
# await ems_order_stream.send( await ems_order_stream.send(
# BrokerdStatus( BrokerdStatus(
# reqid=msg.reqid, reqid=msg.reqid,
# account=msg.account, account=msg.account,
# time_ns=time.time_ns(), time_ns=time.time_ns(),
# status='cancelled', status='cancelled',
# reason='Order cancelled', reason='Order cancelled',
# broker_details={'name': 'kraken'} broker_details={'name': 'kraken'}
# ).dict() ).dict()
# ) )
#except AssertionError: except AssertionError:
# log.error(f'Order cancel was not successful') log.error(f'Order cancel was not successful')
else: else:
log.error(f'Unknown order command: {request_msg}') log.error(f'Unknown order command: {request_msg}')
@ -713,13 +712,13 @@ async def trades_dialogue(
msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) msg = pack_position(acc_name, norm_sym, pos, vols[ticker])
all_positions.append(msg.dict()) all_positions.append(msg.dict())
## TODO: create a new ems message schema for open orders
open_orders = await client.kraken_endpoint('OpenOrders', {}) open_orders = await client.kraken_endpoint('OpenOrders', {})
print(open_orders)
#await tractor.breakpoint() #await tractor.breakpoint()
await ctx.started((all_positions, (acc_name,))) await ctx.started((all_positions, (acc_name,)))
#await trio.sleep_forever()
# Get websocket token for authenticated data stream # Get websocket token for authenticated data stream
# Assert that a token was actually received # Assert that a token was actually received
resp = await client.kraken_endpoint('GetWebSocketsToken', {}) resp = await client.kraken_endpoint('GetWebSocketsToken', {})
@ -731,96 +730,16 @@ async def trades_dialogue(
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
## TODO: maybe add multiple accounts ## TODO: maybe add multiple accounts
#n.start_soon(handle_order_requests, client, ems_stream) n.start_soon(handle_order_requests, client, ems_stream)
# Mapping from userref passed to kraken and oid from piker
userref_oid_map = {}
async with open_autorecon_ws( async with open_autorecon_ws(
'wss://ws-auth.kraken.com/', 'wss://ws-auth.kraken.com/',
fixture=subscribe, fixture=subscribe,
token=token, token=token,
) as ws: ) as ws:
n.start_soon(
handle_order_requests,
client,
ems_stream,
ws,
token,
userref_oid_map
)
from pprint import pprint from pprint import pprint
pending_orders = []
async for msg in process_order_msgs(ws): async for msg in process_order_msgs(ws):
pprint(msg) pprint(msg)
for order in msg:
## TODO: Maybe do a better check and handle accounts
if type(order) == dict:
if order['status'] == 'canceled':
await ems_stream.send(
BrokerdStatus(
account='kraken.spot',
reqid=order['txid'],
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
for pending_order in pending_orders:
if pending_order.txid == order['txid'] and order['status'] == 'open':
await ems_stream.send(
BrokerdOrderAck(
# ems order request id
oid=userref_oid_map[pending_order.userref]['oid'],
# broker specific request id
reqid=order['txid'],
# account the made the order
account=userref_oid_map[
pending_order.userref
]['account']
).dict()
)
elif order.status == 'pending':
pending_orders.append(order)
#if not pending_oder and order.status == 'open':
# await ems_stream.send(
# BrokerdOrder(
# action=order.action,
# oid='',
# ## TODO: support multi accounts?
# account='kraken.spot',
# time_ns=int(float(order.opentm) * 10**9),
# reqid=order.txid,
# symbol=order.pair.replace('/', '').lower(),# + \
# #'.kraken',
# price=float(order.price),
# size=float(order.vol)
# ).dict()
# )
#await ems_order_stream.send(
# BrokerdOrderAck(
# # ems order request id
# oid=order.oid,
# # broker specific request id
# reqid=reqid,
# # account the made the order
# account=order.account
# ).dict()
#)
async def stream_messages( async def stream_messages(