Compare commits
15 Commits
gitea_feat
...
krakenwsba
| Author | SHA1 | Date |
|---|---|---|
|
|
3088aa630c | |
|
|
48b8607078 | |
|
|
2240066a12 | |
|
|
5100036e10 | |
|
|
78b9d90202 | |
|
|
9300b3d6db | |
|
|
6d13c8255f | |
|
|
3765c61f2d | |
|
|
cb7a9b9449 | |
|
|
f1192dff09 | |
|
|
9e8d32cdff | |
|
|
c74741228f | |
|
|
f38eef2bf4 | |
|
|
e757e1f277 | |
|
|
4823f87422 |
|
|
@ -0,0 +1,64 @@
|
|||
``kraken`` backend
|
||||
------------------
|
||||
though they don't have the most liquidity of all the cexes they sure are
|
||||
accommodating to those of us who appreciate a little ``xmr``.
|
||||
|
||||
status
|
||||
******
|
||||
current support is *production grade* and both real-time data and order
|
||||
management should be correct and fast. this backend is used by core devs
|
||||
for live trading.
|
||||
|
||||
|
||||
config
|
||||
******
|
||||
In order to get order mode support your ``brokers.toml``
|
||||
needs to have something like the following:
|
||||
|
||||
.. code:: toml
|
||||
|
||||
[kraken]
|
||||
accounts.spot = 'spot'
|
||||
key_descr = "spot"
|
||||
api_key = "69696969696969696696969696969696969696969696969696969696"
|
||||
secret = "BOOBSBOOBSBOOBSBOOBSBOOBSSMBZ69696969696969669969696969696"
|
||||
|
||||
|
||||
If everything works correctly you should see any current positions
|
||||
loaded in the pps pane on chart load and you should also be able to
|
||||
check your trade records in the file::
|
||||
|
||||
<pikerk_conf_dir>/ledgers/trades_kraken_spot.toml
|
||||
|
||||
|
||||
An example ledger file will have entries written verbatim from the
|
||||
trade events schema:
|
||||
|
||||
.. code:: toml
|
||||
|
||||
[TFJBKK-SMBZS-VJ4UWS]
|
||||
ordertxid = "SMBZSA-7CNQU-3HWLNJ"
|
||||
postxid = "SMBZSE-M7IF5-CFI7LT"
|
||||
pair = "XXMRZEUR"
|
||||
time = 1655691993.4133966
|
||||
type = "buy"
|
||||
ordertype = "limit"
|
||||
price = "103.97000000"
|
||||
cost = "499.99999977"
|
||||
fee = "0.80000000"
|
||||
vol = "4.80907954"
|
||||
margin = "0.00000000"
|
||||
misc = ""
|
||||
|
||||
|
||||
your ``pps.toml`` file will have position entries like,
|
||||
|
||||
.. code:: toml
|
||||
|
||||
[kraken.spot."xmreur.kraken"]
|
||||
size = 4.80907954
|
||||
be_price = 103.97000000
|
||||
bsuid = "XXMRZEUR"
|
||||
clears = [
|
||||
{ tid = "TFJBKK-SMBZS-VJ4UWS", cost = 0.8, price = 103.97, size = 4.80907954, dt = "2022-05-20T02:26:33.413397+00:00" },
|
||||
]
|
||||
|
|
@ -282,6 +282,7 @@ class Client:
|
|||
"volume": str(size),
|
||||
}
|
||||
return await self.endpoint('AddOrder', data)
|
||||
|
||||
else:
|
||||
# Edit order data for kraken api
|
||||
data["txid"] = reqid
|
||||
|
|
|
|||
|
|
@ -18,21 +18,24 @@
|
|||
Order api and machinery
|
||||
|
||||
'''
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
contextmanager as cm,
|
||||
)
|
||||
from functools import partial
|
||||
from itertools import chain
|
||||
from itertools import chain, count
|
||||
from pprint import pformat
|
||||
import time
|
||||
from typing import (
|
||||
Any,
|
||||
AsyncIterator,
|
||||
# Callable,
|
||||
# Optional,
|
||||
# Union,
|
||||
Union,
|
||||
)
|
||||
|
||||
from async_generator import aclosing
|
||||
from bidict import bidict
|
||||
import pendulum
|
||||
from pydantic import BaseModel
|
||||
# from pydantic import BaseModel
|
||||
import trio
|
||||
import tractor
|
||||
import wsproto
|
||||
|
|
@ -61,180 +64,138 @@ from .feed import (
|
|||
stream_messages,
|
||||
)
|
||||
|
||||
|
||||
class Trade(BaseModel):
|
||||
'''
|
||||
Trade class that helps parse and validate ownTrades stream
|
||||
|
||||
'''
|
||||
reqid: str # kraken order transaction id
|
||||
action: str # buy or sell
|
||||
price: float # price of asset
|
||||
size: float # vol of asset
|
||||
broker_time: str # e.g GTC, GTD
|
||||
MsgUnion = Union[
|
||||
BrokerdCancel,
|
||||
BrokerdError,
|
||||
BrokerdFill,
|
||||
BrokerdOrder,
|
||||
BrokerdOrderAck,
|
||||
BrokerdPosition,
|
||||
BrokerdStatus,
|
||||
]
|
||||
|
||||
|
||||
async def handle_order_requests(
|
||||
|
||||
ws: NoBsWs,
|
||||
client: Client,
|
||||
ems_order_stream: tractor.MsgStream,
|
||||
token: str,
|
||||
emsflow: dict[str, list[MsgUnion]],
|
||||
ids: bidict[str, int],
|
||||
reqids2txids: dict[int, str],
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Process new order submission requests from the EMS
|
||||
and deliver acks or errors.
|
||||
|
||||
request_msg: dict
|
||||
'''
|
||||
# XXX: UGH, let's unify this.. with ``msgspec``.
|
||||
msg: dict[str, Any]
|
||||
order: BrokerdOrder
|
||||
counter = count()
|
||||
|
||||
async for request_msg in ems_order_stream:
|
||||
log.info(
|
||||
'Received order request:\n'
|
||||
f'{pformat(request_msg)}'
|
||||
)
|
||||
async for msg in ems_order_stream:
|
||||
log.info(f'Rx order msg:\n{pformat(msg)}')
|
||||
match msg:
|
||||
case {
|
||||
'action': 'cancel',
|
||||
}:
|
||||
cancel = BrokerdCancel(**msg)
|
||||
last = emsflow[cancel.oid]
|
||||
reqid = ids[cancel.oid]
|
||||
txid = reqids2txids[reqid]
|
||||
|
||||
action = request_msg['action']
|
||||
# call ws api to cancel:
|
||||
# https://docs.kraken.com/websockets/#message-cancelOrder
|
||||
await ws.send_msg({
|
||||
'event': 'cancelOrder',
|
||||
'token': token,
|
||||
'reqid': reqid,
|
||||
'txid': [txid], # should be txid from submission
|
||||
})
|
||||
|
||||
if action in {'buy', 'sell'}:
|
||||
case {
|
||||
'account': 'kraken.spot' as account,
|
||||
'action': action,
|
||||
} if action in {'buy', 'sell'}:
|
||||
|
||||
account = request_msg['account']
|
||||
if account != 'kraken.spot':
|
||||
log.error(
|
||||
'This is a kraken account, \
|
||||
only a `kraken.spot` selection is valid'
|
||||
)
|
||||
await ems_order_stream.send(BrokerdError(
|
||||
oid=request_msg['oid'],
|
||||
symbol=request_msg['symbol'],
|
||||
# validate
|
||||
order = BrokerdOrder(**msg)
|
||||
|
||||
# reason=f'Kraken only, No account found: `{account}` ?',
|
||||
reason=(
|
||||
'Kraken only, order mode disabled due to '
|
||||
'https://github.com/pikers/piker/issues/299'
|
||||
),
|
||||
# logic from old `Client.submit_limit()`
|
||||
if order.oid in ids:
|
||||
ep = 'editOrder'
|
||||
reqid = ids[order.oid] # integer not txid
|
||||
txid = reqids2txids[reqid]
|
||||
extra = {
|
||||
'orderid': txid, # txid
|
||||
}
|
||||
|
||||
).dict())
|
||||
continue
|
||||
|
||||
# validate
|
||||
order = BrokerdOrder(**request_msg)
|
||||
# call our client api to submit the order
|
||||
resp = await client.submit_limit(
|
||||
symbol=order.symbol,
|
||||
price=order.price,
|
||||
action=order.action,
|
||||
size=order.size,
|
||||
reqid=order.reqid,
|
||||
)
|
||||
|
||||
err = resp['error']
|
||||
if err:
|
||||
oid = order.oid
|
||||
log.error(f'Failed to submit order: {oid}')
|
||||
|
||||
await ems_order_stream.send(
|
||||
BrokerdError(
|
||||
oid=order.oid,
|
||||
reqid=order.reqid,
|
||||
symbol=order.symbol,
|
||||
reason="Failed order submission",
|
||||
broker_details=resp
|
||||
).dict()
|
||||
)
|
||||
else:
|
||||
# TODO: handle multiple orders (cancels?)
|
||||
# txid is an array of strings
|
||||
if order.reqid is None:
|
||||
reqid = resp['result']['txid'][0]
|
||||
else:
|
||||
# update the internal pairing of oid to krakens
|
||||
# txid with the new txid that is returned on edit
|
||||
reqid = resp['result']['txid']
|
||||
ep = 'addOrder'
|
||||
reqid = next(counter)
|
||||
ids[order.oid] = reqid
|
||||
log.debug(
|
||||
f"Adding order {reqid}\n"
|
||||
f'{ids}'
|
||||
)
|
||||
extra = {
|
||||
'ordertype': 'limit',
|
||||
'type': order.action,
|
||||
}
|
||||
|
||||
# deliver ack that order has been submitted to broker routing
|
||||
await ems_order_stream.send(
|
||||
BrokerdOrderAck(
|
||||
psym = order.symbol.upper()
|
||||
pair = f'{psym[:3]}/{psym[3:]}'
|
||||
|
||||
# ems order request id
|
||||
oid=order.oid,
|
||||
# call ws api to submit the order:
|
||||
# https://docs.kraken.com/websockets/#message-addOrder
|
||||
req = {
|
||||
'event': ep,
|
||||
'token': token,
|
||||
|
||||
# broker specific request id
|
||||
reqid=reqid,
|
||||
'reqid': reqid, # remapped-to-int uid from ems
|
||||
'pair': pair,
|
||||
'price': str(order.price),
|
||||
'volume': str(order.size),
|
||||
|
||||
# account the made the order
|
||||
account=order.account
|
||||
# only ensures request is valid, nothing more
|
||||
# validate: 'true',
|
||||
|
||||
).dict()
|
||||
} | extra
|
||||
log.info(f'Submitting WS order request:\n{pformat(req)}')
|
||||
await ws.send_msg(req)
|
||||
|
||||
resp = BrokerdOrderAck(
|
||||
oid=order.oid, # ems order request id
|
||||
reqid=reqid, # our custom int mapping
|
||||
account=account, # piker account
|
||||
)
|
||||
await ems_order_stream.send(resp)
|
||||
|
||||
elif action == 'cancel':
|
||||
msg = BrokerdCancel(**request_msg)
|
||||
# placehold for sanity checking in relay loop
|
||||
emsflow.setdefault(order.oid, []).append(order)
|
||||
|
||||
# Send order cancellation to kraken
|
||||
resp = await client.submit_cancel(
|
||||
reqid=msg.reqid
|
||||
)
|
||||
|
||||
# Check to make sure there was no error returned by
|
||||
# the kraken endpoint. Assert one order was cancelled.
|
||||
try:
|
||||
result = resp['result']
|
||||
count = result['count']
|
||||
|
||||
# check for 'error' key if we received no 'result'
|
||||
except KeyError:
|
||||
error = resp.get('error')
|
||||
case _:
|
||||
account = msg.get('account')
|
||||
if account != 'kraken.spot':
|
||||
log.error(
|
||||
'This is a kraken account, \
|
||||
only a `kraken.spot` selection is valid'
|
||||
)
|
||||
|
||||
await ems_order_stream.send(
|
||||
BrokerdError(
|
||||
oid=msg.oid,
|
||||
reqid=msg.reqid,
|
||||
symbol=msg.symbol,
|
||||
reason="Failed order cancel",
|
||||
broker_details=resp
|
||||
oid=msg['oid'],
|
||||
symbol=msg['symbol'],
|
||||
reason=(
|
||||
'Invalid request msg:\n{msg}'
|
||||
),
|
||||
|
||||
).dict()
|
||||
)
|
||||
|
||||
if not error:
|
||||
raise BrokerError(f'Unknown order cancel response: {resp}')
|
||||
|
||||
else:
|
||||
if not count: # no orders were cancelled?
|
||||
|
||||
# XXX: what exactly is this from and why would we care?
|
||||
# there doesn't seem to be any docs here?
|
||||
# https://docs.kraken.com/rest/#operation/cancelOrder
|
||||
|
||||
# Check to make sure the cancellation is NOT pending,
|
||||
# then send the confirmation to the ems order stream
|
||||
pending = result.get('pending')
|
||||
if pending:
|
||||
log.error(f'Order {oid} cancel was not yet successful')
|
||||
|
||||
await ems_order_stream.send(
|
||||
BrokerdError(
|
||||
oid=msg.oid,
|
||||
reqid=msg.reqid,
|
||||
symbol=msg.symbol,
|
||||
# TODO: maybe figure out if pending
|
||||
# cancels will eventually get cancelled
|
||||
reason="Order cancel is still pending?",
|
||||
broker_details=resp
|
||||
).dict()
|
||||
)
|
||||
|
||||
else: # order cancel success case.
|
||||
|
||||
await ems_order_stream.send(
|
||||
BrokerdStatus(
|
||||
reqid=msg.reqid,
|
||||
account=msg.account,
|
||||
time_ns=time.time_ns(),
|
||||
status='cancelled',
|
||||
reason='Order cancelled',
|
||||
broker_details={'name': 'kraken'}
|
||||
).dict()
|
||||
)
|
||||
else:
|
||||
log.error(f'Unknown order command: {request_msg}')
|
||||
|
||||
|
||||
@acm
|
||||
async def subscribe(
|
||||
|
|
@ -310,13 +271,13 @@ async def trades_dialogue(
|
|||
log.info(
|
||||
f'Loaded {len(trades)} trades from account `{acc_name}`'
|
||||
)
|
||||
trans = await update_ledger(acctid, trades)
|
||||
active, closed = pp.update_pps_conf(
|
||||
'kraken',
|
||||
acctid,
|
||||
trade_records=trans,
|
||||
ledger_reload={}.fromkeys(t.bsuid for t in trans),
|
||||
)
|
||||
with open_ledger(acctid, trades) as trans:
|
||||
active, closed = pp.update_pps_conf(
|
||||
'kraken',
|
||||
acctid,
|
||||
trade_records=trans,
|
||||
ledger_reload={}.fromkeys(t.bsuid for t in trans),
|
||||
)
|
||||
|
||||
position_msgs: list[dict] = []
|
||||
pps: dict[int, pp.Position]
|
||||
|
|
@ -357,135 +318,441 @@ async def trades_dialogue(
|
|||
),
|
||||
) as ws,
|
||||
trio.open_nursery() as n,
|
||||
aclosing(stream_messages(ws)) as stream,
|
||||
):
|
||||
# task local msg dialog tracking
|
||||
emsflow: dict[
|
||||
str,
|
||||
list[MsgUnion],
|
||||
] = {}
|
||||
|
||||
# 2way map for ems ids to kraken int reqids..
|
||||
ids: bidict[str, int] = bidict()
|
||||
reqids2txids: dict[int, str] = {}
|
||||
|
||||
# task for processing inbound requests from ems
|
||||
n.start_soon(handle_order_requests, client, ems_stream)
|
||||
n.start_soon(
|
||||
handle_order_requests,
|
||||
ws,
|
||||
client,
|
||||
ems_stream,
|
||||
token,
|
||||
emsflow,
|
||||
ids,
|
||||
reqids2txids,
|
||||
)
|
||||
|
||||
count: int = 0
|
||||
# enter relay loop
|
||||
await handle_order_updates(
|
||||
ws,
|
||||
stream,
|
||||
ems_stream,
|
||||
emsflow,
|
||||
ids,
|
||||
reqids2txids,
|
||||
trans,
|
||||
acctid,
|
||||
acc_name,
|
||||
token,
|
||||
)
|
||||
|
||||
# process and relay trades events to ems
|
||||
|
||||
async def handle_order_updates(
|
||||
ws: NoBsWs,
|
||||
ws_stream: AsyncIterator,
|
||||
ems_stream: tractor.MsgStream,
|
||||
emsflow: dict[str, list[MsgUnion]],
|
||||
ids: bidict[str, int],
|
||||
reqids2txids: dict[int, str],
|
||||
trans: list[pp.Transaction],
|
||||
acctid: str,
|
||||
acc_name: str,
|
||||
token: str,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Main msg handling loop for all things order management.
|
||||
|
||||
This code is broken out to make the context explicit and state variables
|
||||
defined in the signature clear to the reader.
|
||||
|
||||
'''
|
||||
# transaction records which will be updated
|
||||
# on new trade clearing events (aka order "fills")
|
||||
trans: list[pp.Transaction]
|
||||
|
||||
async for msg in ws_stream:
|
||||
match msg:
|
||||
# process and relay clearing trade events to ems
|
||||
# https://docs.kraken.com/websockets/#message-ownTrades
|
||||
async for msg in stream_messages(ws):
|
||||
match msg:
|
||||
case [
|
||||
trades_msgs,
|
||||
'ownTrades',
|
||||
{'sequence': seq},
|
||||
]:
|
||||
# XXX: do we actually need this orrr?
|
||||
# ensure that we are only processing new trades?
|
||||
assert seq > count
|
||||
count += 1
|
||||
case [
|
||||
trades_msgs,
|
||||
'ownTrades',
|
||||
# won't exist for historical values?
|
||||
# 'userref': reqid,
|
||||
{'sequence': seq},
|
||||
]:
|
||||
# flatten msgs to an {id -> data} table for processing
|
||||
trades = {
|
||||
tid: trade
|
||||
for entry in trades_msgs
|
||||
for (tid, trade) in entry.items()
|
||||
|
||||
# flatten msgs for processing
|
||||
trades = {
|
||||
tid: trade
|
||||
for entry in trades_msgs
|
||||
for (tid, trade) in entry.items()
|
||||
# only emit entries which are already not-in-ledger
|
||||
if tid not in {r.tid for r in trans}
|
||||
}
|
||||
for tid, trade in trades.items():
|
||||
|
||||
# only emit entries which are already not-in-ledger
|
||||
if tid not in {r.tid for r in trans}
|
||||
}
|
||||
for tid, trade in trades.items():
|
||||
# NOTE: try to get the requid sent in the order
|
||||
# request message if posssible; it may not be
|
||||
# provided since this sub also returns generic
|
||||
# historical trade events.
|
||||
reqid = trade.get('userref', trade['ordertxid'])
|
||||
|
||||
# parse-cast
|
||||
reqid = trade['ordertxid']
|
||||
action = trade['type']
|
||||
price = float(trade['price'])
|
||||
size = float(trade['vol'])
|
||||
broker_time = float(trade['time'])
|
||||
action = trade['type']
|
||||
price = float(trade['price'])
|
||||
size = float(trade['vol'])
|
||||
broker_time = float(trade['time'])
|
||||
|
||||
# send a fill msg for gui update
|
||||
fill_msg = BrokerdFill(
|
||||
reqid=reqid,
|
||||
|
||||
time_ns=time.time_ns(),
|
||||
|
||||
action=action,
|
||||
size=size,
|
||||
price=price,
|
||||
# TODO: maybe capture more msg data
|
||||
# i.e fees?
|
||||
broker_details={'name': 'kraken'},
|
||||
broker_time=broker_time
|
||||
)
|
||||
await ems_stream.send(fill_msg.dict())
|
||||
|
||||
filled_msg = BrokerdStatus(
|
||||
reqid=reqid,
|
||||
time_ns=time.time_ns(),
|
||||
|
||||
account=acc_name,
|
||||
status='filled',
|
||||
filled=size,
|
||||
reason='Order filled by kraken',
|
||||
broker_details={
|
||||
'name': 'kraken',
|
||||
'broker_time': broker_time
|
||||
},
|
||||
|
||||
# TODO: figure out if kraken gives a count
|
||||
# of how many units of underlying were
|
||||
# filled. Alternatively we can decrement
|
||||
# this value ourselves by associating and
|
||||
# calcing from the diff with the original
|
||||
# client-side request, see:
|
||||
# https://github.com/pikers/piker/issues/296
|
||||
remaining=0,
|
||||
)
|
||||
await ems_stream.send(filled_msg.dict())
|
||||
|
||||
# update ledger and position tracking
|
||||
with open_ledger(acctid, trades) as trans:
|
||||
active, closed = pp.update_pps_conf(
|
||||
'kraken',
|
||||
acctid,
|
||||
trade_records=trans,
|
||||
ledger_reload={}.fromkeys(
|
||||
t.bsuid for t in trans),
|
||||
)
|
||||
|
||||
# emit any new pp msgs to ems
|
||||
for pos in filter(
|
||||
bool,
|
||||
chain(active.values(), closed.values()),
|
||||
):
|
||||
pp_msg = BrokerdPosition(
|
||||
broker='kraken',
|
||||
|
||||
# XXX: ok so this is annoying, we're
|
||||
# relaying an account name with the
|
||||
# backend suffix prefixed but when
|
||||
# reading accounts from ledgers we
|
||||
# don't need it and/or it's prefixed
|
||||
# in the section table.. we should
|
||||
# just strip this from the message
|
||||
# right since `.broker` is already
|
||||
# included?
|
||||
account=f'kraken.{acctid}',
|
||||
symbol=pos.symbol.front_fqsn(),
|
||||
size=pos.size,
|
||||
avg_price=pos.be_price,
|
||||
|
||||
# TODO
|
||||
# currency=''
|
||||
)
|
||||
await ems_stream.send(pp_msg.dict())
|
||||
|
||||
# process and relay order state change events
|
||||
# https://docs.kraken.com/websockets/#message-openOrders
|
||||
case [
|
||||
order_msgs,
|
||||
'openOrders',
|
||||
{'sequence': seq},
|
||||
]:
|
||||
for order_msg in order_msgs:
|
||||
log.info(
|
||||
f'Order msg update_{seq}:\n'
|
||||
f'{pformat(order_msg)}'
|
||||
)
|
||||
txid, update_msg = list(order_msg.items())[0]
|
||||
match update_msg:
|
||||
|
||||
# we ignore internal order updates triggered by
|
||||
# kraken's "edit" endpoint.
|
||||
case {
|
||||
'cancel_reason': 'Order replaced',
|
||||
'status': status,
|
||||
'userref': reqid,
|
||||
**rest,
|
||||
}:
|
||||
continue
|
||||
|
||||
case {
|
||||
'status': status,
|
||||
'userref': reqid,
|
||||
**rest,
|
||||
|
||||
# XXX: eg. of remaining msg schema:
|
||||
# 'avg_price': _,
|
||||
# 'cost': _,
|
||||
# 'descr': {
|
||||
# 'close': None,
|
||||
# 'leverage': None,
|
||||
# 'order': descr,
|
||||
# 'ordertype': 'limit',
|
||||
# 'pair': 'XMR/EUR',
|
||||
# 'price': '74.94000000',
|
||||
# 'price2': '0.00000000',
|
||||
# 'type': 'buy'
|
||||
# },
|
||||
# 'expiretm': None,
|
||||
# 'fee': '0.00000000',
|
||||
# 'limitprice': '0.00000000',
|
||||
# 'misc': '',
|
||||
# 'oflags': 'fciq',
|
||||
# 'opentm': '1656966131.337344',
|
||||
# 'refid': None,
|
||||
# 'starttm': None,
|
||||
# 'stopprice': '0.00000000',
|
||||
# 'timeinforce': 'GTC',
|
||||
# 'vol': submit_vlm, # '13.34400854',
|
||||
# 'vol_exec': exec_vlm, # 0.0000
|
||||
}:
|
||||
ems_status = {
|
||||
'open': 'submitted',
|
||||
'closed': 'cancelled',
|
||||
'canceled': 'cancelled',
|
||||
# do we even need to forward
|
||||
# this state to the ems?
|
||||
'pending': 'pending',
|
||||
}[status]
|
||||
|
||||
submit_vlm = rest.get('vol', 0)
|
||||
exec_vlm = rest.get('vol_exec', 0)
|
||||
|
||||
reqids2txids[reqid] = txid
|
||||
|
||||
oid = ids.inverse.get(reqid)
|
||||
if not oid:
|
||||
# TODO: handle these and relay them
|
||||
# through the EMS to the client / UI
|
||||
# side!
|
||||
log.warning(
|
||||
f'Received active order {txid}:\n'
|
||||
f'{update_msg}\n'
|
||||
'Cancelling order for now!..'
|
||||
)
|
||||
|
||||
# call ws api to cancel:
|
||||
# https://docs.kraken.com/websockets/#message-cancelOrder
|
||||
await ws.send_msg({
|
||||
'event': 'cancelOrder',
|
||||
'token': token,
|
||||
'reqid': reqid,
|
||||
'txid': [txid],
|
||||
})
|
||||
continue
|
||||
|
||||
msgs = emsflow[oid]
|
||||
|
||||
# send BrokerdStatus messages for all
|
||||
# order state updates
|
||||
resp = BrokerdStatus(
|
||||
|
||||
# send a fill msg for gui update
|
||||
fill_msg = BrokerdFill(
|
||||
reqid=reqid,
|
||||
time_ns=time.time_ns(),
|
||||
|
||||
action=action,
|
||||
size=size,
|
||||
price=price,
|
||||
# TODO: maybe capture more msg data
|
||||
# i.e fees?
|
||||
broker_details={'name': 'kraken'},
|
||||
broker_time=broker_time
|
||||
)
|
||||
await ems_stream.send(fill_msg.dict())
|
||||
|
||||
filled_msg = BrokerdStatus(
|
||||
reqid=reqid,
|
||||
time_ns=time.time_ns(),
|
||||
|
||||
account=acc_name,
|
||||
status='filled',
|
||||
filled=size,
|
||||
reason='Order filled by kraken',
|
||||
broker_details={
|
||||
'name': 'kraken',
|
||||
'broker_time': broker_time
|
||||
},
|
||||
|
||||
# TODO: figure out if kraken gives a count
|
||||
# of how many units of underlying were
|
||||
# filled. Alternatively we can decrement
|
||||
# this value ourselves by associating and
|
||||
# calcing from the diff with the original
|
||||
# client-side request, see:
|
||||
# https://github.com/pikers/piker/issues/296
|
||||
remaining=0,
|
||||
)
|
||||
await ems_stream.send(filled_msg.dict())
|
||||
|
||||
# update ledger and position tracking
|
||||
trans = await update_ledger(acctid, trades)
|
||||
active, closed = pp.update_pps_conf(
|
||||
'kraken',
|
||||
acctid,
|
||||
trade_records=trans,
|
||||
ledger_reload={}.fromkeys(
|
||||
t.bsuid for t in trans),
|
||||
)
|
||||
|
||||
# emit pp msgs
|
||||
for pos in filter(
|
||||
bool,
|
||||
chain(active.values(), closed.values()),
|
||||
):
|
||||
pp_msg = BrokerdPosition(
|
||||
broker='kraken',
|
||||
|
||||
# XXX: ok so this is annoying, we're
|
||||
# relaying an account name with the
|
||||
# backend suffix prefixed but when
|
||||
# reading accounts from ledgers we
|
||||
# don't need it and/or it's prefixed
|
||||
# in the section table.. we should
|
||||
# just strip this from the message
|
||||
# right since `.broker` is already
|
||||
# included?
|
||||
time_ns=time.time_ns(), # cuz why not
|
||||
account=f'kraken.{acctid}',
|
||||
symbol=pos.symbol.front_fqsn(),
|
||||
size=pos.size,
|
||||
avg_price=pos.be_price,
|
||||
|
||||
# TODO
|
||||
# currency=''
|
||||
# everyone doin camel case..
|
||||
status=ems_status, # force lower case
|
||||
|
||||
filled=exec_vlm,
|
||||
reason='', # why held?
|
||||
remaining=(
|
||||
float(submit_vlm)
|
||||
-
|
||||
float(exec_vlm)
|
||||
),
|
||||
|
||||
broker_details=dict(
|
||||
{'name': 'kraken'}, **update_msg
|
||||
),
|
||||
)
|
||||
await ems_stream.send(pp_msg.dict())
|
||||
msgs.append(resp)
|
||||
await ems_stream.send(resp.dict())
|
||||
|
||||
case [
|
||||
trades_msgs,
|
||||
'openOrders',
|
||||
{'sequence': seq},
|
||||
]:
|
||||
# TODO: async order update handling which we
|
||||
# should remove from `handle_order_requests()`
|
||||
# above:
|
||||
# https://github.com/pikers/piker/issues/293
|
||||
# https://github.com/pikers/piker/issues/310
|
||||
log.info(f'Order update {seq}:{trades_msgs}')
|
||||
case _:
|
||||
log.warning(
|
||||
'Unknown orders msg:\n'
|
||||
f'{txid}:{order_msg}'
|
||||
)
|
||||
|
||||
case _:
|
||||
log.warning(f'Unhandled trades msg: {msg}')
|
||||
await tractor.breakpoint()
|
||||
case {
|
||||
'event': etype,
|
||||
'status': status,
|
||||
'reqid': reqid,
|
||||
**rest,
|
||||
} as event if (
|
||||
etype in {
|
||||
'addOrderStatus',
|
||||
'editOrderStatus',
|
||||
'cancelOrderStatus',
|
||||
}
|
||||
):
|
||||
oid = ids.inverse.get(reqid)
|
||||
if not oid:
|
||||
log.warning(
|
||||
'Unknown order status update?:\n'
|
||||
f'{event}'
|
||||
)
|
||||
continue
|
||||
|
||||
txid = rest.get('txid')
|
||||
if txid:
|
||||
reqids2txids[reqid] = txid
|
||||
|
||||
msgs = emsflow[oid]
|
||||
last = msgs[-1]
|
||||
resps, errored = process_status(
|
||||
event,
|
||||
oid,
|
||||
token,
|
||||
msgs,
|
||||
last,
|
||||
)
|
||||
if resps:
|
||||
msgs.extend(resps)
|
||||
for resp in resps:
|
||||
await ems_stream.send(resp)
|
||||
|
||||
case _:
|
||||
log.warning(f'Unhandled trades update msg: {msg}')
|
||||
|
||||
|
||||
def process_status(
|
||||
event: dict[str, str],
|
||||
oid: str,
|
||||
token: str,
|
||||
msgs: list[MsgUnion],
|
||||
last: MsgUnion,
|
||||
|
||||
) -> tuple[list[MsgUnion], bool]:
|
||||
'''
|
||||
Process `'[add/edit/cancel]OrderStatus'` events by translating to
|
||||
and returning the equivalent EMS-msg responses.
|
||||
|
||||
'''
|
||||
match event:
|
||||
case {
|
||||
'event': etype,
|
||||
'status': 'error',
|
||||
'reqid': reqid,
|
||||
'errorMessage': errmsg,
|
||||
}:
|
||||
# any of ``{'add', 'edit', 'cancel'}``
|
||||
action = etype.rstrip('OrderStatus')
|
||||
log.error(
|
||||
f'Failed to {action} order {reqid}:\n'
|
||||
f'{errmsg}'
|
||||
)
|
||||
resp = BrokerdError(
|
||||
oid=oid,
|
||||
# XXX: use old reqid in case it changed?
|
||||
reqid=reqid,
|
||||
symbol=getattr(last, 'symbol', 'N/A'),
|
||||
|
||||
reason=f'Failed {action}:\n{errmsg}',
|
||||
broker_details=event
|
||||
)
|
||||
return [resp], True
|
||||
|
||||
# successful request cases
|
||||
case {
|
||||
'event': 'addOrderStatus',
|
||||
'status': "ok",
|
||||
'reqid': reqid, # oid from ems side
|
||||
'txid': txid,
|
||||
'descr': descr, # only on success?
|
||||
}:
|
||||
log.info(
|
||||
f'Submitting order: {descr}\n'
|
||||
f'ems oid: {oid}\n'
|
||||
f're-mapped reqid: {reqid}\n'
|
||||
f'txid: {txid}\n'
|
||||
)
|
||||
return [], False
|
||||
|
||||
case {
|
||||
'event': 'editOrderStatus',
|
||||
'status': "ok",
|
||||
'reqid': reqid, # oid from ems side
|
||||
'descr': descr,
|
||||
|
||||
# NOTE: for edit request this is a new value
|
||||
'txid': txid,
|
||||
'originaltxid': origtxid,
|
||||
}:
|
||||
log.info(
|
||||
f'Editting order {oid}[requid={reqid}]:\n'
|
||||
f'txid: {origtxid} -> {txid}\n'
|
||||
f'{descr}'
|
||||
)
|
||||
# deliver another ack to update the ems-side `.reqid`.
|
||||
return [], False
|
||||
|
||||
case {
|
||||
"event": "cancelOrderStatus",
|
||||
"status": "ok",
|
||||
'reqid': reqid,
|
||||
|
||||
# XXX: sometimes this isn't provided!?
|
||||
# 'txid': txids,
|
||||
**rest,
|
||||
}:
|
||||
# TODO: should we support "batch" acking of
|
||||
# multiple cancels thus avoiding the below loop?
|
||||
resps: list[MsgUnion] = []
|
||||
for txid in rest.get('txid', [last.reqid]):
|
||||
resp = BrokerdStatus(
|
||||
reqid=reqid,
|
||||
account=last.account,
|
||||
time_ns=time.time_ns(),
|
||||
status='cancelled',
|
||||
reason='Cancel success: {oid}@{txid}',
|
||||
broker_details=event,
|
||||
)
|
||||
resps.append(resp)
|
||||
|
||||
return resps, False
|
||||
|
||||
|
||||
def norm_trade_records(
|
||||
|
|
@ -494,10 +761,9 @@ def norm_trade_records(
|
|||
) -> list[pp.Transaction]:
|
||||
|
||||
records: list[pp.Transaction] = []
|
||||
|
||||
for tid, record in ledger.items():
|
||||
|
||||
size = record.get('vol') * {
|
||||
size = float(record.get('vol')) * {
|
||||
'buy': 1,
|
||||
'sell': -1,
|
||||
}[record['type']]
|
||||
|
|
@ -508,7 +774,7 @@ def norm_trade_records(
|
|||
pp.Transaction(
|
||||
fqsn=f'{norm_sym}.kraken',
|
||||
tid=tid,
|
||||
size=float(size),
|
||||
size=size,
|
||||
price=float(record['price']),
|
||||
cost=float(record['fee']),
|
||||
dt=pendulum.from_timestamp(float(record['time'])),
|
||||
|
|
@ -522,19 +788,24 @@ def norm_trade_records(
|
|||
return records
|
||||
|
||||
|
||||
async def update_ledger(
|
||||
@cm
|
||||
def open_ledger(
|
||||
acctid: str,
|
||||
trade_entries: list[dict[str, Any]],
|
||||
|
||||
) -> list[pp.Transaction]:
|
||||
'''
|
||||
Write recent session's trades to the user's (local) ledger file.
|
||||
|
||||
# write recent session's trades to the user's (local) ledger file.
|
||||
'''
|
||||
with pp.open_trade_ledger(
|
||||
'kraken',
|
||||
acctid,
|
||||
) as ledger:
|
||||
ledger.update(trade_entries)
|
||||
|
||||
# normalize to transaction form
|
||||
records = norm_trade_records(trade_entries)
|
||||
return records
|
||||
# normalize to transaction form
|
||||
records = norm_trade_records(trade_entries)
|
||||
yield records
|
||||
|
||||
# update on exit
|
||||
ledger.update(trade_entries)
|
||||
|
|
|
|||
|
|
@ -117,9 +117,8 @@ async def stream_messages(
|
|||
too_slow_count = 0
|
||||
continue
|
||||
|
||||
if isinstance(msg, dict):
|
||||
if msg.get('event') == 'heartbeat':
|
||||
|
||||
match msg:
|
||||
case {'event': 'heartbeat'}:
|
||||
now = time.time()
|
||||
delay = now - last_hb
|
||||
last_hb = now
|
||||
|
|
@ -130,11 +129,20 @@ async def stream_messages(
|
|||
|
||||
continue
|
||||
|
||||
err = msg.get('errorMessage')
|
||||
if err:
|
||||
raise BrokerError(err)
|
||||
else:
|
||||
yield msg
|
||||
case {
|
||||
'connectionID': _,
|
||||
'event': 'systemStatus',
|
||||
'status': 'online',
|
||||
'version': _,
|
||||
} as msg:
|
||||
log.info(
|
||||
'WS connection is up:\n'
|
||||
f'{msg}'
|
||||
)
|
||||
continue
|
||||
|
||||
case _:
|
||||
yield msg
|
||||
|
||||
|
||||
async def process_data_feed_msgs(
|
||||
|
|
@ -145,37 +153,60 @@ async def process_data_feed_msgs(
|
|||
|
||||
'''
|
||||
async for msg in stream_messages(ws):
|
||||
match msg:
|
||||
case {
|
||||
'errorMessage': errmsg
|
||||
}:
|
||||
raise BrokerError(errmsg)
|
||||
|
||||
chan_id, *payload_array, chan_name, pair = msg
|
||||
case {
|
||||
'event': 'subscriptionStatus',
|
||||
} as sub:
|
||||
log.info(
|
||||
'WS subscription is active:\n'
|
||||
f'{sub}'
|
||||
)
|
||||
continue
|
||||
|
||||
if 'ohlc' in chan_name:
|
||||
case [
|
||||
chan_id,
|
||||
*payload_array,
|
||||
chan_name,
|
||||
pair
|
||||
]:
|
||||
if 'ohlc' in chan_name:
|
||||
yield 'ohlc', OHLC(
|
||||
chan_id,
|
||||
chan_name,
|
||||
pair,
|
||||
*payload_array[0]
|
||||
)
|
||||
|
||||
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
|
||||
elif 'spread' in chan_name:
|
||||
|
||||
elif 'spread' in chan_name:
|
||||
bid, ask, ts, bsize, asize = map(
|
||||
float, payload_array[0])
|
||||
|
||||
bid, ask, ts, bsize, asize = map(float, payload_array[0])
|
||||
# TODO: really makes you think IB has a horrible API...
|
||||
quote = {
|
||||
'symbol': pair.replace('/', ''),
|
||||
'ticks': [
|
||||
{'type': 'bid', 'price': bid, 'size': bsize},
|
||||
{'type': 'bsize', 'price': bid, 'size': bsize},
|
||||
|
||||
# TODO: really makes you think IB has a horrible API...
|
||||
quote = {
|
||||
'symbol': pair.replace('/', ''),
|
||||
'ticks': [
|
||||
{'type': 'bid', 'price': bid, 'size': bsize},
|
||||
{'type': 'bsize', 'price': bid, 'size': bsize},
|
||||
{'type': 'ask', 'price': ask, 'size': asize},
|
||||
{'type': 'asize', 'price': ask, 'size': asize},
|
||||
],
|
||||
}
|
||||
yield 'l1', quote
|
||||
|
||||
{'type': 'ask', 'price': ask, 'size': asize},
|
||||
{'type': 'asize', 'price': ask, 'size': asize},
|
||||
],
|
||||
}
|
||||
yield 'l1', quote
|
||||
# elif 'book' in msg[-2]:
|
||||
# chan_id, *payload_array, chan_name, pair = msg
|
||||
# print(msg)
|
||||
|
||||
# elif 'book' in msg[-2]:
|
||||
# chan_id, *payload_array, chan_name, pair = msg
|
||||
# print(msg)
|
||||
|
||||
else:
|
||||
print(f'UNHANDLED MSG: {msg}')
|
||||
yield msg
|
||||
case _:
|
||||
print(f'UNHANDLED MSG: {msg}')
|
||||
# yield msg
|
||||
|
||||
|
||||
def normalize(
|
||||
|
|
@ -385,7 +416,7 @@ async def stream_quotes(
|
|||
msg_gen = process_data_feed_msgs(ws)
|
||||
|
||||
# TODO: use ``anext()`` when it lands in 3.10!
|
||||
typ, ohlc_last = await msg_gen.__anext__()
|
||||
typ, ohlc_last = await anext(msg_gen)
|
||||
|
||||
topic, quote = normalize(ohlc_last)
|
||||
|
||||
|
|
|
|||
|
|
@ -88,7 +88,8 @@ def mk_check(
|
|||
|
||||
@dataclass
|
||||
class _DarkBook:
|
||||
'''EMS-trigger execution book.
|
||||
'''
|
||||
EMS-trigger execution book.
|
||||
|
||||
Contains conditions for executions (aka "orders" or "triggers")
|
||||
which are not exposed to brokers and thus the market; i.e. these are
|
||||
|
|
@ -652,6 +653,13 @@ async def translate_and_relay_brokerd_events(
|
|||
else:
|
||||
# check for existing live flow entry
|
||||
entry = book._ems_entries.get(oid)
|
||||
old_reqid = entry.reqid
|
||||
|
||||
if old_reqid and old_reqid != reqid:
|
||||
log.warning(
|
||||
f'Brokerd order id change for {oid}:\n'
|
||||
f'{old_reqid} -> {reqid}'
|
||||
)
|
||||
|
||||
# initial response to brokerd order request
|
||||
if name == 'ack':
|
||||
|
|
@ -662,6 +670,10 @@ async def translate_and_relay_brokerd_events(
|
|||
# a ``BrokerdOrderAck`` **must** be sent after an order
|
||||
# request in order to establish this id mapping.
|
||||
book._ems2brokerd_ids[oid] = reqid
|
||||
log.info(
|
||||
'Rx ACK for order\n'
|
||||
f'oid: {oid} -> reqid: {reqid}'
|
||||
)
|
||||
|
||||
# new order which has not yet be registered into the
|
||||
# local ems book, insert it now and handle 2 cases:
|
||||
|
|
|
|||
Loading…
Reference in New Issue