# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

'''
Order api and machinery

'''
from contextlib import (
    asynccontextmanager as acm,
    contextmanager as cm,
)
from functools import partial
from itertools import chain, count
from pprint import pformat
import time
from typing import (
    Any,
    AsyncIterator,
    Union,
)

from async_generator import aclosing
from bidict import bidict
import pendulum
import trio
import tractor
import wsproto

from piker import pp
from piker.clearing._messages import (
    BrokerdCancel,
    BrokerdError,
    BrokerdFill,
    BrokerdOrder,
    BrokerdOrderAck,
    BrokerdPosition,
    BrokerdStatus,
)
from . import log
from .api import (
    Client,
    BrokerError,
    get_client,
)
from .feed import (
    get_console_log,
    open_autorecon_ws,
    NoBsWs,
    stream_messages,
)


MsgUnion = Union[
    BrokerdCancel,
    BrokerdError,
    BrokerdFill,
    BrokerdOrder,
    BrokerdOrderAck,
    BrokerdPosition,
    BrokerdStatus,
]
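
# NOTE (overview comment added for clarity): the clearing dialog below
# is split across two long-lived tasks:
#
# - ``handle_order_requests()`` consumes new/edit/cancel requests from
#   the EMS stream and forwards them to kraken's authenticated ws api
#   (``addOrder``/``editOrder``/``cancelOrder``),
# - ``handle_order_updates()`` consumes the authenticated ws event
#   stream (``ownTrades``, ``openOrders`` and the ``*OrderStatus``
#   acks) and relays fills, status changes and position updates back
#   up to the EMS.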


async def handle_order_requests(

    ws: NoBsWs,
    client: Client,
    ems_order_stream: tractor.MsgStream,
    token: str,
    emsflow: dict[str, list[MsgUnion]],
    ids: bidict[str, int],
    reqids2txids: dict[int, str],

) -> None:
    '''
    Process new order submission requests from the EMS
    and deliver acks or errors.

    '''
    # XXX: UGH, let's unify this.. with ``msgspec``.
    msg: dict[str, Any]
    order: BrokerdOrder
    counter = count()
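
    # NOTE: kraken's ws api echoes back a client-provided integer
    # `reqid` with every response, so we allocate those from the local
    # counter above and record the ems `oid` (str) -> `reqid` (int)
    # association in ``ids``; later acks/updates can then be mapped
    # back to the originating ems dialog.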

    async for msg in ems_order_stream:
        log.info(f'Rx order msg:\n{pformat(msg)}')
        match msg:
            case {
                'action': 'cancel',
            }:
                cancel = BrokerdCancel(**msg)
                # last = emsflow[cancel.oid]
                reqid = ids[cancel.oid]
                txid = reqids2txids[reqid]

                # call ws api to cancel:
                # https://docs.kraken.com/websockets/#message-cancelOrder
                await ws.send_msg({
                    'event': 'cancelOrder',
                    'token': token,
                    'reqid': reqid,
                    'txid': [txid],  # should be txid from submission
                })
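
                # NOTE: the cancel confirmation arrives asynchronously
                # as a `cancelOrderStatus` ack plus a later
                # `openOrders` status update, both handled over in
                # ``handle_order_updates()``.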

            case {
                'account': 'kraken.spot' as account,
                'action': action,
            } if action in {'buy', 'sell'}:

                # validate
                order = BrokerdOrder(**msg)

                # logic from old `Client.submit_limit()`
                if order.oid in ids:
                    ep = 'editOrder'
                    reqid = ids[order.oid]  # integer not txid
                    txid = reqids2txids[reqid]
                    extra = {
                        'orderid': txid,  # txid
                    }

                else:
                    ep = 'addOrder'
                    reqid = next(counter)
                    ids[order.oid] = reqid
                    log.debug(
                        f'Adding order {reqid}\n'
                        f'{ids}'
                    )
                    extra = {
                        'ordertype': 'limit',
                        'type': order.action,
                    }

                psym = order.symbol.upper()
                pair = f'{psym[:3]}/{psym[3:]}'
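
                # XXX (assumption noted for clarity): the slicing above
                # only renders a correct ws pair for 6-char alt-names
                # (eg. 'XBTUSD' -> 'XBT/USD'); longer base symbols
                # would need a real pair lookup from the symbol info.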

                # XXX: ACK the request **immediately** before sending
                # the api side request to ensure the ems maps the oid ->
                # reqid correctly!
                resp = BrokerdOrderAck(
                    oid=order.oid,  # ems order request id
                    reqid=reqid,  # our custom int mapping
                    account=account,  # piker account
                )
                await ems_order_stream.send(resp)

                # call ws api to submit the order:
                # https://docs.kraken.com/websockets/#message-addOrder
                req = {
                    'event': ep,
                    'token': token,

                    'reqid': reqid,  # remapped-to-int uid from ems
                    'pair': pair,
                    'price': str(order.price),
                    'volume': str(order.size),

                    # only ensures request is valid, nothing more
                    # validate: 'true',

                } | extra
                log.info(f'Submitting WS order request:\n{pformat(req)}')
                await ws.send_msg(req)

                # placeholder for sanity checking in the relay loop
                emsflow.setdefault(order.oid, []).append(order)
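
                # NOTE: ``emsflow`` keeps the full per-`oid` msg
                # history; the relay loop later reads it to grab the
                # last msg for a dialog and appends its own status
                # responses to the same list.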

            case _:
                account = msg.get('account')
                if account != 'kraken.spot':
                    log.error(
                        'This is a kraken account, '
                        'only a `kraken.spot` selection is valid'
                    )

                await ems_order_stream.send(
                    BrokerdError(
                        oid=msg['oid'],
                        symbol=msg['symbol'],
                        reason=(
                            f'Invalid request msg:\n{msg}'
                        ),
                    )
                )


@acm
async def subscribe(
    ws: wsproto.WSConnection,
    token: str,
    subs: list[str] = ['ownTrades', 'openOrders'],
):
    '''
    Setup ws api subscriptions:
    https://docs.kraken.com/websockets/#message-subscribe

    By default we sign up for trade and order update events.

    '''
    # more specific logic for this in kraken's sync client:
    # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188

    assert token
    for sub in subs:
        msg = {
            'event': 'subscribe',
            'subscription': {
                'name': sub,
                'token': token,
            }
        }

        # TODO: we want to eventually allow unsubs which should
        # be completely fine to request from a separate task
        # since internally the ws methods appear to be FIFO
        # locked.
        await ws.send_msg(msg)

    yield

    for sub in subs:
        # unsub from all pairs on teardown
        await ws.send_msg({
            'event': 'unsubscribe',
            'subscription': [sub],
        })

    # XXX: do we need to ack the unsub?
    # await ws.recv_msg()


@tractor.context
async def trades_dialogue(
    ctx: tractor.Context,
    loglevel: str = None,

) -> AsyncIterator[dict[str, Any]]:

    # XXX: required to propagate ``tractor`` loglevel to piker logging
    get_console_log(loglevel or tractor.current_actor().loglevel)

    async with get_client() as client:

        # TODO: make ems flip to paper mode via
        # some returned signal if the user only wants to use
        # the data feed or we return this?
        # await ctx.started(({}, ['paper']))

        if not client._api_key:
            raise RuntimeError(
                'Missing Kraken API key in `brokers.toml`!?!?')

        # auth required block
        acctid = client._name
        acc_name = 'kraken.' + acctid

        # pull and deliver trades ledger
        trades = await client.get_trades()
        log.info(
            f'Loaded {len(trades)} trades from account `{acc_name}`'
        )
        with open_ledger(
            acctid,
            trades,
        ) as trans:
            active, closed = pp.update_pps_conf(
                'kraken',
                acctid,
                trade_records=trans,
                ledger_reload={}.fromkeys(t.bsuid for t in trans),
            )

        position_msgs: list[dict] = []
        pps: dict[int, pp.Position]
        for pps in [active, closed]:
            for tid, p in pps.items():
                msg = BrokerdPosition(
                    broker='kraken',
                    account=acc_name,
                    symbol=p.symbol.front_fqsn(),
                    size=p.size,
                    avg_price=p.be_price,
                    currency='',
                )
                position_msgs.append(msg)

        await ctx.started(
            (position_msgs, [acc_name])
        )

        # Get websocket token for authenticated data stream
        # Assert that a token was actually received.
        resp = await client.endpoint('GetWebSocketsToken', {})

        err = resp.get('error')
        if err:
            raise BrokerError(err)

        token = resp['result']['token']

        ws: NoBsWs
        async with (
            ctx.open_stream() as ems_stream,
            open_autorecon_ws(
                'wss://ws-auth.kraken.com/',
                fixture=partial(
                    subscribe,
                    token=token,
                ),
            ) as ws,
            trio.open_nursery() as n,
            aclosing(stream_messages(ws)) as stream,
        ):
            # task local msg dialog tracking
            emsflow: dict[
                str,
                list[MsgUnion],
            ] = {}

            # 2way map for ems ids to kraken int reqids..
            ids: bidict[str, int] = bidict()
            reqids2txids: bidict[int, str] = bidict()
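
            # NOTE: two id mappings are tracked per order dialog:
            # ems `oid` (str) <-> our int `reqid` (in ``ids``) and
            # int `reqid` <-> kraken `txid` (in ``reqids2txids``);
            # the latter is needed because `openOrders` updates are
            # keyed by `txid` and their `userref` field is always 0
            # (see notes in ``handle_order_updates()``).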

            # task for processing inbound requests from ems
            n.start_soon(
                handle_order_requests,
                ws,
                client,
                ems_stream,
                token,
                emsflow,
                ids,
                reqids2txids,
            )

            # enter relay loop
            await handle_order_updates(
                ws,
                stream,
                ems_stream,
                emsflow,
                ids,
                reqids2txids,
                trans,
                acctid,
                acc_name,
                token,
            )


async def handle_order_updates(
    ws: NoBsWs,
    ws_stream: AsyncIterator,
    ems_stream: tractor.MsgStream,
    emsflow: dict[str, list[MsgUnion]],
    ids: bidict[str, int],
    reqids2txids: bidict[int, str],
    trans: set[pp.Transaction],
    acctid: str,
    acc_name: str,
    token: str,

) -> None:
    '''
    Main msg handling loop for all things order management.

    This code is broken out (from ``trades_dialogue()`` above) to make
    the context explicit and the state variables passed in via the
    signature clear to the reader.

    '''
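
    # NOTE: the ws stream interleaves three msg families which are
    # matched on below:
    #
    # - ``ownTrades`` batches: clearing (fill) events, also used to
    #   sync the local ledger and pp state,
    # - ``openOrders`` batches: per-order status/fill updates keyed by
    #   kraken `txid`,
    # - ``*OrderStatus`` events: acks/errors for our own
    #   add/edit/cancel submissions.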

    # transaction records which will be updated
    # on new trade clearing events (aka order "fills")
    trans: set[pp.Transaction]

    async for msg in ws_stream:
        match msg:
            # process and relay clearing trade events to ems
            # https://docs.kraken.com/websockets/#message-ownTrades
            # TODO: turns out you get the fill events from the
            # `openOrders` before you get this, so it might be better
            # to do all fill/status/pp updates in that sub and just use
            # this one for ledger syncs?
            # For eg. we could take the "last 50 trades" and do a diff
            # with the ledger and then only do a re-sync if something
            # seems amiss?
            case [
                trades_msgs,
                'ownTrades',
                {'sequence': seq},
            ]:
                log.info(
                    f'ownTrades update_{seq}:\n'
                    f'{pformat(trades_msgs)}'
                )
                # flatten msgs to an {id -> data} table for processing
                trades = {
                    tid: trade
                    for entry in trades_msgs
                    for (tid, trade) in entry.items()

                    # only process entries which are not yet in the ledger
                    if tid not in {r.tid for r in trans}
                }
                for tid, trade in trades.items():

                    txid = trade['ordertxid']

                    # NOTE: yet again, here we don't have any ref to the
                    # reqid that's generated by us (as the client) and
                    # sent in the order request, so we have to look it
                    # up from our own registry...
                    reqid = reqids2txids.inverse[txid]

                    action = trade['type']
                    price = float(trade['price'])
                    size = float(trade['vol'])
                    broker_time = float(trade['time'])

                    # send a fill msg for gui update
                    fill_msg = BrokerdFill(
                        time_ns=time.time_ns(),
                        reqid=reqid,

                        action=action,
                        size=size,
                        price=price,

                        # TODO: maybe capture more msg data
                        # i.e. fees?
                        broker_details={'name': 'kraken'} | trade,
                        broker_time=broker_time
                    )
                    await ems_stream.send(fill_msg)

                    filled_msg = BrokerdStatus(
                        reqid=reqid,
                        time_ns=time.time_ns(),

                        account=acc_name,
                        status='filled',
                        filled=size,
                        reason='Order filled by kraken',
                        broker_details={
                            'name': 'kraken',
                            'broker_time': broker_time
                        },

                        # TODO: figure out if kraken gives a count
                        # of how many units of underlying were
                        # filled. Alternatively we can decrement
                        # this value ourselves by associating and
                        # calcing from the diff with the original
                        # client-side request, see:
                        # https://github.com/pikers/piker/issues/296
                        remaining=0,
                    )
                    await ems_stream.send(filled_msg)

                if not trades:
                    # skip pp emissions if we have already
                    # processed all trades in this msg.
                    continue

                # update ledger and position tracking
                # XXX: dev-only breakpoint; left disabled so the relay
                # loop isn't halted on every fill batch:
                # await tractor.breakpoint()
                trans: set[pp.Transaction]
                with open_ledger(
                    acctid,
                    trades,

                ) as trans:
                    # TODO: ideally we can pass in an existing
                    # pps state to this right? such that we
                    # don't have to do a ledger reload all the
                    # time..
                    active, closed = pp.update_pps_conf(
                        'kraken',
                        acctid,
                        trade_records=list(trans),
                        ledger_reload={}.fromkeys(
                            t.bsuid for t in trans),
                    )

                # emit any new pp msgs to ems
                for pos in filter(
                    bool,
                    chain(active.values(), closed.values()),
                ):
                    pp_msg = BrokerdPosition(
                        broker='kraken',

                        # XXX: ok so this is annoying, we're
                        # relaying an account name with the
                        # backend suffix prefixed but when
                        # reading accounts from ledgers we
                        # don't need it and/or it's prefixed
                        # in the section table.. we should
                        # just strip this from the message
                        # right since `.broker` is already
                        # included?
                        account=f'kraken.{acctid}',
                        symbol=pos.symbol.front_fqsn(),
                        size=pos.size,
                        avg_price=pos.be_price,

                        # TODO
                        # currency=''
                    )
                    await ems_stream.send(pp_msg)

            # process and relay order state change events
            # https://docs.kraken.com/websockets/#message-openOrders
            case [
                order_msgs,
                'openOrders',
                {'sequence': seq},
            ]:
                for order_msg in order_msgs:
                    log.info(
                        f'`openOrders` msg update_{seq}:\n'
                        f'{pformat(order_msg)}'
                    )
                    txid, update_msg = list(order_msg.items())[0]
                    match update_msg:

                        # we ignore internal order updates triggered by
                        # kraken's "edit" endpoint.
                        case {
                            'cancel_reason': 'Order replaced',
                            'status': status,
                            # 'userref': reqid,  # XXX: always zero bug XD
                            # **rest,
                        }:
                            log.info(
                                f'Order {txid} was replaced'
                            )
                            continue

                        case {
                            # XXX: lol, ws bug, this is always 0!
                            'userref': _,

                            # during a fill this field is **not**
                            # provided! but, it is always avail on
                            # actual status updates.. see case above.
                            'status': status,
                            **rest,

                            # XXX: eg. of remaining msg schema:
                            # 'avg_price': _,
                            # 'cost': _,
                            # 'descr': {
                            #     'close': None,
                            #     'leverage': None,
                            #     'order': descr,
                            #     'ordertype': 'limit',
                            #     'pair': 'XMR/EUR',
                            #     'price': '74.94000000',
                            #     'price2': '0.00000000',
                            #     'type': 'buy'
                            # },
                            # 'expiretm': None,
                            # 'fee': '0.00000000',
                            # 'limitprice': '0.00000000',
                            # 'misc': '',
                            # 'oflags': 'fciq',
                            # 'opentm': '1656966131.337344',
                            # 'refid': None,
                            # 'starttm': None,
                            # 'stopprice': '0.00000000',
                            # 'timeinforce': 'GTC',
                            # 'vol': submit_vlm,  # '13.34400854',
                            # 'vol_exec': exec_vlm,  # 0.0000
                        }:
                            ems_status = {
                                'open': 'submitted',
                                'closed': 'filled',
                                'canceled': 'cancelled',
                                # do we even need to forward
                                # this state to the ems?
                                'pending': 'pending',
                            }[status]

                            # TODO: store this in a ChainMap instance
                            # per order dialog.
                            # submit_vlm = rest.get('vol', 0)
                            # fee = rest.get('fee', 0)
                            if status == 'closed':
                                vlm = 0
                            else:
                                vlm = rest.get('vol_exec', 0)

                            reqid = reqids2txids.inverse[txid]

                            oid = ids.inverse.get(reqid)
                            if not oid:
                                # TODO: handle these and relay them
                                # through the EMS to the client / UI
                                # side!
                                log.warning(
                                    f'Received active order {txid}:\n'
                                    f'{update_msg}\n'
                                    'Cancelling order for now!..'
                                )

                                # call ws api to cancel:
                                # https://docs.kraken.com/websockets/#message-cancelOrder
                                await ws.send_msg({
                                    'event': 'cancelOrder',
                                    'token': token,
                                    'reqid': reqid,
                                    'txid': [txid],
                                })
                                continue

                            msgs = emsflow[oid]

                            # send BrokerdStatus messages for all
                            # order state updates
                            resp = BrokerdStatus(

                                reqid=reqid,
                                time_ns=time.time_ns(),  # cuz why not
                                account=f'kraken.{acctid}',

                                # everyone doin camel case..
                                status=ems_status,  # force lower case

                                filled=vlm,
                                reason='',  # why held?
                                remaining=vlm,

                                # TODO: need to extract the submit vlm
                                # from a prior msg update..
                                # (
                                #     float(submit_vlm)
                                #     -
                                #     float(exec_vlm)
                                # ),

                                broker_details=dict(
                                    {'name': 'kraken'}, **update_msg
                                ),
                            )
                            msgs.append(resp)
                            await ems_stream.send(resp)

                        # fill event.
                        # there is no `status` field
                        case {
                            'vol_exec': vlm,
                            **rest,
                        }:
                            # eg. fill msg contents (in total):
                            # {
                            #     'vol_exec': '0.84709869',
                            #     'cost': '101.25370642',
                            #     'fee': '0.26325964',
                            #     'avg_price': '119.53000001',
                            #     'userref': 0,
                            # }
                            # TODO: emit fill msg from here
                            reqid = reqids2txids.inverse[txid]
                            log.info(
                                f'openOrders vlm={vlm} Fill for {reqid}:\n'
                                f'{update_msg}'
                            )
                            continue

                        case _:
                            log.warning(
                                'Unknown orders msg:\n'
                                f'{txid}:{order_msg}'
                            )

            # TODO: given the `openOrders` sub, pretty much all the
            # msgs we get for this sub are duplicates of the
            # (incremental) updates in that one, though we still need
            # them because that sub seems to have a bug where the
            # `userref` field is always 0 instead of our generated
            # reqid value...
            # Not sure why kraken devs decided to repeat themselves but
            # it almost seems as though we could drop this entire sub
            # and get everything we need by just parsing msgs correctly
            # above? The only reason for this seems to be remapping
            # underlying `txid` values on order "edits" which the
            # `openOrders` sub doesn't seem to have any knowledge of.
            # I'd also like to ask them which event guarantees that the
            # live order is now in the book, since these status ones
            # almost seem more like request-acks than state guarantees.
            case {
                'event': etype,
                'status': status,
                'reqid': reqid,
                **rest,
            } as event if (
                etype in {
                    'addOrderStatus',
                    'editOrderStatus',
                    'cancelOrderStatus',
                }
            ):
                log.info(
                    f'{etype}:\n'
                    f'{pformat(msg)}'
                )
                oid = ids.inverse.get(reqid)
                # TODO: relay these to EMS once it supports
                # open order loading.
                if not oid:
                    log.warning(
                        'Unknown order status update?:\n'
                        f'{event}'
                    )
                    continue

                txid = rest.get('txid')
                if txid:
                    # XXX: we **must** do this mapping for edit order
                    # status updates since the `openOrders` sub above
                    # never relays back the correct client-side `reqid`
                    # that is put in the order request..
                    reqids2txids[reqid] = txid

                msgs = emsflow[oid]
                last = msgs[-1]
                resps, errored = process_status(
                    event,
                    oid,
                    token,
                    msgs,
                    last,
                )
                if resps:
                    msgs.extend(resps)
                    for resp in resps:
                        await ems_stream.send(resp)

            case _:
                log.warning(f'Unhandled trades update msg: {msg}')


def process_status(
    event: dict[str, str],
    oid: str,
    token: str,
    msgs: list[MsgUnion],
    last: MsgUnion,

) -> tuple[list[MsgUnion], bool]:
    '''
    Process `'[add/edit/cancel]OrderStatus'` events by translating them
    to, and returning, the equivalent EMS-msg responses.

    '''
    match event:
        case {
            'event': etype,
            'status': 'error',
            'reqid': reqid,
            'errorMessage': errmsg,
        }:
            # any of ``{'add', 'edit', 'cancel'}``
            # NOTE: ``str.rstrip()`` strips a char *set* (which would
            # also eat the tail of 'add'/'edit'), so drop the literal
            # suffix instead.
            action = etype.removesuffix('OrderStatus')
            log.error(
                f'Failed to {action} order {reqid}:\n'
                f'{errmsg}'
            )
            resp = BrokerdError(
                oid=oid,
                # XXX: use old reqid in case it changed?
                reqid=reqid,
                symbol=getattr(last, 'symbol', 'N/A'),

                reason=f'Failed {action}:\n{errmsg}',
                broker_details=event
            )
            return [resp], True

        # successful request cases
        case {
            'event': 'addOrderStatus',
            'status': 'ok',
            'reqid': reqid,  # oid from ems side
            'txid': txid,
            'descr': descr,  # only on success?
        }:
            log.info(
                f'Submitted order: {descr}\n'
                f'ems oid: {oid}\n'
                f'brokerd reqid: {reqid}\n'
                f'txid: {txid}\n'
            )
            return [], False

        case {
            'event': 'editOrderStatus',
            'status': 'ok',
            'reqid': reqid,  # oid from ems side
            'descr': descr,

            # NOTE: for edit request this is a new value
            'txid': txid,
            'originaltxid': origtxid,
        }:
            log.info(
                f'Editing order {oid}[reqid={reqid}]:\n'
                f'brokerd reqid: {reqid}\n'
                f'txid: {origtxid} -> {txid}\n'
                f'{descr}'
            )
            # deliver another ack to update the ems-side `.reqid`.
            return [], False

        case {
            'event': 'cancelOrderStatus',
            'status': 'ok',
            'reqid': reqid,

            # XXX: sometimes this isn't provided!?
            # 'txid': txids,
            **rest,
        }:
            for txid in rest.get('txid', [last.reqid]):
                log.info(
                    f'Cancelling order {oid}[reqid={reqid}]:\n'
                    f'brokerd reqid: {reqid}\n'
                )
            return [], False
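
    # NOTE: there is no catch-all case above, so an unmatched event
    # makes this function return ``None`` and the tuple-unpack at the
    # call site will raise; new/unexpected status msg shapes thus fail
    # loudly instead of being silently dropped.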


def norm_trade_records(
    ledger: dict[str, Any],

) -> list[pp.Transaction]:

    records: list[pp.Transaction] = []
    for tid, record in ledger.items():

        size = float(record.get('vol')) * {
            'buy': 1,
            'sell': -1,
        }[record['type']]

        # we normalize to kraken's `altname` always..
        bsuid = norm_sym = Client.normalize_symbol(record['pair'])

        records.append(
            pp.Transaction(
                fqsn=f'{norm_sym}.kraken',
                tid=tid,
                size=size,
                price=float(record['price']),
                cost=float(record['fee']),
                dt=pendulum.from_timestamp(float(record['time'])),
                bsuid=bsuid,

                # XXX: there are no derivs on kraken right?
                # expiry=expiry,
            )
        )

    return records
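
# eg. of the per-entry ledger schema read above (values illustrative
# only; field names follow kraken's REST `TradesHistory` results):
#
# 'TXXXXX-XXXXX-XXXXXX': {
#     'pair': 'XXBTZEUR',
#     'type': 'buy',
#     'vol': '0.10000000',
#     'price': '20000.00000',
#     'fee': '0.80000',
#     'time': 1656966131.0,
# }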


@cm
def open_ledger(
    acctid: str,
    trade_entries: list[dict[str, Any]],

) -> set[pp.Transaction]:
    '''
    Write recent session's trades to the user's (local) ledger file.

    '''
    with pp.open_trade_ledger(
        'kraken',
        acctid,
    ) as ledger:

        # normalize to transaction form
        # TODO: cawt damn, we should probably delegate to cryptofeed for
        # this insteada of re-hacking kraken's total crap?
        records = norm_trade_records(trade_entries)
        yield set(records)

        # update on exit
        ledger.update(trade_entries)