Split existing live-open case into its own block
parent
e2cd8c4aef
commit
f6ba95a6c7
|
@ -741,6 +741,57 @@ async def handle_order_updates(
|
||||||
txid, update_msg = list(order_msg.items())[0]
|
txid, update_msg = list(order_msg.items())[0]
|
||||||
match update_msg:
|
match update_msg:
|
||||||
|
|
||||||
|
# EMS-unknown live order that needs to be
|
||||||
|
# delivered and loaded on the client-side.
|
||||||
|
case {
|
||||||
|
'userref': reqid,
|
||||||
|
|
||||||
|
# during a fill this field is **not**
|
||||||
|
# provided! but, it is always avail on
|
||||||
|
# actual status updates.. see case above.
|
||||||
|
'status': status,
|
||||||
|
**rest,
|
||||||
|
} if (
|
||||||
|
ids.inverse.get(reqid) is None
|
||||||
|
):
|
||||||
|
# parse out existing live order
|
||||||
|
descr = rest['descr']
|
||||||
|
fqsn = descr['pair'].replace(
|
||||||
|
'/', '').lower()
|
||||||
|
price = float(descr['price'])
|
||||||
|
size = float(rest['vol'])
|
||||||
|
action = descr['type']
|
||||||
|
|
||||||
|
# register the userref value from
|
||||||
|
# kraken (usually an `int` staring
|
||||||
|
# at 1?) as our reqid.
|
||||||
|
reqids2txids[reqid] = txid
|
||||||
|
oid = str(reqid)
|
||||||
|
ids[oid] = reqid # NOTE!: str -> int
|
||||||
|
|
||||||
|
# fill out ``Status`` + boxed ``Order``
|
||||||
|
status_msg = Status(
|
||||||
|
time_ns=time.time_ns(),
|
||||||
|
resp='open',
|
||||||
|
oid=oid,
|
||||||
|
reqid=reqid,
|
||||||
|
|
||||||
|
# embedded order info
|
||||||
|
req=Order(
|
||||||
|
action=action,
|
||||||
|
exec_mode='live',
|
||||||
|
oid=oid,
|
||||||
|
symbol=fqsn,
|
||||||
|
account=acc_name,
|
||||||
|
price=price,
|
||||||
|
size=size,
|
||||||
|
),
|
||||||
|
src='kraken',
|
||||||
|
)
|
||||||
|
apiflows[reqid].maps.append(status_msg)
|
||||||
|
await ems_stream.send(status_msg)
|
||||||
|
continue
|
||||||
|
|
||||||
# XXX: eg. of full msg schema:
|
# XXX: eg. of full msg schema:
|
||||||
# {'avg_price': _,
|
# {'avg_price': _,
|
||||||
# 'cost': _,
|
# 'cost': _,
|
||||||
|
@ -819,105 +870,56 @@ async def handle_order_updates(
|
||||||
)
|
)
|
||||||
|
|
||||||
oid = ids.inverse.get(reqid)
|
oid = ids.inverse.get(reqid)
|
||||||
|
# XXX: too fast edit handled by the
|
||||||
|
# request handler task: this
|
||||||
|
# scenario occurs when ems side
|
||||||
|
# requests are coming in too quickly
|
||||||
|
# such that there is no known txid
|
||||||
|
# yet established for the ems
|
||||||
|
# dialog's last reqid when the
|
||||||
|
# request handler task is already
|
||||||
|
# receceiving a new update for that
|
||||||
|
# reqid. In this case we simply mark
|
||||||
|
# the reqid as being "too fast" and
|
||||||
|
# then when we get the next txid
|
||||||
|
# update from kraken's backend, and
|
||||||
|
# thus the new txid, we simply
|
||||||
|
# cancel the order for now.
|
||||||
|
|
||||||
|
# TODO: Ideally we eventually
|
||||||
|
# instead make the client side of
|
||||||
|
# the ems block until a submission
|
||||||
|
# is confirmed by the backend
|
||||||
|
# instead of this hacky throttle
|
||||||
|
# style approach and avoid requests
|
||||||
|
# coming in too quickly on the other
|
||||||
|
# side of the ems, aka the client
|
||||||
|
# <-> ems dialog.
|
||||||
if (
|
if (
|
||||||
status == 'open'
|
status == 'open'
|
||||||
and (
|
and isinstance(
|
||||||
# XXX: too fast edit handled by the
|
reqids2txids.get(reqid),
|
||||||
# request handler task: this
|
TooFastEdit
|
||||||
# scenario occurs when ems side
|
|
||||||
# requests are coming in too quickly
|
|
||||||
# such that there is no known txid
|
|
||||||
# yet established for the ems
|
|
||||||
# dialog's last reqid when the
|
|
||||||
# request handler task is already
|
|
||||||
# receceiving a new update for that
|
|
||||||
# reqid. In this case we simply mark
|
|
||||||
# the reqid as being "too fast" and
|
|
||||||
# then when we get the next txid
|
|
||||||
# update from kraken's backend, and
|
|
||||||
# thus the new txid, we simply
|
|
||||||
# cancel the order for now.
|
|
||||||
|
|
||||||
# TODO: Ideally we eventually
|
|
||||||
# instead make the client side of
|
|
||||||
# the ems block until a submission
|
|
||||||
# is confirmed by the backend
|
|
||||||
# instead of this hacky throttle
|
|
||||||
# style approach and avoid requests
|
|
||||||
# coming in too quickly on the other
|
|
||||||
# side of the ems, aka the client
|
|
||||||
# <-> ems dialog.
|
|
||||||
(toofast := isinstance(
|
|
||||||
reqids2txids.get(reqid),
|
|
||||||
TooFastEdit
|
|
||||||
))
|
|
||||||
|
|
||||||
# pre-existing open order NOT from
|
|
||||||
# this EMS session.
|
|
||||||
or (noid := oid is None)
|
|
||||||
)
|
)
|
||||||
):
|
):
|
||||||
if toofast:
|
# TODO: don't even allow this case
|
||||||
# TODO: don't even allow this case
|
# by not moving the client side line
|
||||||
# by not moving the client side line
|
# until an edit confirmation
|
||||||
# until an edit confirmation
|
# arrives...
|
||||||
# arrives...
|
log.cancel(
|
||||||
log.cancel(
|
f'Received too fast edit {txid}:\n'
|
||||||
f'Received too fast edit {txid}:\n'
|
f'{update_msg}\n'
|
||||||
f'{update_msg}\n'
|
'Cancelling order for now!..'
|
||||||
'Cancelling order for now!..'
|
)
|
||||||
)
|
# call ws api to cancel:
|
||||||
# call ws api to cancel:
|
# https://docs.kraken.com/websockets/#message-cancelOrder
|
||||||
# https://docs.kraken.com/websockets/#message-cancelOrder
|
await ws.send_msg({
|
||||||
await ws.send_msg({
|
'event': 'cancelOrder',
|
||||||
'event': 'cancelOrder',
|
'token': token,
|
||||||
'token': token,
|
'reqid': reqid or 0,
|
||||||
'reqid': reqid or 0,
|
'txid': [txid],
|
||||||
'txid': [txid],
|
})
|
||||||
})
|
continue
|
||||||
continue
|
|
||||||
|
|
||||||
# a non-ems-active order, emit live
|
|
||||||
# order embedded in status msg.
|
|
||||||
elif noid:
|
|
||||||
# parse out existing live order
|
|
||||||
descr = rest['descr']
|
|
||||||
fqsn = descr['pair'].replace(
|
|
||||||
'/', '').lower()
|
|
||||||
price = float(descr['price'])
|
|
||||||
size = float(rest['vol'])
|
|
||||||
action = descr['type']
|
|
||||||
|
|
||||||
# register the userref value from
|
|
||||||
# kraken (usually an `int` staring
|
|
||||||
# at 1?) as our reqid.
|
|
||||||
reqids2txids[reqid] = txid
|
|
||||||
oid = str(reqid)
|
|
||||||
ids[oid] = reqid # NOTE!: str -> int
|
|
||||||
|
|
||||||
# fill out ``Status`` + boxed ``Order``
|
|
||||||
status_msg = Status(
|
|
||||||
time_ns=time.time_ns(),
|
|
||||||
resp='open',
|
|
||||||
oid=oid,
|
|
||||||
reqid=reqid,
|
|
||||||
|
|
||||||
# embedded order info
|
|
||||||
req=Order(
|
|
||||||
action=action,
|
|
||||||
exec_mode='live',
|
|
||||||
oid=oid,
|
|
||||||
symbol=fqsn,
|
|
||||||
account=acc_name,
|
|
||||||
price=price,
|
|
||||||
size=size,
|
|
||||||
),
|
|
||||||
src='kraken',
|
|
||||||
)
|
|
||||||
apiflows[reqid].maps.append(status_msg)
|
|
||||||
await ems_stream.send(status_msg)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# send BrokerdStatus messages for all
|
# send BrokerdStatus messages for all
|
||||||
# order state updates
|
# order state updates
|
||||||
|
|
Loading…
Reference in New Issue