get stashed changes

kraken_orders
Konstantine Tsafatinos 2022-02-14 17:02:37 -05:00
parent 0122669dd4
commit 6c54c81f01
1 changed files with 106 additions and 38 deletions

View File

@ -119,6 +119,19 @@ class Pair(BaseModel):
ordermin: float # minimum order volume for pair
class Order(BaseModel):
"""Order class that helps parse and validate order stream"""
txid: str # kraken order transaction id
action: str # buy or sell
ordertype: str # limit order ##TODO: Do I need this?
pair: str # order pair
price: str # price of asset
vol: str # vol of asset
status: str # order status
opentm: str # time of order
timeinforce: str # e.g GTC, GTD
@dataclass
class OHLC:
"""Description of the flattened OHLC quote format.
@ -494,6 +507,8 @@ async def handle_order_requests(
client: Client,
ems_order_stream: tractor.MsgStream,
ws: NoBsWs,
token: str,
) -> None:
@ -523,50 +538,65 @@ async def handle_order_requests(
# validate
temp_id = next(userref_counter)
order = BrokerdOrder(**request_msg)
def slashinsert(str):
midPoint = len(str)//2
return str[:midPoint] + '/' + str[midPoint:]
# Send order via websocket
order_msg = {
"event": "addOrder",
"ordertype": "limit",
"pair": slashinsert(order.symbol.upper()),
"price": str(order.price),
"token": token,
"type": order.action,
"volume": str(order.size)
}
await ws.send_msg(order_msg)
# call our client api to submit the order
resp = await client.submit_limit(
#resp = await client.submit_limit(
# oid=order.oid,
# symbol=order.symbol,
# price=order.price,
# action=order.action,
# size=order.size,
# ## XXX: how do I handle new orders
# reqid=temp_id,
#)
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
## XXX: how do I handle new orders
reqid=temp_id,
)
#err = resp['error']
#if err:
# log.error(f'Failed to submit order')
# await ems_order_stream.send(
# BrokerdError(
# oid=order.oid,
# reqid=temp_id,
# symbol=order.symbol,
# reason="Failed order submission",
# broker_details=resp
# ).dict()
# )
#else:
# ## TODO: handle multiple cancels
# ## txid is an array of strings
# reqid = resp['result']['txid'][0]
# # deliver ack that order has been submitted to broker routing
# await ems_order_stream.send(
# BrokerdOrderAck(
err = resp['error']
if err:
log.error(f'Failed to submit order')
await ems_order_stream.send(
BrokerdError(
oid=order.oid,
reqid=temp_id,
symbol=order.symbol,
reason="Failed order submission",
broker_details=resp
).dict()
)
else:
## TODO: handle multiple cancels
## txid is an array of strings
reqid = resp['result']['txid'][0]
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# # ems order request id
# oid=order.oid,
# ems order request id
oid=order.oid,
# # broker specific request id
# reqid=reqid,
# broker specific request id
reqid=reqid,
# # account the made the order
# account=order.account
# account the made the order
account=order.account
).dict()
)
# ).dict()
# )
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
@ -685,16 +715,31 @@ async def trades_dialogue(
trio.open_nursery() as n,
):
## TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream)
#n.start_soon(handle_order_requests, client, ems_stream)
async with open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=subscribe,
token=token,
) as ws:
n.start_soon(handle_order_requests, client, ems_stream, ws, token)
from pprint import pprint
async for msg in process_order_msgs(ws):
pprint(msg)
#await ems_order_stream.send(
# BrokerdOrderAck(
# # ems order request id
# oid=order.oid,
# # broker specific request id
# reqid=reqid,
# # account the made the order
# account=order.account
# ).dict()
#)
async def stream_messages(
@ -801,6 +846,29 @@ async def process_order_msgs(
# HINT: create a ``pydantic.BaseModel`` to parse and validate
# and then in the caller recast to our native ``BrokerdX`` msg types.
# check that we are on openOrders stream XXX: Maybe do something with sequence
assert msg[1] == 'openOrders'
orders = msg[0]
for order in orders:
txid = list(order.keys())[0]
order_msg = Order(
txid=txid,
action=order[txid]['descr']['type'],
ordertype=order[txid]['descr']['ordertype'],
pair=order[txid]['descr']['pair'],
price=order[txid]['descr']['price'],
vol=order[txid]['vol'],
status=order[txid]['status'],
opentm=order[txid]['opentm'],
timeinforce=order[txid]['timeinforce']
)
print(order_msg)
print(msg[0][0].keys())
# form of order msgs:
# [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544',
# 'status': 'canceled', 'vol_exec': '0.00000000', 'cost':