Compare commits

..

No commits in common. "3088aa630c12e45f5b8983d6af65c00ff61f8b59" and "5100036e1017038e1a17865511af14a07b6311e7" have entirely different histories.

1 changed files with 36 additions and 73 deletions

View File

@ -32,7 +32,6 @@ from typing import (
Union,
)
from async_generator import aclosing
from bidict import bidict
import pendulum
# from pydantic import BaseModel
@ -83,7 +82,6 @@ async def handle_order_requests(
token: str,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
reqids2txids: dict[int, str],
) -> None:
'''
@ -99,23 +97,6 @@ async def handle_order_requests(
async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}')
match msg:
case {
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
last = emsflow[cancel.oid]
reqid = ids[cancel.oid]
txid = reqids2txids[reqid]
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
case {
'account': 'kraken.spot' as account,
'action': action,
@ -128,9 +109,10 @@ async def handle_order_requests(
if order.oid in ids:
ep = 'editOrder'
reqid = ids[order.oid] # integer not txid
txid = reqids2txids[reqid]
last = emsflow[order.oid][-1]
assert last.reqid == order.reqid
extra = {
'orderid': txid, # txid
'orderid': last.reqid, # txid
}
else:
@ -177,6 +159,23 @@ async def handle_order_requests(
# placehold for sanity checking in relay loop
emsflow.setdefault(order.oid, []).append(order)
case {
'account': 'kraken.spot' as account,
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
assert cancel.oid in emsflow
reqid = ids[cancel.oid]
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [cancel.reqid], # should be txid from submission
})
case _:
account = msg.get('account')
if account != 'kraken.spot':
@ -318,7 +317,6 @@ async def trades_dialogue(
),
) as ws,
trio.open_nursery() as n,
aclosing(stream_messages(ws)) as stream,
):
# task local msg dialog tracking
emsflow: dict[
@ -328,7 +326,6 @@ async def trades_dialogue(
# 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict()
reqids2txids: dict[int, str] = {}
# task for processing inbound requests from ems
n.start_soon(
@ -339,17 +336,14 @@ async def trades_dialogue(
token,
emsflow,
ids,
reqids2txids,
)
# enter relay loop
await handle_order_updates(
ws,
stream,
ems_stream,
emsflow,
ids,
reqids2txids,
trans,
acctid,
acc_name,
@ -359,11 +353,9 @@ async def trades_dialogue(
async def handle_order_updates(
ws: NoBsWs,
ws_stream: AsyncIterator,
ems_stream: tractor.MsgStream,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
reqids2txids: dict[int, str],
trans: list[pp.Transaction],
acctid: str,
acc_name: str,
@ -377,11 +369,7 @@ async def handle_order_updates(
defined in the signature clear to the reader.
'''
# transaction records which will be updated
# on new trade clearing events (aka order "fills")
trans: list[pp.Transaction]
async for msg in ws_stream:
async for msg in stream_messages(ws):
match msg:
# process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades
@ -392,7 +380,7 @@ async def handle_order_updates(
# 'userref': reqid,
{'sequence': seq},
]:
# flatten msgs to an {id -> data} table for processing
# flatten msgs for processing
trades = {
tid: trade
for entry in trades_msgs
@ -559,29 +547,7 @@ async def handle_order_updates(
submit_vlm = rest.get('vol', 0)
exec_vlm = rest.get('vol_exec', 0)
reqids2txids[reqid] = txid
oid = ids.inverse.get(reqid)
if not oid:
# TODO: handle these and relay them
# through the EMS to the client / UI
# side!
log.warning(
f'Received active order {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
)
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid],
})
continue
oid = ids.inverse[reqid]
msgs = emsflow[oid]
# send BrokerdStatus messages for all
@ -620,7 +586,6 @@ async def handle_order_updates(
'event': etype,
'status': status,
'reqid': reqid,
**rest,
} as event if (
etype in {
'addOrderStatus',
@ -628,18 +593,7 @@ async def handle_order_updates(
'cancelOrderStatus',
}
):
oid = ids.inverse.get(reqid)
if not oid:
log.warning(
'Unknown order status update?:\n'
f'{event}'
)
continue
txid = rest.get('txid')
if txid:
reqids2txids[reqid] = txid
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
resps, errored = process_status(
@ -649,10 +603,19 @@ async def handle_order_updates(
msgs,
last,
)
# if errored:
# if we rx any error cancel the order again
# await ws.send_msg({
# 'event': 'cancelOrder',
# 'token': token,
# 'reqid': reqid,
# 'txid': [last.reqid], # txid from submission
# })
if resps:
msgs.extend(resps)
for resp in resps:
await ems_stream.send(resp)
await ems_stream.send(resp.dict())
case _:
log.warning(f'Unhandled trades update msg: {msg}')
@ -763,7 +726,7 @@ def norm_trade_records(
records: list[pp.Transaction] = []
for tid, record in ledger.items():
size = float(record.get('vol')) * {
size = record.get('vol') * {
'buy': 1,
'sell': -1,
}[record['type']]
@ -774,7 +737,7 @@ def norm_trade_records(
pp.Transaction(
fqsn=f'{norm_sym}.kraken',
tid=tid,
size=size,
size=float(size),
price=float(record['price']),
cost=float(record['fee']),
dt=pendulum.from_timestamp(float(record['time'])),