Merge pull request #346 from pikers/kraken_ledger_pps

Kraken ledger pps
null_last_quote_fix
goodboy 2022-07-05 20:56:44 -04:00 committed by GitHub
commit 419ebebe72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 1536 additions and 1281 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,61 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Kraken backend.
Sub-modules within break into the core functionalities:
- ``broker.py`` part for orders / trading endpoints
- ``feed.py`` for real-time data feed endpoints
- ``api.py`` for the core API machinery which is ``trio``-ized
wrapping around ``ib_insync``.
'''
from piker.log import get_logger
log = get_logger(__name__)
from .api import (
get_client,
)
from .feed import (
open_history_client,
open_symbol_search,
stream_quotes,
)
from .broker import (
trades_dialogue,
norm_trade_records,
)
__all__ = [
'get_client',
'trades_dialogue',
'open_history_client',
'open_symbol_search',
'stream_quotes',
'norm_trade_records',
]
# tractor RPC enable arg
__enable_modules__: list[str] = [
'api',
'feed',
'broker',
]

View File

@ -0,0 +1,468 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Kraken web API wrapping.
'''
from contextlib import asynccontextmanager as acm
from dataclasses import field
from datetime import datetime
import itertools
from typing import (
Any,
Optional,
Union,
)
import time
# import trio
# import tractor
import pendulum
import asks
from fuzzywuzzy import process as fuzzy
import numpy as np
from pydantic.dataclasses import dataclass
import urllib.parse
import hashlib
import hmac
import base64
from piker import config
from piker.brokers._util import (
resproc,
SymbolNotFound,
BrokerError,
DataThrottle,
)
from . import log
# <uri>/<version>/
_url = 'https://api.kraken.com/0'
# Broker specific ohlc schema which includes a vwap field
_ohlc_dtype = [
('index', int),
('time', int),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', float),
('count', int),
('bar_wap', float),
]
# UI components allow this to be declared such that additional
# (historical) fields can be exposed.
ohlc_dtype = np.dtype(_ohlc_dtype)
_show_wap_in_history = True
_symbol_info_translation: dict[str, str] = {
'tick_decimals': 'pair_decimals',
}
@dataclass
class OHLC:
'''
Description of the flattened OHLC quote format.
For schema details see:
https://docs.kraken.com/websockets/#message-ohlc
'''
chan_id: int # internal kraken id
chan_name: str # eg. ohlc-1 (name-interval)
pair: str # fx pair
time: float # Begin time of interval, in seconds since epoch
etime: float # End time of interval, in seconds since epoch
open: float # Open price of interval
high: float # High price within interval
low: float # Low price within interval
close: float # Close price of interval
vwap: float # Volume weighted average price within interval
volume: float # Accumulated volume **within interval**
count: int # Number of trades within interval
# (sampled) generated tick data
ticks: list[Any] = field(default_factory=list)
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('kraken')
if section is None:
log.warning(f'No config section found for kraken in {path}')
return {}
return section
def get_kraken_signature(
urlpath: str,
data: dict[str, Any],
secret: str
) -> str:
postdata = urllib.parse.urlencode(data)
encoded = (str(data['nonce']) + postdata).encode()
message = urlpath.encode() + hashlib.sha256(encoded).digest()
mac = hmac.new(base64.b64decode(secret), message, hashlib.sha512)
sigdigest = base64.b64encode(mac.digest())
return sigdigest.decode()
class InvalidKey(ValueError):
'''
EAPI:Invalid key
This error is returned when the API key used for the call is
either expired or disabled, please review the API key in your
Settings -> API tab of account management or generate a new one
and update your application.
'''
class Client:
def __init__(
self,
name: str = '',
api_key: str = '',
secret: str = ''
) -> None:
self._sesh = asks.Session(connections=4)
self._sesh.base_location = _url
self._sesh.headers.update({
'User-Agent':
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
})
self._pairs: list[str] = []
self._name = name
self._api_key = api_key
self._secret = secret
@property
def pairs(self) -> dict[str, Any]:
if self._pairs is None:
raise RuntimeError(
"Make sure to run `cache_symbols()` on startup!"
)
# retreive and cache all symbols
return self._pairs
async def _public(
self,
method: str,
data: dict,
) -> dict[str, Any]:
resp = await self._sesh.post(
path=f'/public/{method}',
json=data,
timeout=float('inf')
)
return resproc(resp, log)
async def _private(
self,
method: str,
data: dict,
uri_path: str
) -> dict[str, Any]:
headers = {
'Content-Type':
'application/x-www-form-urlencoded',
'API-Key':
self._api_key,
'API-Sign':
get_kraken_signature(uri_path, data, self._secret)
}
resp = await self._sesh.post(
path=f'/private/{method}',
data=data,
headers=headers,
timeout=float('inf')
)
return resproc(resp, log)
async def endpoint(
self,
method: str,
data: dict[str, Any]
) -> dict[str, Any]:
uri_path = f'/0/private/{method}'
data['nonce'] = str(int(1000*time.time()))
return await self._private(method, data, uri_path)
async def get_trades(
self,
) -> dict[str, Any]:
'''
Get the trades (aka cleared orders) history from the rest endpoint:
https://docs.kraken.com/rest/#operation/getTradeHistory
'''
ofs = 0
trades_by_id: dict[str, Any] = {}
for i in itertools.count():
# increment 'ofs' pagination offset
ofs = i*50
resp = await self.endpoint(
'TradesHistory',
{'ofs': ofs},
)
by_id = resp['result']['trades']
trades_by_id.update(by_id)
# we can get up to 50 results per query
if (
len(by_id) < 50
):
err = resp.get('error')
if err:
raise BrokerError(err)
# we know we received the max amount of
# trade results so there may be more history.
# catch the end of the trades
count = resp['result']['count']
break
# santity check on update
assert count == len(trades_by_id.values())
return trades_by_id
async def submit_limit(
self,
symbol: str,
price: float,
action: str,
size: float,
reqid: str = None,
validate: bool = False # set True test call without a real submission
) -> dict:
'''
Place an order and return integer request id provided by client.
'''
# Build common data dict for common keys from both endpoints
data = {
"pair": symbol,
"price": str(price),
"validate": validate
}
if reqid is None:
# Build order data for kraken api
data |= {
"ordertype": "limit",
"type": action,
"volume": str(size),
}
return await self.endpoint('AddOrder', data)
else:
# Edit order data for kraken api
data["txid"] = reqid
return await self.endpoint('EditOrder', data)
async def submit_cancel(
self,
reqid: str,
) -> dict:
'''
Send cancel request for order id ``reqid``.
'''
# txid is a transaction id given by kraken
return await self.endpoint('CancelOrder', {"txid": reqid})
async def symbol_info(
self,
pair: Optional[str] = None,
):
if pair is not None:
pairs = {'pair': pair}
else:
pairs = None # get all pairs
resp = await self._public('AssetPairs', pairs)
err = resp['error']
if err:
symbolname = pairs['pair'] if pair else None
raise SymbolNotFound(f'{symbolname}.kraken')
pairs = resp['result']
if pair is not None:
_, data = next(iter(pairs.items()))
return data
else:
return pairs
async def cache_symbols(
self,
) -> dict:
if not self._pairs:
self._pairs = await self.symbol_info()
return self._pairs
async def search_symbols(
self,
pattern: str,
limit: int = None,
) -> dict[str, Any]:
if self._pairs is not None:
data = self._pairs
else:
data = await self.symbol_info()
matches = fuzzy.extractBests(
pattern,
data,
score_cutoff=50,
)
# repack in dict form
return {item[0]['altname']: item[0] for item in matches}
async def bars(
self,
symbol: str = 'XBTUSD',
# UTC 2017-07-02 12:53:20
since: Optional[Union[int, datetime]] = None,
count: int = 720, # <- max allowed per query
as_np: bool = True,
) -> dict:
if since is None:
since = pendulum.now('UTC').start_of('minute').subtract(
minutes=count).timestamp()
elif isinstance(since, int):
since = pendulum.from_timestamp(since).timestamp()
else: # presumably a pendulum datetime
since = since.timestamp()
# UTC 2017-07-02 12:53:20 is oldest seconds value
since = str(max(1499000000, int(since)))
json = await self._public(
'OHLC',
data={
'pair': symbol,
'since': since,
},
)
try:
res = json['result']
res.pop('last')
bars = next(iter(res.values()))
new_bars = []
first = bars[0]
last_nz_vwap = first[-3]
if last_nz_vwap == 0:
# use close if vwap is zero
last_nz_vwap = first[-4]
# convert all fields to native types
for i, bar in enumerate(bars):
# normalize weird zero-ed vwap values..cmon kraken..
# indicates vwap didn't change since last bar
vwap = float(bar.pop(-3))
if vwap != 0:
last_nz_vwap = vwap
if vwap == 0:
vwap = last_nz_vwap
# re-insert vwap as the last of the fields
bar.append(vwap)
new_bars.append(
(i,) + tuple(
ftype(bar[j]) for j, (name, ftype) in enumerate(
_ohlc_dtype[1:]
)
)
)
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array
except KeyError:
errmsg = json['error'][0]
if 'not found' in errmsg:
raise SymbolNotFound(errmsg + f': {symbol}')
elif 'Too many requests' in errmsg:
raise DataThrottle(f'{symbol}')
else:
raise BrokerError(errmsg)
@acm
async def get_client() -> Client:
section = get_config()
if section:
client = Client(
name=section['key_descr'],
api_key=section['api_key'],
secret=section['secret']
)
else:
client = Client()
# at startup, load all symbols locally for fast search
await client.cache_symbols()
yield client
def normalize_symbol(
ticker: str
) -> str:
'''
Normalize symbol names to to a 3x3 pair.
'''
remap = {
'XXBTZEUR': 'XBTEUR',
'XXMRZEUR': 'XMREUR',
# ws versions? pretty weird..
'XBT/EUR': 'XBTEUR',
'XMR/EUR': 'XMREUR',
}
symlen = len(ticker)
if symlen != 6:
ticker = remap[ticker]
else:
raise ValueError(f'Unhandled symbol: {ticker}')
return ticker.lower()

View File

@ -0,0 +1,540 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Order api and machinery
'''
from contextlib import asynccontextmanager as acm
from functools import partial
from itertools import chain
from pprint import pformat
import time
from typing import (
Any,
AsyncIterator,
# Callable,
# Optional,
# Union,
)
import pendulum
from pydantic import BaseModel
import trio
import tractor
import wsproto
from piker import pp
from piker.clearing._messages import (
BrokerdCancel,
BrokerdError,
BrokerdFill,
BrokerdOrder,
BrokerdOrderAck,
BrokerdPosition,
BrokerdStatus,
)
from . import log
from .api import (
Client,
BrokerError,
get_client,
normalize_symbol,
)
from .feed import (
get_console_log,
open_autorecon_ws,
NoBsWs,
stream_messages,
)
class Trade(BaseModel):
'''
Trade class that helps parse and validate ownTrades stream
'''
reqid: str # kraken order transaction id
action: str # buy or sell
price: float # price of asset
size: float # vol of asset
broker_time: str # e.g GTC, GTD
async def handle_order_requests(
client: Client,
ems_order_stream: tractor.MsgStream,
) -> None:
request_msg: dict
order: BrokerdOrder
async for request_msg in ems_order_stream:
log.info(
'Received order request:\n'
f'{pformat(request_msg)}'
)
action = request_msg['action']
if action in {'buy', 'sell'}:
account = request_msg['account']
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
# reason=f'Kraken only, No account found: `{account}` ?',
reason=(
'Kraken only, order mode disabled due to '
'https://github.com/pikers/piker/issues/299'
),
).dict())
continue
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
resp = await client.submit_limit(
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
reqid=order.reqid,
)
err = resp['error']
if err:
oid = order.oid
log.error(f'Failed to submit order: {oid}')
await ems_order_stream.send(
BrokerdError(
oid=order.oid,
reqid=order.reqid,
symbol=order.symbol,
reason="Failed order submission",
broker_details=resp
).dict()
)
else:
# TODO: handle multiple orders (cancels?)
# txid is an array of strings
if order.reqid is None:
reqid = resp['result']['txid'][0]
else:
# update the internal pairing of oid to krakens
# txid with the new txid that is returned on edit
reqid = resp['result']['txid']
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
# account the made the order
account=order.account
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
# Send order cancellation to kraken
resp = await client.submit_cancel(
reqid=msg.reqid
)
# Check to make sure there was no error returned by
# the kraken endpoint. Assert one order was cancelled.
try:
result = resp['result']
count = result['count']
# check for 'error' key if we received no 'result'
except KeyError:
error = resp.get('error')
await ems_order_stream.send(
BrokerdError(
oid=msg.oid,
reqid=msg.reqid,
symbol=msg.symbol,
reason="Failed order cancel",
broker_details=resp
).dict()
)
if not error:
raise BrokerError(f'Unknown order cancel response: {resp}')
else:
if not count: # no orders were cancelled?
# XXX: what exactly is this from and why would we care?
# there doesn't seem to be any docs here?
# https://docs.kraken.com/rest/#operation/cancelOrder
# Check to make sure the cancellation is NOT pending,
# then send the confirmation to the ems order stream
pending = result.get('pending')
if pending:
log.error(f'Order {oid} cancel was not yet successful')
await ems_order_stream.send(
BrokerdError(
oid=msg.oid,
reqid=msg.reqid,
symbol=msg.symbol,
# TODO: maybe figure out if pending
# cancels will eventually get cancelled
reason="Order cancel is still pending?",
broker_details=resp
).dict()
)
else: # order cancel success case.
await ems_order_stream.send(
BrokerdStatus(
reqid=msg.reqid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
else:
log.error(f'Unknown order command: {request_msg}')
@acm
async def subscribe(
ws: wsproto.WSConnection,
token: str,
subs: list[str] = ['ownTrades', 'openOrders'],
):
'''
Setup ws api subscriptions:
https://docs.kraken.com/websockets/#message-subscribe
By default we sign up for trade and order update events.
'''
# more specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
assert token
for sub in subs:
msg = {
'event': 'subscribe',
'subscription': {
'name': sub,
'token': token,
}
}
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_msg(msg)
yield
for sub in subs:
# unsub from all pairs on teardown
await ws.send_msg({
'event': 'unsubscribe',
'subscription': [sub],
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
) -> AsyncIterator[dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
async with get_client() as client:
# TODO: make ems flip to paper mode via
# some returned signal if the user only wants to use
# the data feed or we return this?
# await ctx.started(({}, ['paper']))
if not client._api_key:
raise RuntimeError(
'Missing Kraken API key in `brokers.toml`!?!?')
# auth required block
acctid = client._name
acc_name = 'kraken.' + acctid
# pull and deliver trades ledger
trades = await client.get_trades()
log.info(
f'Loaded {len(trades)} trades from account `{acc_name}`'
)
trans = await update_ledger(acctid, trades)
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(t.bsuid for t in trans),
)
position_msgs: list[dict] = []
pps: dict[int, pp.Position]
for pps in [active, closed]:
for tid, p in pps.items():
msg = BrokerdPosition(
broker='kraken',
account=acc_name,
symbol=p.symbol.front_fqsn(),
size=p.size,
avg_price=p.be_price,
currency='',
)
position_msgs.append(msg.dict())
await ctx.started(
(position_msgs, [acc_name])
)
# Get websocket token for authenticated data stream
# Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {})
err = resp.get('error')
if err:
raise BrokerError(err)
token = resp['result']['token']
ws: NoBsWs
async with (
ctx.open_stream() as ems_stream,
open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=partial(
subscribe,
token=token,
),
) as ws,
trio.open_nursery() as n,
):
# task for processing inbound requests from ems
n.start_soon(handle_order_requests, client, ems_stream)
count: int = 0
# process and relay trades events to ems
# https://docs.kraken.com/websockets/#message-ownTrades
async for msg in stream_messages(ws):
match msg:
case [
trades_msgs,
'ownTrades',
{'sequence': seq},
]:
# XXX: do we actually need this orrr?
# ensure that we are only processing new trades?
assert seq > count
count += 1
# flatten msgs for processing
trades = {
tid: trade
for entry in trades_msgs
for (tid, trade) in entry.items()
# only emit entries which are already not-in-ledger
if tid not in {r.tid for r in trans}
}
for tid, trade in trades.items():
# parse-cast
reqid = trade['ordertxid']
action = trade['type']
price = float(trade['price'])
size = float(trade['vol'])
broker_time = float(trade['time'])
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=reqid,
time_ns=time.time_ns(),
action=action,
size=size,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'},
broker_time=broker_time
)
await ems_stream.send(fill_msg.dict())
filled_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
account=acc_name,
status='filled',
filled=size,
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': broker_time
},
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg.dict())
# update ledger and position tracking
trans = await update_ledger(acctid, trades)
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(
t.bsuid for t in trans),
)
# emit pp msgs
for pos in filter(
bool,
chain(active.values(), closed.values()),
):
pp_msg = BrokerdPosition(
broker='kraken',
# XXX: ok so this is annoying, we're
# relaying an account name with the
# backend suffix prefixed but when
# reading accounts from ledgers we
# don't need it and/or it's prefixed
# in the section table.. we should
# just strip this from the message
# right since `.broker` is already
# included?
account=f'kraken.{acctid}',
symbol=pos.symbol.front_fqsn(),
size=pos.size,
avg_price=pos.be_price,
# TODO
# currency=''
)
await ems_stream.send(pp_msg.dict())
case [
trades_msgs,
'openOrders',
{'sequence': seq},
]:
# TODO: async order update handling which we
# should remove from `handle_order_requests()`
# above:
# https://github.com/pikers/piker/issues/293
# https://github.com/pikers/piker/issues/310
log.info(f'Order update {seq}:{trades_msgs}')
case _:
log.warning(f'Unhandled trades msg: {msg}')
await tractor.breakpoint()
def norm_trade_records(
ledger: dict[str, Any],
) -> list[pp.Transaction]:
records: list[pp.Transaction] = []
for tid, record in ledger.items():
size = record.get('vol') * {
'buy': 1,
'sell': -1,
}[record['type']]
bsuid = record['pair']
norm_sym = normalize_symbol(bsuid)
records.append(
pp.Transaction(
fqsn=f'{norm_sym}.kraken',
tid=tid,
size=float(size),
price=float(record['price']),
cost=float(record['fee']),
dt=pendulum.from_timestamp(float(record['time'])),
bsuid=bsuid,
# XXX: there are no derivs on kraken right?
# expiry=expiry,
)
)
return records
async def update_ledger(
acctid: str,
trade_entries: list[dict[str, Any]],
) -> list[pp.Transaction]:
# write recent session's trades to the user's (local) ledger file.
with pp.open_trade_ledger(
'kraken',
acctid,
) as ledger:
ledger.update(trade_entries)
# normalize to transaction form
records = norm_trade_records(trade_entries)
return records

View File

@ -0,0 +1,464 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Real-time and historical data feed endpoints.
'''
from contextlib import asynccontextmanager as acm
from dataclasses import asdict
from datetime import datetime
from typing import (
Any,
Optional,
Callable,
)
import time
from fuzzywuzzy import process as fuzzy
import numpy as np
import pendulum
from pydantic import BaseModel
from trio_typing import TaskStatus
import tractor
import trio
import wsproto
from piker._cacheables import open_cached_client
from piker.brokers._util import (
BrokerError,
DataThrottle,
DataUnavailable,
)
from piker.log import get_console_log
from piker.data import ShmArray
from piker.data._web_bs import open_autorecon_ws, NoBsWs
from . import log
from .api import (
Client,
OHLC,
)
# https://www.kraken.com/features/api#get-tradable-pairs
class Pair(BaseModel):
altname: str # alternate pair name
wsname: str # WebSocket pair name (if available)
aclass_base: str # asset class of base component
base: str # asset id of base component
aclass_quote: str # asset class of quote component
quote: str # asset id of quote component
lot: str # volume lot size
pair_decimals: int # scaling decimal places for pair
lot_decimals: int # scaling decimal places for volume
# amount to multiply lot volume by to get currency volume
lot_multiplier: float
# array of leverage amounts available when buying
leverage_buy: list[int]
# array of leverage amounts available when selling
leverage_sell: list[int]
# fee schedule array in [volume, percent fee] tuples
fees: list[tuple[int, float]]
# maker fee schedule array in [volume, percent fee] tuples (if on
# maker/taker)
fees_maker: list[tuple[int, float]]
fee_volume_currency: str # volume discount currency
margin_call: str # margin call level
margin_stop: str # stop-out/liquidation margin level
ordermin: float # minimum order volume for pair
async def stream_messages(
ws: NoBsWs,
):
'''
Message stream parser and heartbeat handler.
Deliver ws subscription messages as well as handle heartbeat logic
though a single async generator.
'''
too_slow_count = last_hb = 0
while True:
with trio.move_on_after(5) as cs:
msg = await ws.recv_msg()
# trigger reconnection if heartbeat is laggy
if cs.cancelled_caught:
too_slow_count += 1
if too_slow_count > 20:
log.warning(
"Heartbeat is too slow, resetting ws connection")
await ws._connect()
too_slow_count = 0
continue
if isinstance(msg, dict):
if msg.get('event') == 'heartbeat':
now = time.time()
delay = now - last_hb
last_hb = now
# XXX: why tf is this not printing without --tl flag?
log.debug(f"Heartbeat after {delay}")
# print(f"Heartbeat after {delay}")
continue
err = msg.get('errorMessage')
if err:
raise BrokerError(err)
else:
yield msg
async def process_data_feed_msgs(
ws: NoBsWs,
):
'''
Parse and pack data feed messages.
'''
async for msg in stream_messages(ws):
chan_id, *payload_array, chan_name, pair = msg
if 'ohlc' in chan_name:
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
elif 'spread' in chan_name:
bid, ask, ts, bsize, asize = map(float, payload_array[0])
# TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
# elif 'book' in msg[-2]:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
else:
print(f'UNHANDLED MSG: {msg}')
yield msg
def normalize(
ohlc: OHLC,
) -> dict:
quote = asdict(ohlc)
quote['broker_ts'] = quote['time']
quote['brokerd_ts'] = time.time()
quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '')
quote['last'] = quote['close']
quote['bar_wap'] = ohlc.vwap
# seriously eh? what's with this non-symmetry everywhere
# in subscription systems...
# XXX: piker style is always lowercases symbols.
topic = quote['pair'].replace('/', '').lower()
# print(quote)
return topic, quote
def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]:
'''
Create a request subscription packet dict.
https://docs.kraken.com/websockets/#message-subscribe
'''
# eg. specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
return {
'pair': pairs,
'event': 'subscribe',
'subscription': data,
}
@acm
async def open_history_client(
symbol: str,
) -> tuple[Callable, int]:
# TODO implement history getter for the new storage layer.
async with open_cached_client('kraken') as client:
# lol, kraken won't send any more then the "last"
# 720 1m bars.. so we have to just ignore further
# requests of this type..
queries: int = 0
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
nonlocal queries
if queries > 0:
raise DataUnavailable
count = 0
while count <= 3:
try:
array = await client.bars(
symbol,
since=end_dt,
)
count += 1
queries += 1
break
except DataThrottle:
log.warning(f'kraken OHLC throttle for {symbol}')
await trio.sleep(1)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 1, 'rate': 1}
async def backfill_bars(
sym: str,
shm: ShmArray, # type: ignore # noqa
count: int = 10, # NOTE: any more and we'll overrun the underlying buffer
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Fill historical bars into shared mem / storage afap.
'''
with trio.CancelScope() as cs:
async with open_cached_client('kraken') as client:
bars = await client.bars(symbol=sym)
shm.push(bars)
task_status.started(cs)
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# backend specific
sub_type: str = 'ohlc',
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Subscribe for ohlc stream of quotes for ``pairs``.
``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>.
'''
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
ws_pairs = {}
sym_infos = {}
async with open_cached_client('kraken') as client, send_chan as send_chan:
# keep client cached for real-time section
for sym in symbols:
# transform to upper since piker style is always lower
sym = sym.upper()
si = Pair(**await client.symbol_info(sym)) # validation
syminfo = si.dict()
syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals
syminfo['asset_type'] = 'crypto'
sym_infos[sym] = syminfo
ws_pairs[sym] = si.wsname
symbol = symbols[0].lower()
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
symbol: {
'symbol_info': sym_infos[sym],
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
@acm
async def subscribe(ws: wsproto.WSConnection):
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
ohlc_sub = make_sub(
list(ws_pairs.values()),
{'name': 'ohlc', 'interval': 1}
)
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_msg(ohlc_sub)
# trade data (aka L1)
l1_sub = make_sub(
list(ws_pairs.values()),
{'name': 'spread'} # 'depth': 10}
)
# pull a first quote and deliver
await ws.send_msg(l1_sub)
yield
# unsub from all pairs on teardown
await ws.send_msg({
'pair': list(ws_pairs.values()),
'event': 'unsubscribe',
'subscription': ['ohlc', 'spread'],
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# see the tips on reconnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
ws: NoBsWs
async with open_autorecon_ws(
'wss://ws.kraken.com/',
fixture=subscribe,
) as ws:
# pull a first quote and deliver
msg_gen = process_data_feed_msgs(ws)
# TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__()
topic, quote = normalize(ohlc_last)
task_status.started((init_msgs, quote))
# lol, only "closes" when they're margin squeezing clients ;P
feed_is_live.set()
# keep start of last interval for volume tracking
last_interval_start = ohlc_last.etime
# start streaming
async for typ, ohlc in msg_gen:
if typ == 'ohlc':
# TODO: can get rid of all this by using
# ``trades`` subscription...
# generate tick values to match time & sales pane:
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
volume = ohlc.volume
# new OHLC sample interval
if ohlc.etime > last_interval_start:
last_interval_start = ohlc.etime
tick_volume = volume
else:
# this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume
ohlc_last = ohlc
last = ohlc.close
if tick_volume:
ohlc.ticks.append({
'type': 'trade',
'price': last,
'size': tick_volume,
})
topic, quote = normalize(ohlc)
elif typ == 'l1':
quote = ohlc
topic = quote['symbol'].lower()
await send_chan.send({topic: quote})
@tractor.context
async def open_symbol_search(
ctx: tractor.Context,
) -> Client:
async with open_cached_client('kraken') as client:
# load all symbols locally for fast search
cache = await client.cache_symbols()
await ctx.started(cache)
async with ctx.open_stream() as stream:
async for pattern in stream:
matches = fuzzy.extractBests(
pattern,
cache,
score_cutoff=50,
)
# repack in dict form
await stream.send(
{item[0]['altname']: item[0]
for item in matches}
)

View File

@ -615,7 +615,7 @@ def load_pps_from_toml(
if not pps:
log.warning(
f'No trade history could be loaded for {brokername}:{acctid}'
f'No `pps.toml` positions could be loaded {brokername}:{acctid}'
)
# unmarshal/load ``pps.toml`` config entries into object form.
@ -752,11 +752,11 @@ def update_pps_conf(
# drop symbol obj in serialized form
s = asdict.pop('symbol')
fqsn = s.front_fqsn()
print(f'Updating active pp: {fqsn}')
log.info(f'Updating active pp: {fqsn}')
# XXX: ugh, it's cuz we push the section under
# the broker name.. maybe we need to rethink this?
brokerless_key = fqsn.rstrip(f'.{brokername}')
brokerless_key = fqsn.removeprefix(f'{brokername}.')
pp_entries[brokerless_key] = asdict