Compare commits

...

3 Commits

Author SHA1 Message Date
Tyler Goodlet 3088aa630c Cancel any live orders found on connect
More or less just to avoid orders the user wasn't aware of from
persisting until we get "open order relaying" through the ems working.

Some further fixes which required a new `reqids2txids` map which keeps
track of which `kraken` "txid" is mapped to our `reqid: int`; mainly
this was needed for cancel requests which require knowing the underlying
`txid`s (since apparently kraken doesn't keep track of the "reqid"  we
pass it). Pass the ws instance into `handle_order_updates()` to enable
the cancelling orders on startup. Don't key error on unknown `reqid`
values (for eg. when receiving historical trade events on startup).
Handle cancel requests first in the ems side loop.
2022-07-09 12:40:12 -04:00
Tyler Goodlet 48b8607078 Use `aclosing()` around ws async gen 2022-07-09 12:40:05 -04:00
Tyler Goodlet 2240066a12 Lol, gotta `float()` that vlm before `*` XD 2022-07-09 12:39:34 -04:00
1 changed files with 73 additions and 36 deletions

View File

@ -32,6 +32,7 @@ from typing import (
Union,
)
from async_generator import aclosing
from bidict import bidict
import pendulum
# from pydantic import BaseModel
@ -82,6 +83,7 @@ async def handle_order_requests(
token: str,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
reqids2txids: dict[int, str],
) -> None:
'''
@ -97,6 +99,23 @@ async def handle_order_requests(
async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}')
match msg:
case {
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
last = emsflow[cancel.oid]
reqid = ids[cancel.oid]
txid = reqids2txids[reqid]
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
case {
'account': 'kraken.spot' as account,
'action': action,
@ -109,10 +128,9 @@ async def handle_order_requests(
if order.oid in ids:
ep = 'editOrder'
reqid = ids[order.oid] # integer not txid
last = emsflow[order.oid][-1]
assert last.reqid == order.reqid
txid = reqids2txids[reqid]
extra = {
'orderid': last.reqid, # txid
'orderid': txid, # txid
}
else:
@ -159,23 +177,6 @@ async def handle_order_requests(
# placehold for sanity checking in relay loop
emsflow.setdefault(order.oid, []).append(order)
case {
'account': 'kraken.spot' as account,
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
assert cancel.oid in emsflow
reqid = ids[cancel.oid]
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [cancel.reqid], # should be txid from submission
})
case _:
account = msg.get('account')
if account != 'kraken.spot':
@ -317,6 +318,7 @@ async def trades_dialogue(
),
) as ws,
trio.open_nursery() as n,
aclosing(stream_messages(ws)) as stream,
):
# task local msg dialog tracking
emsflow: dict[
@ -326,6 +328,7 @@ async def trades_dialogue(
# 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict()
reqids2txids: dict[int, str] = {}
# task for processing inbound requests from ems
n.start_soon(
@ -336,14 +339,17 @@ async def trades_dialogue(
token,
emsflow,
ids,
reqids2txids,
)
# enter relay loop
await handle_order_updates(
ws,
stream,
ems_stream,
emsflow,
ids,
reqids2txids,
trans,
acctid,
acc_name,
@ -353,9 +359,11 @@ async def trades_dialogue(
async def handle_order_updates(
ws: NoBsWs,
ws_stream: AsyncIterator,
ems_stream: tractor.MsgStream,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
reqids2txids: dict[int, str],
trans: list[pp.Transaction],
acctid: str,
acc_name: str,
@ -369,7 +377,11 @@ async def handle_order_updates(
defined in the signature clear to the reader.
'''
async for msg in stream_messages(ws):
# transaction records which will be updated
# on new trade clearing events (aka order "fills")
trans: list[pp.Transaction]
async for msg in ws_stream:
match msg:
# process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades
@ -380,7 +392,7 @@ async def handle_order_updates(
# 'userref': reqid,
{'sequence': seq},
]:
# flatten msgs for processing
# flatten msgs to an {id -> data} table for processing
trades = {
tid: trade
for entry in trades_msgs
@ -547,7 +559,29 @@ async def handle_order_updates(
submit_vlm = rest.get('vol', 0)
exec_vlm = rest.get('vol_exec', 0)
oid = ids.inverse[reqid]
reqids2txids[reqid] = txid
oid = ids.inverse.get(reqid)
if not oid:
# TODO: handle these and relay them
# through the EMS to the client / UI
# side!
log.warning(
f'Received active order {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
)
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid],
})
continue
msgs = emsflow[oid]
# send BrokerdStatus messages for all
@ -586,6 +620,7 @@ async def handle_order_updates(
'event': etype,
'status': status,
'reqid': reqid,
**rest,
} as event if (
etype in {
'addOrderStatus',
@ -593,7 +628,18 @@ async def handle_order_updates(
'cancelOrderStatus',
}
):
oid = ids.inverse[reqid]
oid = ids.inverse.get(reqid)
if not oid:
log.warning(
'Unknown order status update?:\n'
f'{event}'
)
continue
txid = rest.get('txid')
if txid:
reqids2txids[reqid] = txid
msgs = emsflow[oid]
last = msgs[-1]
resps, errored = process_status(
@ -603,19 +649,10 @@ async def handle_order_updates(
msgs,
last,
)
# if errored:
# if we rx any error cancel the order again
# await ws.send_msg({
# 'event': 'cancelOrder',
# 'token': token,
# 'reqid': reqid,
# 'txid': [last.reqid], # txid from submission
# })
if resps:
msgs.extend(resps)
for resp in resps:
await ems_stream.send(resp.dict())
await ems_stream.send(resp)
case _:
log.warning(f'Unhandled trades update msg: {msg}')
@ -726,7 +763,7 @@ def norm_trade_records(
records: list[pp.Transaction] = []
for tid, record in ledger.items():
size = record.get('vol') * {
size = float(record.get('vol')) * {
'buy': 1,
'sell': -1,
}[record['type']]
@ -737,7 +774,7 @@ def norm_trade_records(
pp.Transaction(
fqsn=f'{norm_sym}.kraken',
tid=tid,
size=float(size),
size=size,
price=float(record['price']),
cost=float(record['fee']),
dt=pendulum.from_timestamp(float(record['time'])),