Compare commits
3 Commits
5100036e10
...
3088aa630c
Author | SHA1 | Date |
---|---|---|
|
3088aa630c | |
|
48b8607078 | |
|
2240066a12 |
|
@ -32,6 +32,7 @@ from typing import (
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from async_generator import aclosing
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
import pendulum
|
import pendulum
|
||||||
# from pydantic import BaseModel
|
# from pydantic import BaseModel
|
||||||
|
@ -82,6 +83,7 @@ async def handle_order_requests(
|
||||||
token: str,
|
token: str,
|
||||||
emsflow: dict[str, list[MsgUnion]],
|
emsflow: dict[str, list[MsgUnion]],
|
||||||
ids: bidict[str, int],
|
ids: bidict[str, int],
|
||||||
|
reqids2txids: dict[int, str],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -97,6 +99,23 @@ async def handle_order_requests(
|
||||||
async for msg in ems_order_stream:
|
async for msg in ems_order_stream:
|
||||||
log.info(f'Rx order msg:\n{pformat(msg)}')
|
log.info(f'Rx order msg:\n{pformat(msg)}')
|
||||||
match msg:
|
match msg:
|
||||||
|
case {
|
||||||
|
'action': 'cancel',
|
||||||
|
}:
|
||||||
|
cancel = BrokerdCancel(**msg)
|
||||||
|
last = emsflow[cancel.oid]
|
||||||
|
reqid = ids[cancel.oid]
|
||||||
|
txid = reqids2txids[reqid]
|
||||||
|
|
||||||
|
# call ws api to cancel:
|
||||||
|
# https://docs.kraken.com/websockets/#message-cancelOrder
|
||||||
|
await ws.send_msg({
|
||||||
|
'event': 'cancelOrder',
|
||||||
|
'token': token,
|
||||||
|
'reqid': reqid,
|
||||||
|
'txid': [txid], # should be txid from submission
|
||||||
|
})
|
||||||
|
|
||||||
case {
|
case {
|
||||||
'account': 'kraken.spot' as account,
|
'account': 'kraken.spot' as account,
|
||||||
'action': action,
|
'action': action,
|
||||||
|
@ -109,10 +128,9 @@ async def handle_order_requests(
|
||||||
if order.oid in ids:
|
if order.oid in ids:
|
||||||
ep = 'editOrder'
|
ep = 'editOrder'
|
||||||
reqid = ids[order.oid] # integer not txid
|
reqid = ids[order.oid] # integer not txid
|
||||||
last = emsflow[order.oid][-1]
|
txid = reqids2txids[reqid]
|
||||||
assert last.reqid == order.reqid
|
|
||||||
extra = {
|
extra = {
|
||||||
'orderid': last.reqid, # txid
|
'orderid': txid, # txid
|
||||||
}
|
}
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -159,23 +177,6 @@ async def handle_order_requests(
|
||||||
# placehold for sanity checking in relay loop
|
# placehold for sanity checking in relay loop
|
||||||
emsflow.setdefault(order.oid, []).append(order)
|
emsflow.setdefault(order.oid, []).append(order)
|
||||||
|
|
||||||
case {
|
|
||||||
'account': 'kraken.spot' as account,
|
|
||||||
'action': 'cancel',
|
|
||||||
}:
|
|
||||||
cancel = BrokerdCancel(**msg)
|
|
||||||
assert cancel.oid in emsflow
|
|
||||||
reqid = ids[cancel.oid]
|
|
||||||
|
|
||||||
# call ws api to cancel:
|
|
||||||
# https://docs.kraken.com/websockets/#message-cancelOrder
|
|
||||||
await ws.send_msg({
|
|
||||||
'event': 'cancelOrder',
|
|
||||||
'token': token,
|
|
||||||
'reqid': reqid,
|
|
||||||
'txid': [cancel.reqid], # should be txid from submission
|
|
||||||
})
|
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
account = msg.get('account')
|
account = msg.get('account')
|
||||||
if account != 'kraken.spot':
|
if account != 'kraken.spot':
|
||||||
|
@ -317,6 +318,7 @@ async def trades_dialogue(
|
||||||
),
|
),
|
||||||
) as ws,
|
) as ws,
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
|
aclosing(stream_messages(ws)) as stream,
|
||||||
):
|
):
|
||||||
# task local msg dialog tracking
|
# task local msg dialog tracking
|
||||||
emsflow: dict[
|
emsflow: dict[
|
||||||
|
@ -326,6 +328,7 @@ async def trades_dialogue(
|
||||||
|
|
||||||
# 2way map for ems ids to kraken int reqids..
|
# 2way map for ems ids to kraken int reqids..
|
||||||
ids: bidict[str, int] = bidict()
|
ids: bidict[str, int] = bidict()
|
||||||
|
reqids2txids: dict[int, str] = {}
|
||||||
|
|
||||||
# task for processing inbound requests from ems
|
# task for processing inbound requests from ems
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
|
@ -336,14 +339,17 @@ async def trades_dialogue(
|
||||||
token,
|
token,
|
||||||
emsflow,
|
emsflow,
|
||||||
ids,
|
ids,
|
||||||
|
reqids2txids,
|
||||||
)
|
)
|
||||||
|
|
||||||
# enter relay loop
|
# enter relay loop
|
||||||
await handle_order_updates(
|
await handle_order_updates(
|
||||||
ws,
|
ws,
|
||||||
|
stream,
|
||||||
ems_stream,
|
ems_stream,
|
||||||
emsflow,
|
emsflow,
|
||||||
ids,
|
ids,
|
||||||
|
reqids2txids,
|
||||||
trans,
|
trans,
|
||||||
acctid,
|
acctid,
|
||||||
acc_name,
|
acc_name,
|
||||||
|
@ -353,9 +359,11 @@ async def trades_dialogue(
|
||||||
|
|
||||||
async def handle_order_updates(
|
async def handle_order_updates(
|
||||||
ws: NoBsWs,
|
ws: NoBsWs,
|
||||||
|
ws_stream: AsyncIterator,
|
||||||
ems_stream: tractor.MsgStream,
|
ems_stream: tractor.MsgStream,
|
||||||
emsflow: dict[str, list[MsgUnion]],
|
emsflow: dict[str, list[MsgUnion]],
|
||||||
ids: bidict[str, int],
|
ids: bidict[str, int],
|
||||||
|
reqids2txids: dict[int, str],
|
||||||
trans: list[pp.Transaction],
|
trans: list[pp.Transaction],
|
||||||
acctid: str,
|
acctid: str,
|
||||||
acc_name: str,
|
acc_name: str,
|
||||||
|
@ -369,7 +377,11 @@ async def handle_order_updates(
|
||||||
defined in the signature clear to the reader.
|
defined in the signature clear to the reader.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async for msg in stream_messages(ws):
|
# transaction records which will be updated
|
||||||
|
# on new trade clearing events (aka order "fills")
|
||||||
|
trans: list[pp.Transaction]
|
||||||
|
|
||||||
|
async for msg in ws_stream:
|
||||||
match msg:
|
match msg:
|
||||||
# process and relay clearing trade events to ems
|
# process and relay clearing trade events to ems
|
||||||
# https://docs.kraken.com/websockets/#message-ownTrades
|
# https://docs.kraken.com/websockets/#message-ownTrades
|
||||||
|
@ -380,7 +392,7 @@ async def handle_order_updates(
|
||||||
# 'userref': reqid,
|
# 'userref': reqid,
|
||||||
{'sequence': seq},
|
{'sequence': seq},
|
||||||
]:
|
]:
|
||||||
# flatten msgs for processing
|
# flatten msgs to an {id -> data} table for processing
|
||||||
trades = {
|
trades = {
|
||||||
tid: trade
|
tid: trade
|
||||||
for entry in trades_msgs
|
for entry in trades_msgs
|
||||||
|
@ -547,7 +559,29 @@ async def handle_order_updates(
|
||||||
submit_vlm = rest.get('vol', 0)
|
submit_vlm = rest.get('vol', 0)
|
||||||
exec_vlm = rest.get('vol_exec', 0)
|
exec_vlm = rest.get('vol_exec', 0)
|
||||||
|
|
||||||
oid = ids.inverse[reqid]
|
reqids2txids[reqid] = txid
|
||||||
|
|
||||||
|
oid = ids.inverse.get(reqid)
|
||||||
|
if not oid:
|
||||||
|
# TODO: handle these and relay them
|
||||||
|
# through the EMS to the client / UI
|
||||||
|
# side!
|
||||||
|
log.warning(
|
||||||
|
f'Received active order {txid}:\n'
|
||||||
|
f'{update_msg}\n'
|
||||||
|
'Cancelling order for now!..'
|
||||||
|
)
|
||||||
|
|
||||||
|
# call ws api to cancel:
|
||||||
|
# https://docs.kraken.com/websockets/#message-cancelOrder
|
||||||
|
await ws.send_msg({
|
||||||
|
'event': 'cancelOrder',
|
||||||
|
'token': token,
|
||||||
|
'reqid': reqid,
|
||||||
|
'txid': [txid],
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
|
||||||
msgs = emsflow[oid]
|
msgs = emsflow[oid]
|
||||||
|
|
||||||
# send BrokerdStatus messages for all
|
# send BrokerdStatus messages for all
|
||||||
|
@ -586,6 +620,7 @@ async def handle_order_updates(
|
||||||
'event': etype,
|
'event': etype,
|
||||||
'status': status,
|
'status': status,
|
||||||
'reqid': reqid,
|
'reqid': reqid,
|
||||||
|
**rest,
|
||||||
} as event if (
|
} as event if (
|
||||||
etype in {
|
etype in {
|
||||||
'addOrderStatus',
|
'addOrderStatus',
|
||||||
|
@ -593,7 +628,18 @@ async def handle_order_updates(
|
||||||
'cancelOrderStatus',
|
'cancelOrderStatus',
|
||||||
}
|
}
|
||||||
):
|
):
|
||||||
oid = ids.inverse[reqid]
|
oid = ids.inverse.get(reqid)
|
||||||
|
if not oid:
|
||||||
|
log.warning(
|
||||||
|
'Unknown order status update?:\n'
|
||||||
|
f'{event}'
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
txid = rest.get('txid')
|
||||||
|
if txid:
|
||||||
|
reqids2txids[reqid] = txid
|
||||||
|
|
||||||
msgs = emsflow[oid]
|
msgs = emsflow[oid]
|
||||||
last = msgs[-1]
|
last = msgs[-1]
|
||||||
resps, errored = process_status(
|
resps, errored = process_status(
|
||||||
|
@ -603,19 +649,10 @@ async def handle_order_updates(
|
||||||
msgs,
|
msgs,
|
||||||
last,
|
last,
|
||||||
)
|
)
|
||||||
# if errored:
|
|
||||||
# if we rx any error cancel the order again
|
|
||||||
# await ws.send_msg({
|
|
||||||
# 'event': 'cancelOrder',
|
|
||||||
# 'token': token,
|
|
||||||
# 'reqid': reqid,
|
|
||||||
# 'txid': [last.reqid], # txid from submission
|
|
||||||
# })
|
|
||||||
|
|
||||||
if resps:
|
if resps:
|
||||||
msgs.extend(resps)
|
msgs.extend(resps)
|
||||||
for resp in resps:
|
for resp in resps:
|
||||||
await ems_stream.send(resp.dict())
|
await ems_stream.send(resp)
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
log.warning(f'Unhandled trades update msg: {msg}')
|
log.warning(f'Unhandled trades update msg: {msg}')
|
||||||
|
@ -726,7 +763,7 @@ def norm_trade_records(
|
||||||
records: list[pp.Transaction] = []
|
records: list[pp.Transaction] = []
|
||||||
for tid, record in ledger.items():
|
for tid, record in ledger.items():
|
||||||
|
|
||||||
size = record.get('vol') * {
|
size = float(record.get('vol')) * {
|
||||||
'buy': 1,
|
'buy': 1,
|
||||||
'sell': -1,
|
'sell': -1,
|
||||||
}[record['type']]
|
}[record['type']]
|
||||||
|
@ -737,7 +774,7 @@ def norm_trade_records(
|
||||||
pp.Transaction(
|
pp.Transaction(
|
||||||
fqsn=f'{norm_sym}.kraken',
|
fqsn=f'{norm_sym}.kraken',
|
||||||
tid=tid,
|
tid=tid,
|
||||||
size=float(size),
|
size=size,
|
||||||
price=float(record['price']),
|
price=float(record['price']),
|
||||||
cost=float(record['fee']),
|
cost=float(record['fee']),
|
||||||
dt=pendulum.from_timestamp(float(record['time'])),
|
dt=pendulum.from_timestamp(float(record['time'])),
|
||||||
|
|
Loading…
Reference in New Issue