diff --git a/config/brokers.toml b/config/brokers.toml index 7d288648..20216bde 100644 --- a/config/brokers.toml +++ b/config/brokers.toml @@ -8,8 +8,8 @@ expires_at = 1616095326.355846 [kraken] key_descr = "api_0" -public_key = "" -private_key = "" +api_key = "" +secret = "" [ib] host = "127.0.0.1" diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 0d899428..18475886 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -14,13 +14,13 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' Kraken backend. -""" +''' from contextlib import asynccontextmanager from dataclasses import asdict, field -from typing import List, Dict, Any, Tuple, Optional +from typing import Dict, List, Tuple, Any, Optional, AsyncIterator import time from trio_typing import TaskStatus @@ -33,12 +33,25 @@ import tractor from pydantic.dataclasses import dataclass from pydantic import BaseModel import wsproto +from itertools import count +import urllib.parse +import hashlib +import hmac +import base64 +from .. import config from .._cacheables import open_cached_client from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray -from ..data._web_bs import open_autorecon_ws +from ..data._web_bs import open_autorecon_ws, NoBsWs +from ..clearing._paper_engine import PaperBoi +from ..clearing._messages import ( + BrokerdPosition, BrokerdOrder, BrokerdStatus, + BrokerdOrderAck, BrokerdError, BrokerdCancel, + BrokerdFill, +) + log = get_logger(__name__) @@ -106,13 +119,27 @@ class Pair(BaseModel): ordermin: float # minimum order volume for pair +class Trade(BaseModel): + ''' + Trade class that helps parse and validate ownTrades stream + + ''' + reqid: str # kraken order transaction id + action: str # buy or sell + price: str # price of asset + size: str # vol of asset + broker_time: str # e.g GTC, GTD + + @dataclass class OHLC: - """Description of the flattened OHLC quote format. + ''' + Description of the flattened OHLC quote format. For schema details see: https://docs.kraken.com/websockets/#message-ohlc - """ + + ''' chan_id: int # internal kraken id chan_name: str # eg. ohlc-1 (name-interval) pair: str # fx pair @@ -129,9 +156,52 @@ class OHLC: ticks: List[Any] = field(default_factory=list) +def get_config() -> dict[str, Any]: + + conf, path = config.load() + + section = conf.get('kraken') + + if section is None: + log.warning(f'No config section found for kraken in {path}') + return {} + + return section + + +def get_kraken_signature( + urlpath: str, + data: Dict[str, Any], + secret: str +) -> str: + postdata = urllib.parse.urlencode(data) + encoded = (str(data['nonce']) + postdata).encode() + message = urlpath.encode() + hashlib.sha256(encoded).digest() + + mac = hmac.new(base64.b64decode(secret), message, hashlib.sha512) + sigdigest = base64.b64encode(mac.digest()) + return sigdigest.decode() + + +class InvalidKey(ValueError): + ''' + EAPI:Invalid key + This error is returned when the API key used for the call is + either expired or disabled, please review the API key in your + Settings -> API tab of account management or generate a new one + and update your application. + + ''' + + class Client: - def __init__(self) -> None: + def __init__( + self, + name: str = '', + api_key: str = '', + secret: str = '' + ) -> None: self._sesh = asks.Session(connections=4) self._sesh.base_location = _url self._sesh.headers.update({ @@ -139,6 +209,9 @@ class Client: 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' }) self._pairs: list[str] = [] + self._name = name + self._api_key = api_key + self._secret = secret @property def pairs(self) -> Dict[str, Any]: @@ -162,6 +235,108 @@ class Client: ) return resproc(resp, log) + async def _private( + self, + method: str, + data: dict, + uri_path: str + ) -> Dict[str, Any]: + headers = { + 'Content-Type': + 'application/x-www-form-urlencoded', + 'API-Key': + self._api_key, + 'API-Sign': + get_kraken_signature(uri_path, data, self._secret) + } + resp = await self._sesh.post( + path=f'/private/{method}', + data=data, + headers=headers, + timeout=float('inf') + ) + return resproc(resp, log) + + async def endpoint( + self, + method: str, + data: Dict[str, Any] + ) -> Dict[str, Any]: + uri_path = f'/0/private/{method}' + data['nonce'] = str(int(1000*time.time())) + return await self._private(method, data, uri_path) + + async def get_trades( + self, + data: Dict[str, Any] = {} + ) -> Dict[str, Any]: + data['ofs'] = 0 + # Grab all trade history + # https://docs.kraken.com/rest/#operation/getTradeHistory + # Kraken uses 'ofs' to refer to the offset + while True: + resp = await self.endpoint('TradesHistory', data) + # grab the first 50 trades + if data['ofs'] == 0: + trades = resp['result']['trades'] + # load the next 50 trades using dict constructor + # for speed + elif data['ofs'] == 50: + trades = dict(trades, **resp['result']['trades']) + # catch the end of the trades + elif resp['result']['trades'] == {}: + count = resp['result']['count'] + break + # update existing dict if num trades exceeds 100 + else: + trades.update(resp['result']['trades']) + # increment the offset counter + data['ofs'] += 50 + # To avoid exceeding API rate limit in case of a lot of trades + await trio.sleep(1) + + # make sure you grabbed all the trades + assert count == len(trades.values()) + + return trades + + async def submit_limit( + self, + oid: str, + symbol: str, + price: float, + action: str, + size: float, + reqid: int = None, + ) -> int: + ''' + Place an order and return integer request id provided by client. + + ''' + # Build order data for kraken api + data = { + "userref": reqid, + "ordertype": "limit", + "type": action, + "volume": str(size), + "pair": symbol, + "price": str(price), + # set to True test AddOrder call without a real submission + "validate": False + } + return await self.endpoint('AddOrder', data) + + async def submit_cancel( + self, + reqid: str, + ) -> None: + ''' + Send cancel request for order id ``reqid``. + + ''' + # txid is a transaction id given by kraken + return await self.endpoint('CancelOrder', {"txid": reqid}) + async def symbol_info( self, pair: Optional[str] = None, @@ -271,9 +446,19 @@ class Client: raise SymbolNotFound(json['error'][0] + f': {symbol}') + @asynccontextmanager async def get_client() -> Client: - client = Client() + + section = get_config() + if section: + client = Client( + name=section['key_descr'], + api_key=section['api_key'], + secret=section['secret'] + ) + else: + client = Client() # at startup, load all symbols locally for fast search await client.cache_symbols() @@ -281,8 +466,347 @@ async def get_client() -> Client: yield client -async def stream_messages(ws): +def pack_positions( + acc: str, + trades: dict +) -> list[Any]: + positions: dict[str, float] = {} + vols: dict[str, float] = {} + costs: dict[str, float] = {} + position_msgs: list[Any] = [] + for trade in trades.values(): + sign = -1 if trade['type'] == 'sell' else 1 + pair = trade['pair'] + vol = float(trade['vol']) + vols[pair] = vols.get(pair, 0) + sign * vol + costs[pair] = costs.get(pair, 0) + sign * float(trade['cost']) + positions[pair] = costs[pair] / vols[pair] if vols[pair] else 0 + + for ticker, pos in positions.items(): + vol = float(vols[ticker]) + if not vol: + continue + norm_sym = normalize_symbol(ticker) + msg = BrokerdPosition( + broker='kraken', + account=acc, + symbol=norm_sym, + currency=norm_sym[-3:], + size=vol, + avg_price=float(pos), + ) + position_msgs.append(msg.dict()) + + return position_msgs + + +def normalize_symbol( + ticker: str +) -> str: + # This is to convert symbol names from what kraken + # uses to the traditional 3x3 pair symbol syntax + symlen = len(ticker) + if symlen == 6: + return ticker.lower() + else: + for sym in ['XXBT', 'XXMR', 'ZEUR']: + if sym in ticker: + ticker = ticker.replace(sym, sym[1:]) + return ticker.lower() + + +def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]: + ''' + Create a request subscription packet dict. + + ## TODO: point to the auth urls + https://docs.kraken.com/websockets/#message-subscribe + + ''' + # eg. specific logic for this in kraken's sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + return { + 'event': 'subscribe', + 'subscription': data, + } + + +async def handle_order_requests( + + client: Client, + ems_order_stream: tractor.MsgStream, + +) -> None: + + request_msg: dict + order: BrokerdOrder + userref_counter = count() + async for request_msg in ems_order_stream: + log.info(f'Received order request {request_msg}') + + action = request_msg['action'] + + if action in {'buy', 'sell'}: + + account = request_msg['account'] + if account != 'kraken.spot': + log.error( + 'This is a kraken account, \ + only a `kraken.spot` selection is valid' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'Kraken only, No account found: `{account}` ?', + ).dict()) + continue + + # validate + temp_id = next(userref_counter) + order = BrokerdOrder(**request_msg) + + # call our client api to submit the order + resp = await client.submit_limit( + oid=order.oid, + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + reqid=temp_id, + ) + + err = resp['error'] + if err: + log.error(f'Failed to submit order') + await ems_order_stream.send( + BrokerdError( + oid=order.oid, + reqid=temp_id, + symbol=order.symbol, + reason="Failed order submission", + broker_details=resp + ).dict() + ) + else: + # TODO: handle multiple cancels + # txid is an array of strings + reqid = resp['result']['txid'][0] + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + + # ems order request id + oid=order.oid, + + # broker specific request id + reqid=reqid, + + # account the made the order + account=order.account + + ).dict() + ) + + elif action == 'cancel': + msg = BrokerdCancel(**request_msg) + + # Send order cancellation to kraken + resp = await client.submit_cancel( + reqid=msg.reqid + ) + + try: + # Check to make sure there was no error returned by + # the kraken endpoint. Assert one order was cancelled + assert resp['error'] == [] + assert resp['result']['count'] == 1 + + # TODO: Change this code using .get + try: + pending = resp['result']['pending'] + # Check to make sure the cancellation is NOT pending, + # then send the confirmation to the ems order stream + except KeyError: + await ems_order_stream.send( + BrokerdStatus( + reqid=msg.reqid, + account=msg.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Order cancelled', + broker_details={'name': 'kraken'} + ).dict() + ) + except AssertionError: + log.error(f'Order cancel was not successful') + await ems_order_stream.send( + BrokerdError( + oid=order.oid, + reqid=temp_id, + symbol=order.symbol, + reason="Failed order cancel", + broker_details=resp + ).dict() + ) + + else: + log.error(f'Unknown order command: {request_msg}') + + +@tractor.context +async def trades_dialogue( + ctx: tractor.Context, + loglevel: str = None, +) -> AsyncIterator[Dict[str, Any]]: + + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + @asynccontextmanager + async def subscribe(ws: wsproto.WSConnection, token: str): + # XXX: setup subs + # https://docs.kraken.com/websockets/#message-subscribe + # specific logic for this in kraken's shitty sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + trades_sub = make_auth_sub( + {'name': 'ownTrades', 'token': token} + ) + + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. + await ws.send_msg(trades_sub) + + yield + + # unsub from all pairs on teardown + await ws.send_msg({ + 'event': 'unsubscribe', + 'subscription': ['ownTrades'], + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + # Authenticated block + async with get_client() as client: + if not client._api_key: + log.error('Missing Kraken API key: Trades WS connection failed') + await ctx.started(({}, {'paper',})) + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + + client = PaperBoi( + 'kraken', + ems_stream, + _buys={}, + _sells={}, + + _reqids={}, + + # TODO: load paper positions from ``positions.toml`` + _positions={}, + ) + + ## TODO: maybe add multiple accounts + n.start_soon(handle_order_requests, client, ems_stream) + + acc_name = 'kraken.' + client._name + trades = await client.get_trades() + + position_msgs = pack_positions(acc_name, trades) + + await ctx.started((position_msgs, (acc_name,))) + + # Get websocket token for authenticated data stream + # Assert that a token was actually received + resp = await client.endpoint('GetWebSocketsToken', {}) + assert resp['error'] == [] + token = resp['result']['token'] + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + ## TODO: maybe add multiple accounts + n.start_soon(handle_order_requests, client, ems_stream) + + # Process trades msg stream of ws + async with open_autorecon_ws( + 'wss://ws-auth.kraken.com/', + fixture=subscribe, + token=token, + ) as ws: + async for msg in process_trade_msgs(ws): + for trade in msg: + # check the type of packaged message + assert type(trade) == Trade + # prepare and send a status update for line update + trade_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), + + account='kraken.spot', + status='executed', + filled=float(trade.size), + reason='Order filled by kraken', + # remaining='' # TODO: not sure what to do here. + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + } + ) + + await ems_stream.send(trade_msg.dict()) + + filled_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), + + account='kraken.spot', + status='filled', + filled=float(trade.size), + reason='Order filled by kraken', + # remaining='' # TODO: not sure what to do here. + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + } + ) + + await ems_stream.send(filled_msg.dict()) + + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=trade.reqid, + time_ns=time.time_ns(), + + action=trade.action, + size=float(trade.size), + price=float(trade.price), + # TODO: maybe capture more msg data i.e fees? + broker_details={'name': 'kraken'}, + broker_time=float(trade.broker_time) + ) + + await ems_stream.send(fill_msg.dict()) + + +async def stream_messages( + ws: NoBsWs, +): + ''' + Message stream parser and heartbeat handler. + + Deliver ws subscription messages as well as handle heartbeat logic + though a single async generator. + + ''' too_slow_count = last_hb = 0 while True: @@ -320,39 +844,95 @@ async def stream_messages(ws): if err: raise BrokerError(err) else: - chan_id, *payload_array, chan_name, pair = msg + yield msg - if 'ohlc' in chan_name: - yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) +async def process_data_feed_msgs( + ws: NoBsWs, +): + ''' + Parse and pack data feed messages. - elif 'spread' in chan_name: + ''' + async for msg in stream_messages(ws): - bid, ask, ts, bsize, asize = map(float, payload_array[0]) + chan_id, *payload_array, chan_name, pair = msg - # TODO: really makes you think IB has a horrible API... - quote = { - 'symbol': pair.replace('/', ''), - 'ticks': [ - {'type': 'bid', 'price': bid, 'size': bsize}, - {'type': 'bsize', 'price': bid, 'size': bsize}, + if 'ohlc' in chan_name: - {'type': 'ask', 'price': ask, 'size': asize}, - {'type': 'asize', 'price': ask, 'size': asize}, - ], - } - yield 'l1', quote + yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) - # elif 'book' in msg[-2]: - # chan_id, *payload_array, chan_name, pair = msg - # print(msg) + elif 'spread' in chan_name: - else: - print(f'UNHANDLED MSG: {msg}') + bid, ask, ts, bsize, asize = map(float, payload_array[0]) + + # TODO: really makes you think IB has a horrible API... + quote = { + 'symbol': pair.replace('/', ''), + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, + + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize}, + ], + } + yield 'l1', quote + + # elif 'book' in msg[-2]: + # chan_id, *payload_array, chan_name, pair = msg + # print(msg) + + else: + print(f'UNHANDLED MSG: {msg}') + yield msg + + +async def process_trade_msgs( + ws: NoBsWs, +): + ''' + Parse and pack data feed messages. + + ''' + sequence_counter = 0 + async for msg in stream_messages(ws): + + try: + # check that we are on the ownTrades stream and that msgs are + # arriving in sequence with kraken + # For clarification the kraken ws api docs for this stream: + # https://docs.kraken.com/websockets/#message-ownTrades + assert msg[1] == 'ownTrades' + assert msg[2]['sequence'] > sequence_counter + sequence_counter += 1 + raw_msgs = msg[0] + trade_msgs = [] + + # Check that we are only processing new trades + if msg[2]['sequence'] != 1: + # check if its a new order or an update msg + for trade_msg in raw_msgs: + trade = list(trade_msg.values())[0] + order_msg = Trade( + reqid=trade['ordertxid'], + action=trade['type'], + price=trade['price'], + size=trade['vol'], + broker_time=trade['time'] + ) + trade_msgs.append(order_msg) + + yield trade_msgs + + except AssertionError: + print(f'UNHANDLED MSG: {msg}') + yield msg def normalize( ohlc: OHLC, + ) -> dict: quote = asdict(ohlc) quote['broker_ts'] = quote['time'] @@ -371,11 +951,12 @@ def normalize( def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: - """Create a request subscription packet dict. + ''' + Create a request subscription packet dict. https://docs.kraken.com/websockets/#message-subscribe - """ + ''' # eg. specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 return { @@ -393,8 +974,9 @@ async def backfill_bars( task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: - """Fill historical bars into shared mem / storage afap. - """ + ''' + Fill historical bars into shared mem / storage afap. + ''' with trio.CancelScope() as cs: async with open_cached_client('kraken') as client: bars = await client.bars(symbol=sym) @@ -416,10 +998,12 @@ async def stream_quotes( task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, ) -> None: - """Subscribe for ohlc stream of quotes for ``pairs``. + ''' + Subscribe for ohlc stream of quotes for ``pairs``. ``pairs`` must be formatted /. - """ + + ''' # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) @@ -491,15 +1075,16 @@ async def stream_quotes( # XXX: do we need to ack the unsub? # await ws.recv_msg() - # see the tips on reonnection logic: + # see the tips on reconnection logic: # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds + ws: NoBsWs async with open_autorecon_ws( 'wss://ws.kraken.com/', fixture=subscribe, ) as ws: # pull a first quote and deliver - msg_gen = stream_messages(ws) + msg_gen = process_data_feed_msgs(ws) # TODO: use ``anext()`` when it lands in 3.10! typ, ohlc_last = await msg_gen.__anext__() @@ -558,6 +1143,7 @@ async def stream_quotes( @tractor.context async def open_symbol_search( ctx: tractor.Context, + ) -> Client: async with open_cached_client('kraken') as client: diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index ee1ad8ac..630405ea 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -493,7 +493,8 @@ async def open_brokerd_trades_dialogue( finally: # parent context must have been closed # remove from cache so next client will respawn if needed - _router.relays.pop(broker) + ## TODO: Maybe add a warning + _router.relays.pop(broker, None) @tractor.context diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 698a928f..f87e2203 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -389,7 +389,7 @@ async def handle_order_requests( account = request_msg['account'] if account != 'paper': log.error( - 'On a paper account, only a `paper` selection is valid' + 'This is a paper account, only a `paper` selection is valid' ) await ems_order_stream.send(BrokerdError( oid=request_msg['oid'], @@ -463,7 +463,8 @@ async def trades_dialogue( ): # TODO: load paper positions per broker from .toml config file # and pass as symbol to position data mapping: ``dict[str, dict]`` - await ctx.started(({}, ['paper'])) + # await ctx.started(all_positions) + await ctx.started(({}, {'paper',})) async with ( ctx.open_stream() as ems_stream, diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index d2a15e06..820d5054 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -53,11 +53,13 @@ class NoBsWs: def __init__( self, url: str, + token: str, stack: AsyncExitStack, fixture: Callable, serializer: ModuleType = json, ): self.url = url + self.token = token self.fixture = fixture self._stack = stack self._ws: 'WebSocketConnection' = None # noqa @@ -81,9 +83,15 @@ class NoBsWs: trio_websocket.open_websocket_url(self.url) ) # rerun user code fixture - ret = await self._stack.enter_async_context( - self.fixture(self) - ) + if self.token == '': + ret = await self._stack.enter_async_context( + self.fixture(self) + ) + else: + ret = await self._stack.enter_async_context( + self.fixture(self, self.token) + ) + assert ret is None log.info(f'Connection success: {self.url}') @@ -127,12 +135,14 @@ async def open_autorecon_ws( # TODO: proper type annot smh fixture: Callable, + # used for authenticated websockets + token: str = '', ) -> AsyncGenerator[tuple[...], NoBsWs]: """Apparently we can QoS for all sorts of reasons..so catch em. """ async with AsyncExitStack() as stack: - ws = NoBsWs(url, stack, fixture=fixture) + ws = NoBsWs(url, token, stack, fixture=fixture) await ws._connect() try: