Merge pull request #349 from pikers/kraken_ws_orders

Kraken ws orders
msgpack_zombie
goodboy 2022-08-05 21:01:24 -04:00 committed by GitHub
commit a9185e7d6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1261 additions and 474 deletions

View File

@ -33,7 +33,6 @@ import asks
from fuzzywuzzy import process as fuzzy
import numpy as np
import tractor
from pydantic.dataclasses import dataclass
import wsproto
from .._cacheables import open_cached_client
@ -106,14 +105,14 @@ class Pair(Struct, frozen=True):
permissions: list[str]
@dataclass
class OHLC:
"""Description of the flattened OHLC quote format.
class OHLC(Struct):
'''
Description of the flattened OHLC quote format.
For schema details see:
https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams
"""
'''
time: int
open: float
@ -262,6 +261,7 @@ class Client:
for i, bar in enumerate(bars):
bar = OHLC(*bar)
bar.typecast()
row = []
for j, (name, ftype) in enumerate(_ohlc_dtype[1:]):

View File

@ -0,0 +1,64 @@
``kraken`` backend
------------------
though they don't have the most liquidity of all the cexes they sure are
accommodating to those of us who appreciate a little ``xmr``.
status
******
current support is *production grade* and both real-time data and order
management should be correct and fast. this backend is used by core devs
for live trading.
config
******
In order to get order mode support your ``brokers.toml``
needs to have something like the following:
.. code:: toml
[kraken]
accounts.spot = 'spot'
key_descr = "spot"
api_key = "69696969696969696696969696969696969696969696969696969696"
secret = "BOOBSBOOBSBOOBSBOOBSBOOBSSMBZ69696969696969669969696969696"
If everything works correctly you should see any current positions
loaded in the pps pane on chart load and you should also be able to
check your trade records in the file::
<pikerk_conf_dir>/ledgers/trades_kraken_spot.toml
An example ledger file will have entries written verbatim from the
trade events schema:
.. code:: toml
[TFJBKK-SMBZS-VJ4UWS]
ordertxid = "SMBZSA-7CNQU-3HWLNJ"
postxid = "SMBZSE-M7IF5-CFI7LT"
pair = "XXMRZEUR"
time = 1655691993.4133966
type = "buy"
ordertype = "limit"
price = "103.97000000"
cost = "499.99999977"
fee = "0.80000000"
vol = "4.80907954"
margin = "0.00000000"
misc = ""
your ``pps.toml`` file will have position entries like,
.. code:: toml
[kraken.spot."xmreur.kraken"]
size = 4.80907954
ppu = 103.97000000
bsuid = "XXMRZEUR"
clears = [
{ tid = "TFJBKK-SMBZS-VJ4UWS", cost = 0.8, price = 103.97, size = 4.80907954, dt = "2022-05-20T02:26:33.413397+00:00" },
]

View File

@ -19,7 +19,6 @@ Kraken web API wrapping.
'''
from contextlib import asynccontextmanager as acm
from dataclasses import field
from datetime import datetime
import itertools
from typing import (
@ -29,17 +28,16 @@ from typing import (
)
import time
# import trio
# import tractor
from bidict import bidict
import pendulum
import asks
from fuzzywuzzy import process as fuzzy
import numpy as np
from pydantic.dataclasses import dataclass
import urllib.parse
import hashlib
import hmac
import base64
import trio
from piker import config
from piker.brokers._util import (
@ -48,6 +46,7 @@ from piker.brokers._util import (
BrokerError,
DataThrottle,
)
from piker.pp import Transaction
from . import log
# <uri>/<version>/
@ -77,31 +76,6 @@ _symbol_info_translation: dict[str, str] = {
}
@dataclass
class OHLC:
'''
Description of the flattened OHLC quote format.
For schema details see:
https://docs.kraken.com/websockets/#message-ohlc
'''
chan_id: int # internal kraken id
chan_name: str # eg. ohlc-1 (name-interval)
pair: str # fx pair
time: float # Begin time of interval, in seconds since epoch
etime: float # End time of interval, in seconds since epoch
open: float # Open price of interval
high: float # High price within interval
low: float # Low price within interval
close: float # Close price of interval
vwap: float # Volume weighted average price within interval
volume: float # Accumulated volume **within interval**
count: int # Number of trades within interval
# (sampled) generated tick data
ticks: list[Any] = field(default_factory=list)
def get_config() -> dict[str, Any]:
conf, path = config.load()
@ -141,8 +115,13 @@ class InvalidKey(ValueError):
class Client:
# global symbol normalization table
_ntable: dict[str, str] = {}
_atable: bidict[str, str] = bidict()
def __init__(
self,
config: dict[str, str],
name: str = '',
api_key: str = '',
secret: str = ''
@ -153,6 +132,7 @@ class Client:
'User-Agent':
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
})
self.conf: dict[str, str] = config
self._pairs: list[str] = []
self._name = name
self._api_key = api_key
@ -212,8 +192,36 @@ class Client:
data['nonce'] = str(int(1000*time.time()))
return await self._private(method, data, uri_path)
async def get_balances(
self,
) -> dict[str, float]:
'''
Return the set of asset balances for this account
by symbol.
'''
resp = await self.endpoint(
'Balance',
{},
)
by_bsuid = resp['result']
return {
self._atable[sym].lower(): float(bal)
for sym, bal in by_bsuid.items()
}
async def get_assets(self) -> dict[str, dict]:
resp = await self._public('Assets', {})
return resp['result']
async def cache_assets(self) -> None:
assets = self.assets = await self.get_assets()
for bsuid, info in assets.items():
self._atable[bsuid] = info['altname']
async def get_trades(
self,
fetch_limit: int = 10,
) -> dict[str, Any]:
'''
@ -225,6 +233,8 @@ class Client:
trades_by_id: dict[str, Any] = {}
for i in itertools.count():
if i >= fetch_limit:
break
# increment 'ofs' pagination offset
ofs = i*50
@ -254,6 +264,61 @@ class Client:
assert count == len(trades_by_id.values())
return trades_by_id
async def get_xfers(
self,
asset: str,
src_asset: str = '',
) -> dict[str, Transaction]:
'''
Get asset balance transfer transactions.
Currently only withdrawals are supported.
'''
xfers: list[dict] = (await self.endpoint(
'WithdrawStatus',
{'asset': asset},
))['result']
# eg. resp schema:
# 'result': [{'method': 'Bitcoin', 'aclass': 'currency', 'asset':
# 'XXBT', 'refid': 'AGBJRMB-JHD2M4-NDI3NR', 'txid':
# 'b95d66d3bb6fd76cbccb93f7639f99a505cb20752c62ea0acc093a0e46547c44',
# 'info': 'bc1qc8enqjekwppmw3g80p56z5ns7ze3wraqk5rl9z',
# 'amount': '0.00300726', 'fee': '0.00001000', 'time':
# 1658347714, 'status': 'Success'}]}
trans: dict[str, Transaction] = {}
for entry in xfers:
# look up the normalized name
asset = self._atable[entry['asset']].lower()
# XXX: this is in the asset units (likely) so it isn't
# quite the same as a commisions cost necessarily..)
cost = float(entry['fee'])
tran = Transaction(
fqsn=asset + '.kraken',
tid=entry['txid'],
dt=pendulum.from_timestamp(entry['time']),
bsuid=f'{asset}{src_asset}',
size=-1*(
float(entry['amount'])
+
cost
),
# since this will be treated as a "sell" it
# shouldn't be needed to compute the be price.
price='NaN',
# XXX: see note above
cost=0,
)
trans[tran.tid] = tran
return trans
async def submit_limit(
self,
symbol: str,
@ -282,6 +347,7 @@ class Client:
"volume": str(size),
}
return await self.endpoint('AddOrder', data)
else:
# Edit order data for kraken api
data["txid"] = reqid
@ -301,7 +367,9 @@ class Client:
async def symbol_info(
self,
pair: Optional[str] = None,
):
) -> dict[str, dict[str, str]]:
if pair is not None:
pairs = {'pair': pair}
else:
@ -327,6 +395,12 @@ class Client:
if not self._pairs:
self._pairs = await self.symbol_info()
ntable = {}
for restapikey, info in self._pairs.items():
ntable[restapikey] = ntable[info['wsname']] = info['altname']
self._ntable.update(ntable)
return self._pairs
async def search_symbols(
@ -424,45 +498,43 @@ class Client:
else:
raise BrokerError(errmsg)
@classmethod
def normalize_symbol(
cls,
ticker: str
) -> str:
'''
Normalize symbol names to to a 3x3 pair from the global
definition map which we build out from the data retreived from
the 'AssetPairs' endpoint, see methods above.
'''
ticker = cls._ntable[ticker]
symlen = len(ticker)
if symlen != 6:
raise ValueError(f'Unhandled symbol: {ticker}')
return ticker.lower()
@acm
async def get_client() -> Client:
section = get_config()
if section:
conf = get_config()
if conf:
client = Client(
name=section['key_descr'],
api_key=section['api_key'],
secret=section['secret']
conf,
name=conf['key_descr'],
api_key=conf['api_key'],
secret=conf['secret']
)
else:
client = Client()
client = Client({})
# at startup, load all symbols locally for fast search
await client.cache_symbols()
# at startup, load all symbols, and asset info in
# batch requests.
async with trio.open_nursery() as nurse:
nurse.start_soon(client.cache_assets)
await client.cache_symbols()
yield client
def normalize_symbol(
ticker: str
) -> str:
'''
Normalize symbol names to to a 3x3 pair.
'''
remap = {
'XXBTZEUR': 'XBTEUR',
'XXMRZEUR': 'XMREUR',
# ws versions? pretty weird..
'XBT/EUR': 'XBTEUR',
'XMR/EUR': 'XMREUR',
}
symlen = len(ticker)
if symlen != 6:
ticker = remap[ticker]
else:
raise ValueError(f'Unhandled symbol: {ticker}')
return ticker.lower()

File diff suppressed because it is too large Load Diff

View File

@ -19,7 +19,6 @@ Real-time and historical data feed endpoints.
'''
from contextlib import asynccontextmanager as acm
from dataclasses import asdict
from datetime import datetime
from typing import (
Any,
@ -28,6 +27,7 @@ from typing import (
)
import time
from async_generator import aclosing
from fuzzywuzzy import process as fuzzy
import numpy as np
import pendulum
@ -49,7 +49,6 @@ from piker.data._web_bs import open_autorecon_ws, NoBsWs
from . import log
from .api import (
Client,
OHLC,
)
@ -87,6 +86,30 @@ class Pair(Struct):
ordermin: float # minimum order volume for pair
class OHLC(Struct):
'''
Description of the flattened OHLC quote format.
For schema details see:
https://docs.kraken.com/websockets/#message-ohlc
'''
chan_id: int # internal kraken id
chan_name: str # eg. ohlc-1 (name-interval)
pair: str # fx pair
time: float # Begin time of interval, in seconds since epoch
etime: float # End time of interval, in seconds since epoch
open: float # Open price of interval
high: float # High price within interval
low: float # Low price within interval
close: float # Close price of interval
vwap: float # Volume weighted average price within interval
volume: float # Accumulated volume **within interval**
count: int # Number of trades within interval
# (sampled) generated tick data
ticks: list[Any] = []
async def stream_messages(
ws: NoBsWs,
):
@ -117,9 +140,8 @@ async def stream_messages(
too_slow_count = 0
continue
if isinstance(msg, dict):
if msg.get('event') == 'heartbeat':
match msg:
case {'event': 'heartbeat'}:
now = time.time()
delay = now - last_hb
last_hb = now
@ -130,11 +152,9 @@ async def stream_messages(
continue
err = msg.get('errorMessage')
if err:
raise BrokerError(err)
else:
yield msg
case _:
# passthrough sub msgs
yield msg
async def process_data_feed_msgs(
@ -145,44 +165,69 @@ async def process_data_feed_msgs(
'''
async for msg in stream_messages(ws):
match msg:
case {
'errorMessage': errmsg
}:
raise BrokerError(errmsg)
chan_id, *payload_array, chan_name, pair = msg
case {
'event': 'subscriptionStatus',
} as sub:
log.info(
'WS subscription is active:\n'
f'{sub}'
)
continue
if 'ohlc' in chan_name:
case [
chan_id,
*payload_array,
chan_name,
pair
]:
if 'ohlc' in chan_name:
ohlc = OHLC(
chan_id,
chan_name,
pair,
*payload_array[0]
)
ohlc.typecast()
yield 'ohlc', ohlc
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
elif 'spread' in chan_name:
elif 'spread' in chan_name:
bid, ask, ts, bsize, asize = map(
float, payload_array[0])
bid, ask, ts, bsize, asize = map(float, payload_array[0])
# TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
# TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
# elif 'book' in msg[-2]:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
# elif 'book' in msg[-2]:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
else:
print(f'UNHANDLED MSG: {msg}')
yield msg
case _:
print(f'UNHANDLED MSG: {msg}')
# yield msg
def normalize(
ohlc: OHLC,
) -> dict:
quote = asdict(ohlc)
quote = ohlc.to_dict()
quote['broker_ts'] = quote['time']
quote['brokerd_ts'] = time.time()
quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '')
@ -376,17 +421,15 @@ async def stream_quotes(
# see the tips on reconnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
ws: NoBsWs
async with open_autorecon_ws(
'wss://ws.kraken.com/',
fixture=subscribe,
) as ws:
async with (
open_autorecon_ws(
'wss://ws.kraken.com/',
fixture=subscribe,
) as ws,
aclosing(process_data_feed_msgs(ws)) as msg_gen,
):
# pull a first quote and deliver
msg_gen = process_data_feed_msgs(ws)
# TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__()
typ, ohlc_last = await anext(msg_gen)
topic, quote = normalize(ohlc_last)
task_status.started((init_msgs, quote))

View File

@ -88,7 +88,8 @@ def mk_check(
@dataclass
class _DarkBook:
'''EMS-trigger execution book.
'''
EMS-trigger execution book.
Contains conditions for executions (aka "orders" or "triggers")
which are not exposed to brokers and thus the market; i.e. these are
@ -653,6 +654,13 @@ async def translate_and_relay_brokerd_events(
else:
# check for existing live flow entry
entry = book._ems_entries.get(oid)
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
# initial response to brokerd order request
if name == 'ack':
@ -663,6 +671,10 @@ async def translate_and_relay_brokerd_events(
# a ``BrokerdOrderAck`` **must** be sent after an order
# request in order to establish this id mapping.
book._ems2brokerd_ids[oid] = reqid
log.info(
'Rx ACK for order\n'
f'oid: {oid} -> reqid: {reqid}'
)
# new order which has not yet be registered into the
# local ems book, insert it now and handle 2 cases:
@ -690,6 +702,9 @@ async def translate_and_relay_brokerd_events(
# a live flow now exists
oid = entry.oid
# TODO: instead this should be our status set.
# ack, open, fill, closed, cancelled'
resp = None
broker_details = {}

View File

@ -186,6 +186,7 @@ class BrokerdStatus(Struct):
# XXX: should be best effort set for every update
account: str = ''
# TODO: instead (ack, pending, open, fill, clos(ed), cancelled)
# {
# 'submitted',
# 'cancelled',

View File

@ -39,7 +39,11 @@ from docker.errors import (
APIError,
# ContainerError,
)
from requests.exceptions import ConnectionError, ReadTimeout
import requests
from requests.exceptions import (
ConnectionError,
ReadTimeout,
)
from ..log import get_logger, get_console_log
from .. import config
@ -188,13 +192,12 @@ class Container:
def hard_kill(self, start: float) -> None:
delay = time.time() - start
log.error(
f'Failed to kill container {self.cntr.id} after {delay}s\n'
'sending SIGKILL..'
)
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
log.error(
f'SIGKILL-ing: {self.cntr.id} after {delay}s\n'
)
self.try_signal('SIGKILL')
self.cntr.wait(
timeout=3,
@ -218,20 +221,25 @@ class Container:
self.try_signal('SIGINT')
start = time.time()
for _ in range(30):
for _ in range(6):
with trio.move_on_after(0.5) as cs:
cs.shield = True
log.cancel('polling for CNTR logs...')
try:
await self.process_logs_until(stop_msg)
except ApplicationLogError:
hard_kill = True
else:
# if we aren't cancelled on above checkpoint then we
# assume we read the expected stop msg and
# terminated.
break
# if we aren't cancelled on above checkpoint then we
# assume we read the expected stop msg and terminated.
break
if cs.cancelled_caught:
# on timeout just try a hard kill after
# a quick container sync-wait.
hard_kill = True
try:
log.info(f'Polling for container shutdown:\n{cid}')
@ -254,9 +262,16 @@ class Container:
except (
docker.errors.APIError,
ConnectionError,
requests.exceptions.ConnectionError,
trio.Cancelled,
):
log.exception('Docker connection failure')
self.hard_kill(start)
raise
except trio.Cancelled:
log.exception('trio cancelled...')
self.hard_kill(start)
else:
hard_kill = True
@ -305,16 +320,13 @@ async def open_ahabd(
))
try:
# TODO: we might eventually want a proxy-style msg-prot here
# to allow remote control of containers without needing
# callers to have root perms?
await trio.sleep_forever()
finally:
# needed?
with trio.CancelScope(shield=True):
await cntr.cancel(stop_msg)
await cntr.cancel(stop_msg)
async def start_ahab(

View File

@ -66,3 +66,10 @@ class Struct(
).decode(
msgspec.msgpack.Encoder().encode(self)
)
def typecast(
self,
# fields: Optional[list[str]] = None,
) -> None:
for fname, ftype in self.__annotations__.items():
setattr(self, fname, ftype(getattr(self, fname)))

View File

@ -63,7 +63,7 @@ from ..log import get_logger
log = get_logger(__name__)
# TODO: load this from a config.toml!
_quote_throttle_rate: int = 60 # Hz
_quote_throttle_rate: int = 22 # Hz
# a working tick-type-classes template

View File

@ -794,15 +794,11 @@ async def process_trades_and_update_ui(
pp_msg_symbol = msg['symbol'].lower()
fqsn = sym.front_fqsn()
broker, key = sym.front_feed()
# print(
# f'pp msg symbol: {pp_msg_symbol}\n',
# f'fqsn: {fqsn}\n',
# f'front key: {key}\n',
# )
if (
pp_msg_symbol == fqsn.replace(f'.{broker}', '')
pp_msg_symbol == fqsn
or pp_msg_symbol == fqsn.removesuffix(f'.{broker}')
):
log.info(f'{fqsn} matched pp msg: {fmsg}')
tracker = mode.trackers[msg['account']]
tracker.live_pp.update_from_msg(msg)
# update order pane widgets
@ -843,16 +839,25 @@ async def process_trades_and_update_ui(
# resp to 'cancel' request or error condition
# for action request
elif resp in (
'broker_cancelled',
'broker_inactive',
'broker_errored',
):
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
log.error(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
elif resp in (
'broker_cancelled',
'dark_cancelled'
):
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
log.warning(
f'Order {oid} failed with:\n{pformat(broker_msg)}'
log.cancel(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
elif resp in (