use a mapping from userref to oid for order ack
parent
6c54c81f01
commit
d826a66c8c
|
@ -130,6 +130,7 @@ class Order(BaseModel):
|
|||
status: str # order status
|
||||
opentm: str # time of order
|
||||
timeinforce: str # e.g GTC, GTD
|
||||
userref: str # for a mapping to oids
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -509,6 +510,7 @@ async def handle_order_requests(
|
|||
ems_order_stream: tractor.MsgStream,
|
||||
ws: NoBsWs,
|
||||
token: str,
|
||||
userref_oid_map: dict,
|
||||
|
||||
) -> None:
|
||||
|
||||
|
@ -541,6 +543,7 @@ async def handle_order_requests(
|
|||
def slashinsert(str):
|
||||
midPoint = len(str)//2
|
||||
return str[:midPoint] + '/' + str[midPoint:]
|
||||
|
||||
# Send order via websocket
|
||||
order_msg = {
|
||||
"event": "addOrder",
|
||||
|
@ -549,9 +552,14 @@ async def handle_order_requests(
|
|||
"price": str(order.price),
|
||||
"token": token,
|
||||
"type": order.action,
|
||||
"volume": str(order.size)
|
||||
"volume": str(order.size),
|
||||
"userref": str(temp_id)
|
||||
}
|
||||
|
||||
# add oid userref mapping
|
||||
userref_oid_map[str(temp_id)] = {
|
||||
'oid': order.oid, 'account': order.account
|
||||
}
|
||||
|
||||
await ws.send_msg(order_msg)
|
||||
|
||||
|
@ -717,15 +725,69 @@ async def trades_dialogue(
|
|||
## TODO: maybe add multiple accounts
|
||||
#n.start_soon(handle_order_requests, client, ems_stream)
|
||||
|
||||
# Mapping from userref passed to kraken and oid from piker
|
||||
userref_oid_map = {}
|
||||
|
||||
async with open_autorecon_ws(
|
||||
'wss://ws-auth.kraken.com/',
|
||||
fixture=subscribe,
|
||||
token=token,
|
||||
) as ws:
|
||||
n.start_soon(handle_order_requests, client, ems_stream, ws, token)
|
||||
n.start_soon(
|
||||
handle_order_requests,
|
||||
client,
|
||||
ems_stream,
|
||||
ws,
|
||||
token,
|
||||
userref_oid_map
|
||||
)
|
||||
from pprint import pprint
|
||||
pending_orders = []
|
||||
async for msg in process_order_msgs(ws):
|
||||
pprint(msg)
|
||||
for order in msg:
|
||||
## TODO: Maybe do a better
|
||||
if type(order) == dict:
|
||||
for pending_order in pending_orders:
|
||||
if pending_order.txid == order['txid'] and order['status'] == 'open':
|
||||
await ems_stream.send(
|
||||
BrokerdOrderAck(
|
||||
|
||||
# ems order request id
|
||||
oid=userref_oid_map[pending_order.userref]['oid'],
|
||||
|
||||
# broker specific request id
|
||||
reqid=order['txid'],
|
||||
|
||||
# account the made the order
|
||||
account=userref_oid_map[
|
||||
pending_order.userref
|
||||
]['account']
|
||||
|
||||
).dict()
|
||||
)
|
||||
|
||||
elif order.status == 'pending':
|
||||
pending_orders.append(order)
|
||||
|
||||
|
||||
#if not pending_oder and order.status == 'open':
|
||||
# await ems_stream.send(
|
||||
# BrokerdOrder(
|
||||
# action=order.action,
|
||||
# oid='',
|
||||
# ## TODO: support multi accounts?
|
||||
# account='kraken.spot',
|
||||
# time_ns=int(float(order.opentm) * 10**9),
|
||||
# reqid=order.txid,
|
||||
# symbol=order.pair.replace('/', '').lower(),# + \
|
||||
# #'.kraken',
|
||||
# price=float(order.price),
|
||||
# size=float(order.vol)
|
||||
# ).dict()
|
||||
# )
|
||||
|
||||
|
||||
#await ems_order_stream.send(
|
||||
# BrokerdOrderAck(
|
||||
|
||||
|
@ -840,17 +902,26 @@ async def process_order_msgs(
|
|||
Parse and pack data feed messages.
|
||||
|
||||
'''
|
||||
sequence_counter = 0
|
||||
async for msg in stream_messages(ws):
|
||||
|
||||
# TODO: write your order event parser here!
|
||||
# HINT: create a ``pydantic.BaseModel`` to parse and validate
|
||||
# and then in the caller recast to our native ``BrokerdX`` msg types.
|
||||
|
||||
# check that we are on openOrders stream XXX: Maybe do something with sequence
|
||||
try:
|
||||
# check that we are on openOrders stream and that msgs are arriving
|
||||
# in sequence with kraken
|
||||
assert msg[1] == 'openOrders'
|
||||
orders = msg[0]
|
||||
assert msg[2]['sequence'] > sequence_counter
|
||||
sequence_counter += 1
|
||||
raw_msgs = msg[0]
|
||||
# TODO: get length and start list
|
||||
order_msgs = []
|
||||
|
||||
for order in orders:
|
||||
try:
|
||||
# check if its a new order or an update msg
|
||||
for order in raw_msgs:
|
||||
txid = list(order.keys())[0]
|
||||
order_msg = Order(
|
||||
txid=txid,
|
||||
|
@ -861,14 +932,31 @@ async def process_order_msgs(
|
|||
vol=order[txid]['vol'],
|
||||
status=order[txid]['status'],
|
||||
opentm=order[txid]['opentm'],
|
||||
timeinforce=order[txid]['timeinforce']
|
||||
timeinforce=order[txid]['timeinforce'],
|
||||
userref=order[txid]['userref']
|
||||
)
|
||||
print(order_msg)
|
||||
order_msgs.append(order_msg)
|
||||
|
||||
yield order_msgs
|
||||
|
||||
except KeyError:
|
||||
for order in raw_msgs:
|
||||
txid = list(order.keys())[0]
|
||||
## TODO: maybe use a pydantic.BaseModel
|
||||
order_msg = {
|
||||
'txid': txid,
|
||||
'status': order[txid]['status'],
|
||||
'userref': order[txid]['userref']
|
||||
}
|
||||
order_msgs.append(order_msg)
|
||||
|
||||
yield order_msgs
|
||||
|
||||
|
||||
except AssertionError:
|
||||
print(f'UNHANDLED MSG: {msg}')
|
||||
yield msg
|
||||
|
||||
|
||||
print(msg[0][0].keys())
|
||||
# form of order msgs:
|
||||
# [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544',
|
||||
# 'status': 'canceled', 'vol_exec': '0.00000000', 'cost':
|
||||
|
@ -876,7 +964,7 @@ async def process_order_msgs(
|
|||
# '0.00000000', 'userref': 1, 'cancel_reason': 'User
|
||||
# requested'}}], 'openOrders', {'sequence': 4}]
|
||||
|
||||
yield msg
|
||||
# yield msg
|
||||
|
||||
|
||||
def normalize(
|
||||
|
|
Loading…
Reference in New Issue