kraken: use new `OrderDialogs` type, handle `.spot`
Drop the older `dict[str, ChainMap]` prototype we had since the new `OrderDialogs` built-out while adding `binance` order support is more refined and general. Also, handle new and now expect `.spot` venue token in FQMEs since kraken too has futes markets that we'll likely want to support eventually.basic_buy_bot
parent
e4c1003aba
commit
5d930175e4
|
@ -18,7 +18,6 @@
|
||||||
Order api and machinery
|
Order api and machinery
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from collections import ChainMap, defaultdict
|
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
aclosing,
|
aclosing,
|
||||||
|
@ -52,6 +51,9 @@ from piker.accounting import (
|
||||||
from piker.accounting._mktinfo import (
|
from piker.accounting._mktinfo import (
|
||||||
MktPair,
|
MktPair,
|
||||||
)
|
)
|
||||||
|
from piker.clearing import(
|
||||||
|
OrderDialogs,
|
||||||
|
)
|
||||||
from piker.clearing._messages import (
|
from piker.clearing._messages import (
|
||||||
Order,
|
Order,
|
||||||
Status,
|
Status,
|
||||||
|
@ -124,7 +126,7 @@ async def handle_order_requests(
|
||||||
client: Client,
|
client: Client,
|
||||||
ems_order_stream: tractor.MsgStream,
|
ems_order_stream: tractor.MsgStream,
|
||||||
token: str,
|
token: str,
|
||||||
apiflows: dict[int, ChainMap[dict[str, dict]]],
|
apiflows: OrderDialogs,
|
||||||
ids: bidict[str, int],
|
ids: bidict[str, int],
|
||||||
reqids2txids: dict[int, str],
|
reqids2txids: dict[int, str],
|
||||||
|
|
||||||
|
@ -188,6 +190,7 @@ async def handle_order_requests(
|
||||||
try:
|
try:
|
||||||
txid: str = reqids2txids[reqid]
|
txid: str = reqids2txids[reqid]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
|
||||||
# XXX: not sure if this block ever gets hit now?
|
# XXX: not sure if this block ever gets hit now?
|
||||||
log.error('TOO FAST EDIT')
|
log.error('TOO FAST EDIT')
|
||||||
reqids2txids[reqid] = TooFastEdit(reqid)
|
reqids2txids[reqid] = TooFastEdit(reqid)
|
||||||
|
@ -221,7 +224,11 @@ async def handle_order_requests(
|
||||||
'type': order.action,
|
'type': order.action,
|
||||||
}
|
}
|
||||||
|
|
||||||
psym: str = order.symbol.upper()
|
# XXX strip any .<venue> token which should
|
||||||
|
# ONLY ever be '.spot' rn, until we support
|
||||||
|
# futes.
|
||||||
|
bs_fqme: str = order.symbol.rstrip('.spot')
|
||||||
|
psym: str = bs_fqme.upper()
|
||||||
pair: str = f'{psym[:3]}/{psym[3:]}'
|
pair: str = f'{psym[:3]}/{psym[3:]}'
|
||||||
|
|
||||||
# XXX: ACK the request **immediately** before sending
|
# XXX: ACK the request **immediately** before sending
|
||||||
|
@ -260,7 +267,7 @@ async def handle_order_requests(
|
||||||
await ws.send_msg(req)
|
await ws.send_msg(req)
|
||||||
|
|
||||||
# placehold for sanity checking in relay loop
|
# placehold for sanity checking in relay loop
|
||||||
apiflows[reqid].maps.append(msg)
|
apiflows.add_msg(reqid, msg)
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
account = msg.get('account')
|
account = msg.get('account')
|
||||||
|
@ -440,10 +447,7 @@ async def open_trade_dialog(
|
||||||
acc_name = 'kraken.' + acctid
|
acc_name = 'kraken.' + acctid
|
||||||
|
|
||||||
# task local msg dialog tracking
|
# task local msg dialog tracking
|
||||||
apiflows: defaultdict[
|
apiflows = OrderDialogs()
|
||||||
int,
|
|
||||||
ChainMap[dict[str, dict]],
|
|
||||||
] = defaultdict(ChainMap)
|
|
||||||
|
|
||||||
# 2way map for ems ids to kraken int reqids..
|
# 2way map for ems ids to kraken int reqids..
|
||||||
ids: bidict[str, int] = bidict()
|
ids: bidict[str, int] = bidict()
|
||||||
|
@ -706,7 +710,7 @@ async def handle_order_updates(
|
||||||
ws: NoBsWs,
|
ws: NoBsWs,
|
||||||
ws_stream: AsyncIterator,
|
ws_stream: AsyncIterator,
|
||||||
ems_stream: tractor.MsgStream,
|
ems_stream: tractor.MsgStream,
|
||||||
apiflows: dict[int, ChainMap[dict[str, dict]]],
|
apiflows: OrderDialogs,
|
||||||
ids: bidict[str, int],
|
ids: bidict[str, int],
|
||||||
reqids2txids: bidict[int, str],
|
reqids2txids: bidict[int, str],
|
||||||
table: PpTable,
|
table: PpTable,
|
||||||
|
@ -921,7 +925,7 @@ async def handle_order_updates(
|
||||||
),
|
),
|
||||||
src='kraken',
|
src='kraken',
|
||||||
)
|
)
|
||||||
apiflows[reqid].maps.append(status_msg.to_dict())
|
apiflows.add_msg(reqid, status_msg.to_dict())
|
||||||
await ems_stream.send(status_msg)
|
await ems_stream.send(status_msg)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -1057,7 +1061,7 @@ async def handle_order_updates(
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
apiflows[reqid].maps.append(update_msg)
|
apiflows.add_msg(reqid, update_msg)
|
||||||
await ems_stream.send(resp)
|
await ems_stream.send(resp)
|
||||||
|
|
||||||
# fill msg.
|
# fill msg.
|
||||||
|
@ -1136,9 +1140,8 @@ async def handle_order_updates(
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# update the msg chain
|
# update the msg history
|
||||||
chain = apiflows[reqid]
|
apiflows.add_msg(reqid, event)
|
||||||
chain.maps.append(event)
|
|
||||||
|
|
||||||
if status == 'error':
|
if status == 'error':
|
||||||
# any of ``{'add', 'edit', 'cancel'}``
|
# any of ``{'add', 'edit', 'cancel'}``
|
||||||
|
@ -1148,11 +1151,16 @@ async def handle_order_updates(
|
||||||
f'Failed to {action} order {reqid}:\n'
|
f'Failed to {action} order {reqid}:\n'
|
||||||
f'{errmsg}'
|
f'{errmsg}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
symbol: str = 'N/A'
|
||||||
|
if chain := apiflows.get(reqid):
|
||||||
|
symbol: str = chain.get('symbol', 'N/A')
|
||||||
|
|
||||||
await ems_stream.send(BrokerdError(
|
await ems_stream.send(BrokerdError(
|
||||||
oid=oid,
|
oid=oid,
|
||||||
# XXX: use old reqid in case it changed?
|
# XXX: use old reqid in case it changed?
|
||||||
reqid=reqid,
|
reqid=reqid,
|
||||||
symbol=chain.get('symbol', 'N/A'),
|
symbol=symbol,
|
||||||
|
|
||||||
reason=f'Failed {action}:\n{errmsg}',
|
reason=f'Failed {action}:\n{errmsg}',
|
||||||
broker_details=event
|
broker_details=event
|
||||||
|
|
Loading…
Reference in New Issue