From b88dd380a36578a254b5e5c3d7d222da15ea2a8a Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Tue, 24 Aug 2021 16:51:59 -0400 Subject: [PATCH 01/29] get kraken authentication and retrieve balances --- piker/brokers/kraken.py | 75 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 0d899428..df2407c9 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -34,12 +34,19 @@ from pydantic.dataclasses import dataclass from pydantic import BaseModel import wsproto +from . import config from .._cacheables import open_cached_client from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray from ..data._web_bs import open_autorecon_ws +import urllib.parse +import hashlib +import hmac +import base64 + + log = get_logger(__name__) @@ -129,6 +136,28 @@ class OHLC: ticks: List[Any] = field(default_factory=list) +def get_kraken_signature( + urlpath: str, + data: Dict[str, Any], + secret: str +) -> str: + postdata = urllib.parse.urlencode(data) + encoded = (str(data['nonce']) + postdata).encode() + message = urlpath.encode() + hashlib.sha256(encoded).digest() + + mac = hmac.new(base64.b64decode(secret), message, hashlib.sha512) + sigdigest = base64.b64encode(mac.digest()) + return sigdigest.decode() + + +class InvalidKey(ValueError): + """EAPI:Invalid key + This error is returned when the API key used for the call is + either expired or disabled, please review the API key in your + Settings -> API tab of account management or generate a new one + and update your application.""" + + class Client: def __init__(self) -> None: @@ -139,6 +168,8 @@ class Client: 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' }) self._pairs: list[str] = [] + self._api_key = '' + self._secret = '' @property def pairs(self) -> Dict[str, Any]: @@ -162,6 +193,41 @@ class Client: ) return resproc(resp, log) + async def _private( + self, + method: str, + data: dict, + uri_path: str, + ) -> Dict[str, Any]: + headers = { + 'Content-Type': + 'application/x-www-form-urlencoded', + 'API-Key': + self._api_key, + 'API-Sign': + get_kraken_signature(uri_path, data, self._secret) + } + resp = await self._sesh.post( + path=f'/private/{method}', + data=data, + headers=headers, + timeout=float('inf') + ) + return resproc(resp, log) + + async def get_balances( + self, + ) -> Dict[str, str]: + data = { + 'nonce' : str(int(1000*time.time())) + } + resp = await self._private('Balance', data, '/0/private/Balance') + err = resp['error'] + if err: + print(err) + + return resp['result'] + async def symbol_info( self, pair: Optional[str] = None, @@ -275,6 +341,15 @@ class Client: async def get_client() -> Client: client = Client() + conf, path = config.load() + section = conf.get('kraken') + client._api_key = section['api_key'] + client._secret = section['secret'] + + balances = await client.get_balances() + + await tractor.breakpoint() + # at startup, load all symbols locally for fast search await client.cache_symbols() From 184edb2a90b4133206f89b1489630cc9fdb29e80 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Wed, 25 Aug 2021 22:13:00 -0400 Subject: [PATCH 02/29] wrap api method calls with uri and nonce value --- piker/brokers/kraken.py | 45 ++++++++++++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index df2407c9..041e6041 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -197,7 +197,7 @@ class Client: self, method: str, data: dict, - uri_path: str, + uri_path: str ) -> Dict[str, Any]: headers = { 'Content-Type': @@ -215,17 +215,17 @@ class Client: ) return resproc(resp, log) - async def get_balances( + async def get_user_data( self, - ) -> Dict[str, str]: - data = { - 'nonce' : str(int(1000*time.time())) - } - resp = await self._private('Balance', data, '/0/private/Balance') + method: str, + data: Dict[str, Any] + ) -> Dict[str, Any]: + uri_path = f'/0/private/{method}' + data['nonce'] = str(int(1000*time.time())) + resp = await self._private(method, data, uri_path) err = resp['error'] if err: print(err) - return resp['result'] async def symbol_info( @@ -345,8 +345,12 @@ async def get_client() -> Client: section = conf.get('kraken') client._api_key = section['api_key'] client._secret = section['secret'] + data = { + # add non nonce vars + } - balances = await client.get_balances() + balances = await client.get_user_data('Balance', data) + ledgers = await client.get_user_data('Ledgers', data) await tractor.breakpoint() @@ -356,6 +360,29 @@ async def get_client() -> Client: yield client +# @tractor.context +# async def trades_dialogue( +# ctx: tractor.Context, +# loglevel: str = None, +# ) -> AsyncIterator[Dict[str, Any]]: +# +# # XXX: required to propagate ``tractor`` loglevel to piker logging +# get_console_log(loglevel or tractor.current_actor().loglevel) +# +# # deliver positions to subscriber before anything else +# positions = await _trio_run_client_method(method='positions') +# +# all_positions = {} +# +# for pos in positions: +# msg = pack_position(pos) +# all_positions[msg.symbol] = msg.dict() +# +# await ctx.started(all_positions) + + + + async def stream_messages(ws): too_slow_count = last_hb = 0 From e12af8aa4caf95b31b6b5c630d1c71bbd6e54ef5 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Thu, 26 Aug 2021 00:30:46 -0400 Subject: [PATCH 03/29] Add get_ledger function; parses raw ledger from kraken api --- piker/brokers/kraken.py | 39 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 041e6041..1ae78d8d 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -45,6 +45,7 @@ import urllib.parse import hashlib import hmac import base64 +import pandas as pd log = get_logger(__name__) @@ -228,6 +229,40 @@ class Client: print(err) return resp['result'] + async def get_ledger( + self, + data: Dict[str, Any] + ) -> pd.DataFrame: + ledgers = await self.get_user_data('Ledgers', data) + num_entries = int(ledgers['count']) + crypto_transactions = np.empty((num_entries, 4), dtype=object) + if num_entries // 50 < 0 or num_entries == 50: + # NOTE: Omitting the following values from the kraken ledger: + # -> 'refid', 'type', 'subtype', 'aclass', 'balance' + for i, entry in enumerate(ledgers['ledger'].items()): + crypto_transactions[i] = [ + entry[1]['time'], + entry[1]['amount'], + entry[1]['fee'], + entry[1]['asset'] + ] + else: + for n in range(num_entries // 50 + 1): + data['ofs'] = n * 50 + ledgers = await self.get_user_data('Ledgers', data) + for i, entry in enumerate(ledgers['ledger'].items()): + crypto_transactions[i + n * 50] = [ + entry[1]['time'], + entry[1]['amount'], + entry[1]['fee'], + entry[1]['asset'] + ] + ledger = pd.DataFrame( + columns = ['time', 'amount', 'fee', 'asset'], + data = crypto_transactions + ) + return ledger + async def symbol_info( self, pair: Optional[str] = None, @@ -346,11 +381,11 @@ async def get_client() -> Client: client._api_key = section['api_key'] client._secret = section['secret'] data = { - # add non nonce vars + # add non-nonce and non-ofs vars } balances = await client.get_user_data('Balance', data) - ledgers = await client.get_user_data('Ledgers', data) + ledger = await client.get_ledger(data) await tractor.breakpoint() From 88061d87991e9ec4fe8c41b1d98b5ab893acc1cf Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Sat, 28 Aug 2021 19:31:43 -0400 Subject: [PATCH 04/29] Add balance to the ledger --- piker/brokers/kraken.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 1ae78d8d..2877c23e 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -235,16 +235,17 @@ class Client: ) -> pd.DataFrame: ledgers = await self.get_user_data('Ledgers', data) num_entries = int(ledgers['count']) - crypto_transactions = np.empty((num_entries, 4), dtype=object) + crypto_transactions = np.empty((num_entries, 5), dtype=object) if num_entries // 50 < 0 or num_entries == 50: # NOTE: Omitting the following values from the kraken ledger: - # -> 'refid', 'type', 'subtype', 'aclass', 'balance' + # -> 'refid', 'type', 'subtype', 'aclass' for i, entry in enumerate(ledgers['ledger'].items()): crypto_transactions[i] = [ entry[1]['time'], entry[1]['amount'], entry[1]['fee'], - entry[1]['asset'] + entry[1]['asset'], + entry[1]['balance'] ] else: for n in range(num_entries // 50 + 1): @@ -255,10 +256,11 @@ class Client: entry[1]['time'], entry[1]['amount'], entry[1]['fee'], - entry[1]['asset'] + entry[1]['asset'], + entry[1]['balance'] ] ledger = pd.DataFrame( - columns = ['time', 'amount', 'fee', 'asset'], + columns = ['time', 'amount', 'fee', 'asset', 'balance'], data = crypto_transactions ) return ledger @@ -386,6 +388,7 @@ async def get_client() -> Client: balances = await client.get_user_data('Balance', data) ledger = await client.get_ledger(data) + # positions await tractor.breakpoint() From 0285a847d87d2d4ae81226e82acb2f922414bcf2 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Mon, 6 Sep 2021 21:43:17 -0400 Subject: [PATCH 05/29] Store changes for rebase, positions prototype --- piker/brokers/kraken.py | 68 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 2877c23e..0436a57e 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -387,8 +387,76 @@ async def get_client() -> Client: } balances = await client.get_user_data('Balance', data) + traders = await client.get_user_data('TradesHistory', data) ledger = await client.get_ledger(data) + # positions + ## TODO: Make sure to add option with fees + n, m = ledger.shape + ledger['time'] = ledger['time'].apply(lambda x: int(x)) + assets = set(ledger['asset']) + trade_times = set(ledger['time']) + trades = {} + positions = {} + # for index, row in ledger.iterrows(): + # if index == n: + # break + # asset = row['asset'] + # ## TODO: Look into a way to generalize this + # if asset != 'ZEUR' and ledger.loc[index+1, 'asset'] == 'ZEUR': + # try: + # trades[asset]['amounts'].append(float(ledger.loc[index, 'amount'])) + # trades[asset]['prices'].append(float(ledger.loc[index+1, 'amount'])) + # except KeyError: + # trades[asset] = { + # 'amounts': [float(ledger.loc[index+1, 'amount'])], + # 'prices': [float(ledger.loc[index+1, 'amount'])] + # } + + ## TODO: Look into a way to generalize this for any fiat + ## TODO: Figure out how to handle coin for coin trades + for trade_time in trade_times: + trade = ledger[ledger['time'] == trade_time] + coin = trade[trade['asset'] != 'ZEUR'] + fiat = trade[trade['asset'] == 'ZEUR'] + if len(coin) == 0 or len(coin) > 1: + continue + asset = list(coin.loc[:, 'asset'])[0] + amount = np.sum(coin['amount'].apply(lambda x: float(x))) + if amount > 0: + sign = -1 + price = sign * np.sum(fiat['amount'].apply(lambda x: float(x))) + try: + trades[asset]['trade_amounts'].append(amount) + trades[asset]['trade_prices'].append(price) + trades[asset]['enter_amounts'].append(amount) + except KeyError: + trades[asset] = { + 'trade_amounts': [amount], + 'trade_prices': [price], + 'enter_amounts': [amount] + } + else: + price = 0 + # continue + try: + trades[asset]['trade_amounts'].append(amount) + trades[asset]['trade_prices'].append(price) + except KeyError: + trades[asset] = { + 'trade_amounts': [amount], + 'trade_prices': [price], + 'enter_amounts': [] + } + + + for asset in assets: + if asset == 'ZEUR': + continue + t_amounts = np.array(trades[asset]['trade_amounts']) + t_prices = np.array(trades[asset]['trade_prices']) + e_amounts = np.array(trades[asset]['enter_amounts']) + positions[asset] = np.dot(np.divide(t_prices, t_amounts), t_amounts) / np.sum(e_amounts) await tractor.breakpoint() From ef598444c42af18ca58acb332007a318832070fa Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Thu, 28 Oct 2021 15:52:02 -0400 Subject: [PATCH 06/29] get positions from trades --- piker/brokers/kraken.py | 185 +++++++++++++--------------------------- 1 file changed, 57 insertions(+), 128 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 0436a57e..3b729ae7 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -20,7 +20,7 @@ Kraken backend. """ from contextlib import asynccontextmanager from dataclasses import asdict, field -from typing import List, Dict, Any, Tuple, Optional +from typing import List, Dict, Any, Tuple, Optional, AsyncIterator import time from trio_typing import TaskStatus @@ -34,12 +34,13 @@ from pydantic.dataclasses import dataclass from pydantic import BaseModel import wsproto -from . import config +from .. import config from .._cacheables import open_cached_client from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray from ..data._web_bs import open_autorecon_ws +from ..clearing._messages import BrokerdPosition import urllib.parse import hashlib @@ -229,41 +230,35 @@ class Client: print(err) return resp['result'] - async def get_ledger( + async def get_positions( self, - data: Dict[str, Any] - ) -> pd.DataFrame: - ledgers = await self.get_user_data('Ledgers', data) - num_entries = int(ledgers['count']) - crypto_transactions = np.empty((num_entries, 5), dtype=object) - if num_entries // 50 < 0 or num_entries == 50: - # NOTE: Omitting the following values from the kraken ledger: - # -> 'refid', 'type', 'subtype', 'aclass' - for i, entry in enumerate(ledgers['ledger'].items()): - crypto_transactions[i] = [ - entry[1]['time'], - entry[1]['amount'], - entry[1]['fee'], - entry[1]['asset'], - entry[1]['balance'] - ] - else: - for n in range(num_entries // 50 + 1): - data['ofs'] = n * 50 - ledgers = await self.get_user_data('Ledgers', data) - for i, entry in enumerate(ledgers['ledger'].items()): - crypto_transactions[i + n * 50] = [ - entry[1]['time'], - entry[1]['amount'], - entry[1]['fee'], - entry[1]['asset'], - entry[1]['balance'] - ] - ledger = pd.DataFrame( - columns = ['time', 'amount', 'fee', 'asset', 'balance'], - data = crypto_transactions - ) - return ledger + data: Dict[str, Any] = {} + ) -> Dict[str, Any]: + balances = await self.get_user_data('Balance', data) + ## TODO: grab all entries, not just first 50 + traders = await self.get_user_data('TradesHistory', data) + positions = {} + vols = {} + + # positions + ## TODO: Make sure to add option to include fees in positions calc + for trade in traders['trades'].values(): + sign = -1 if trade['type'] == 'sell' else 1 + try: + positions[trade['pair']] += sign * float(trade['cost']) + vols[trade['pair']] += sign * float(trade['vol']) + except KeyError: + positions[trade['pair']] = sign * float(trade['cost']) + vols[trade['pair']] = sign * float(trade['vol']) + + for pair in positions.keys(): + asset_balance = vols[pair] + if asset_balance == 0: + positions[pair] = 0 + else: + positions[pair] /= asset_balance + + return positions async def symbol_info( self, @@ -385,80 +380,9 @@ async def get_client() -> Client: data = { # add non-nonce and non-ofs vars } + # positions = await client.get_positions(data) - balances = await client.get_user_data('Balance', data) - traders = await client.get_user_data('TradesHistory', data) - ledger = await client.get_ledger(data) - - # positions - ## TODO: Make sure to add option with fees - n, m = ledger.shape - ledger['time'] = ledger['time'].apply(lambda x: int(x)) - assets = set(ledger['asset']) - trade_times = set(ledger['time']) - trades = {} - positions = {} - # for index, row in ledger.iterrows(): - # if index == n: - # break - # asset = row['asset'] - # ## TODO: Look into a way to generalize this - # if asset != 'ZEUR' and ledger.loc[index+1, 'asset'] == 'ZEUR': - # try: - # trades[asset]['amounts'].append(float(ledger.loc[index, 'amount'])) - # trades[asset]['prices'].append(float(ledger.loc[index+1, 'amount'])) - # except KeyError: - # trades[asset] = { - # 'amounts': [float(ledger.loc[index+1, 'amount'])], - # 'prices': [float(ledger.loc[index+1, 'amount'])] - # } - - ## TODO: Look into a way to generalize this for any fiat - ## TODO: Figure out how to handle coin for coin trades - for trade_time in trade_times: - trade = ledger[ledger['time'] == trade_time] - coin = trade[trade['asset'] != 'ZEUR'] - fiat = trade[trade['asset'] == 'ZEUR'] - if len(coin) == 0 or len(coin) > 1: - continue - asset = list(coin.loc[:, 'asset'])[0] - amount = np.sum(coin['amount'].apply(lambda x: float(x))) - if amount > 0: - sign = -1 - price = sign * np.sum(fiat['amount'].apply(lambda x: float(x))) - try: - trades[asset]['trade_amounts'].append(amount) - trades[asset]['trade_prices'].append(price) - trades[asset]['enter_amounts'].append(amount) - except KeyError: - trades[asset] = { - 'trade_amounts': [amount], - 'trade_prices': [price], - 'enter_amounts': [amount] - } - else: - price = 0 - # continue - try: - trades[asset]['trade_amounts'].append(amount) - trades[asset]['trade_prices'].append(price) - except KeyError: - trades[asset] = { - 'trade_amounts': [amount], - 'trade_prices': [price], - 'enter_amounts': [] - } - - - for asset in assets: - if asset == 'ZEUR': - continue - t_amounts = np.array(trades[asset]['trade_amounts']) - t_prices = np.array(trades[asset]['trade_prices']) - e_amounts = np.array(trades[asset]['enter_amounts']) - positions[asset] = np.dot(np.divide(t_prices, t_amounts), t_amounts) / np.sum(e_amounts) - - await tractor.breakpoint() + # await tractor.breakpoint() # at startup, load all symbols locally for fast search await client.cache_symbols() @@ -466,27 +390,32 @@ async def get_client() -> Client: yield client -# @tractor.context -# async def trades_dialogue( -# ctx: tractor.Context, -# loglevel: str = None, -# ) -> AsyncIterator[Dict[str, Any]]: -# -# # XXX: required to propagate ``tractor`` loglevel to piker logging -# get_console_log(loglevel or tractor.current_actor().loglevel) -# -# # deliver positions to subscriber before anything else -# positions = await _trio_run_client_method(method='positions') -# -# all_positions = {} -# -# for pos in positions: -# msg = pack_position(pos) -# all_positions[msg.symbol] = msg.dict() -# -# await ctx.started(all_positions) +@tractor.context +async def trades_dialogue( + ctx: tractor.Context, + loglevel: str = None, +) -> AsyncIterator[Dict[str, Any]]: + + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + # deliver positions to subscriber before anything else + # positions = await _trio_run_client_method(method='positions') + + global _accounts2clients + + positions = await client.get_positions() + + await tractor.breakpoint() + all_positions = {} + + for pos in positions: + msg = pack_position(pos) + all_positions[msg.symbol] = msg.dict() + + await ctx.started(all_positions) async def stream_messages(ws): From 48c7b5262c117059f8d2b538fd412c61aa52a5e2 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Fri, 29 Oct 2021 15:51:41 -0400 Subject: [PATCH 07/29] get positions working for kraken --- config/brokers.toml | 4 +- piker/brokers/kraken.py | 90 +++++++++++++++++++++++++-------- piker/clearing/_ems.py | 3 +- piker/clearing/_paper_engine.py | 5 +- 4 files changed, 76 insertions(+), 26 deletions(-) diff --git a/config/brokers.toml b/config/brokers.toml index 7d288648..20216bde 100644 --- a/config/brokers.toml +++ b/config/brokers.toml @@ -8,8 +8,8 @@ expires_at = 1616095326.355846 [kraken] key_descr = "api_0" -public_key = "" -private_key = "" +api_key = "" +secret = "" [ib] host = "127.0.0.1" diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 3b729ae7..87a04ab4 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -138,6 +138,19 @@ class OHLC: ticks: List[Any] = field(default_factory=list) +def get_config() -> dict[str, Any]: + + conf, path = config.load() + + section = conf.get('kraken') + + if section is None: + log.warning(f'No config section found for kraken in {path}') + return {} + + return section + + def get_kraken_signature( urlpath: str, data: Dict[str, Any], @@ -170,6 +183,7 @@ class Client: 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' }) self._pairs: list[str] = [] + self._name = '' self._api_key = '' self._secret = '' @@ -258,7 +272,7 @@ class Client: else: positions[pair] /= asset_balance - return positions + return positions, vols async def symbol_info( self, @@ -373,16 +387,15 @@ class Client: async def get_client() -> Client: client = Client() - conf, path = config.load() - section = conf.get('kraken') + ## TODO: maybe add conditional based on section + section = get_config() + client._name = section['key_descr'] client._api_key = section['api_key'] client._secret = section['secret'] - data = { - # add non-nonce and non-ofs vars - } - # positions = await client.get_positions(data) - - # await tractor.breakpoint() + ## TODO: Add a client attribute to hold this info + #data = { + # # add non-nonce and non-ofs vars + #} # at startup, load all symbols locally for fast search await client.cache_symbols() @@ -390,6 +403,36 @@ async def get_client() -> Client: yield client +def pack_position( + acc: str, + symkey: str, + pos: float, + vol: float +) -> dict[str, Any]: + + return BrokerdPosition( + broker='kraken', + account=acc, + symbol=symkey, + currency=symkey[-3:], + size=float(vol), + avg_price=float(pos), + ) + + +def normalize_symbol( + ticker: str +) -> str: + symlen = len(ticker) + if symlen == 6: + return ticker.lower() + else: + for sym in ['XXBT', 'XXMR', 'ZEUR']: + if sym in ticker: + ticker = ticker.replace(sym, sym[1:]) + return ticker.lower() + + @tractor.context async def trades_dialogue( ctx: tractor.Context, @@ -399,23 +442,28 @@ async def trades_dialogue( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - # deliver positions to subscriber before anything else - # positions = await _trio_run_client_method(method='positions') + # Authenticated block + async with get_client() as client: + acc_name = 'kraken.' + client._name + positions, vols = await client.get_positions() - global _accounts2clients + all_positions = [] - positions = await client.get_positions() + for ticker, pos in positions.items(): + norm_sym = normalize_symbol(ticker) + if float(vols[ticker]) != 0: + msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) + all_positions.append(msg.dict()) + + #await tractor.breakpoint() - await tractor.breakpoint() + await ctx.started((all_positions, (acc_name,))) + await trio.sleep_forever() - all_positions = {} - - for pos in positions: - msg = pack_position(pos) - all_positions[msg.symbol] = msg.dict() - - await ctx.started(all_positions) + # async with ( + # ctx.open_stream() as ems_stream, + # async def stream_messages(ws): diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index ee1ad8ac..630405ea 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -493,7 +493,8 @@ async def open_brokerd_trades_dialogue( finally: # parent context must have been closed # remove from cache so next client will respawn if needed - _router.relays.pop(broker) + ## TODO: Maybe add a warning + _router.relays.pop(broker, None) @tractor.context diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 698a928f..f87e2203 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -389,7 +389,7 @@ async def handle_order_requests( account = request_msg['account'] if account != 'paper': log.error( - 'On a paper account, only a `paper` selection is valid' + 'This is a paper account, only a `paper` selection is valid' ) await ems_order_stream.send(BrokerdError( oid=request_msg['oid'], @@ -463,7 +463,8 @@ async def trades_dialogue( ): # TODO: load paper positions per broker from .toml config file # and pass as symbol to position data mapping: ``dict[str, dict]`` - await ctx.started(({}, ['paper'])) + # await ctx.started(all_positions) + await ctx.started(({}, {'paper',})) async with ( ctx.open_stream() as ems_stream, From 3d2be3674e77c04e3c3d1056a183eb2a9c31fd0f Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Thu, 4 Nov 2021 14:34:43 -0400 Subject: [PATCH 08/29] save progress on kraken to test out unit_select_fixes --- piker/brokers/kraken.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 87a04ab4..957bacb8 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -40,7 +40,7 @@ from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray from ..data._web_bs import open_autorecon_ws -from ..clearing._messages import BrokerdPosition +from ..clearing._messages import BrokerdPosition, BrokerdOrder, BrokerdStatus import urllib.parse import hashlib @@ -455,7 +455,8 @@ async def trades_dialogue( msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) all_positions.append(msg.dict()) - #await tractor.breakpoint() + open_orders = await client.get_user_data('OpenOrders', {}) + await tractor.breakpoint() await ctx.started((all_positions, (acc_name,))) From 1fe1f88806a898d7f63cd6a1fb4b99657a2fa36a Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Wed, 26 Jan 2022 18:39:28 -0500 Subject: [PATCH 09/29] added the bones for the handle_order_requests func --- piker/brokers/kraken.py | 80 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 76 insertions(+), 4 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 957bacb8..3f7b1eb0 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -40,7 +40,10 @@ from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray from ..data._web_bs import open_autorecon_ws -from ..clearing._messages import BrokerdPosition, BrokerdOrder, BrokerdStatus +from ..clearing._messages import ( + BrokerdPosition, BrokerdOrder, BrokerdStatus, + BrokerdOrderAck, BrokerdError, BrokerdCancel, BrokerdFill +) import urllib.parse import hashlib @@ -433,6 +436,74 @@ def normalize_symbol( return ticker.lower() +async def handle_order_requests( + + client: #kraken, + ems_order_stream: tractor.MsgStream, + +) -> None: + + # order_request: dict + async for request_msg in ems_order_stream: + log.info(f'Received order request {request_msg}') + + action = request_msg['action'] + + if action in {'buy', 'sell'}: + + account = request_msg['account'] + if account != 'kraken.spot': + log.error( + 'This is a kraken account, \ + only a `kraken.spot` selection is valid' + ) + await ems_order_stream.send(BrokerError( + oid=request_msg['oid'] + symbol=request_msg['symbol'] + reason=f'Kraken only, No account found: `{account}` ?', + ).dict()) + continue + + # validate + order = BrokerdOrder(**request_msg) + + # call our client api to submit the order + ## TODO: look into the submit_limit method, do it write my own? + reqid = await client.submit_limit( + + oid=order.oid, + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + ## XXX: how do I handle new orders + reqid=order.reqid, + ) + + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + + # ems order request id + oid=order.oid, + + # broker specific request id + reqid=reqid, + + ).dict() + ) + + elif action == 'cancel': + msg = BrokerdCancel(**request_msg) + + await client.submit_cancel( + reqid=msg.reqid + ) + + else: + log.error(f'Unknown order command: {request_msg}') + + @tractor.context async def trades_dialogue( ctx: tractor.Context, @@ -462,9 +533,10 @@ async def trades_dialogue( await trio.sleep_forever() - # async with ( - # ctx.open_stream() as ems_stream, - # + # async with ( + # ctx.open_stream() as ems_stream, + # trio.open_nursery as n, + # ): async def stream_messages(ws): From b55debbe9589ebb3e5d4fc116874af0981eaf56b Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Mon, 7 Feb 2022 19:18:00 -0500 Subject: [PATCH 10/29] get basic order request loop receiving msgs --- piker/brokers/kraken.py | 104 ++++++++++++++++++++-------------------- 1 file changed, 53 insertions(+), 51 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 3f7b1eb0..a873575a 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -438,7 +438,7 @@ def normalize_symbol( async def handle_order_requests( - client: #kraken, + client: 'test',#kraken, ems_order_stream: tractor.MsgStream, ) -> None: @@ -447,61 +447,61 @@ async def handle_order_requests( async for request_msg in ems_order_stream: log.info(f'Received order request {request_msg}') - action = request_msg['action'] + # action = request_msg['action'] - if action in {'buy', 'sell'}: + # if action in {'buy', 'sell'}: - account = request_msg['account'] - if account != 'kraken.spot': - log.error( - 'This is a kraken account, \ - only a `kraken.spot` selection is valid' - ) - await ems_order_stream.send(BrokerError( - oid=request_msg['oid'] - symbol=request_msg['symbol'] - reason=f'Kraken only, No account found: `{account}` ?', - ).dict()) - continue + # account = request_msg['account'] + # if account != 'kraken.spot': + # log.error( + # 'This is a kraken account, \ + # only a `kraken.spot` selection is valid' + # ) + # await ems_order_stream.send(BrokerError( + # oid=request_msg['oid'], + # symbol=request_msg['symbol'], + # reason=f'Kraken only, No account found: `{account}` ?', + # ).dict()) + # continue - # validate - order = BrokerdOrder(**request_msg) + # # validate + # order = BrokerdOrder(**request_msg) - # call our client api to submit the order - ## TODO: look into the submit_limit method, do it write my own? - reqid = await client.submit_limit( + # # call our client api to submit the order + # ## TODO: look into the submit_limit method, do it write my own? + # reqid = await client.submit_limit( - oid=order.oid, - symbol=order.symbol, - price=order.price, - action=order.action, - size=order.size, - ## XXX: how do I handle new orders - reqid=order.reqid, - ) + # oid=order.oid, + # symbol=order.symbol, + # price=order.price, + # action=order.action, + # size=order.size, + # ## XXX: how do I handle new orders + # reqid=order.reqid, + # ) - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( + # # deliver ack that order has been submitted to broker routing + # await ems_order_stream.send( + # BrokerdOrderAck( - # ems order request id - oid=order.oid, + # # ems order request id + # oid=order.oid, - # broker specific request id - reqid=reqid, + # # broker specific request id + # reqid=reqid, - ).dict() - ) + # ).dict() + # ) - elif action == 'cancel': - msg = BrokerdCancel(**request_msg) + # elif action == 'cancel': + # msg = BrokerdCancel(**request_msg) - await client.submit_cancel( - reqid=msg.reqid - ) + # await client.submit_cancel( + # reqid=msg.reqid + # ) - else: - log.error(f'Unknown order command: {request_msg}') + # else: + # log.error(f'Unknown order command: {request_msg}') @tractor.context @@ -527,17 +527,19 @@ async def trades_dialogue( all_positions.append(msg.dict()) open_orders = await client.get_user_data('OpenOrders', {}) - await tractor.breakpoint() + #await tractor.breakpoint() await ctx.started((all_positions, (acc_name,))) - await trio.sleep_forever() - - # async with ( - # ctx.open_stream() as ems_stream, - # trio.open_nursery as n, - # ): + #await trio.sleep_forever() + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + ## TODO: maybe add multiple accounts + n.start_soon(handle_order_requests, client, ems_stream) + async def stream_messages(ws): From 66da58525df8d8d2341a9ff1a08c5126ff0fee5b Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Mon, 7 Feb 2022 20:13:24 -0500 Subject: [PATCH 11/29] mock orders validated from kraken --- piker/brokers/kraken.py | 120 ++++++++++++++++++++++++---------------- 1 file changed, 73 insertions(+), 47 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index a873575a..5462cefa 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -234,7 +234,7 @@ class Client: ) return resproc(resp, log) - async def get_user_data( + async def kraken_endpoint( self, method: str, data: Dict[str, Any] @@ -251,9 +251,9 @@ class Client: self, data: Dict[str, Any] = {} ) -> Dict[str, Any]: - balances = await self.get_user_data('Balance', data) + balances = await self.kraken_endpoint('Balance', data) ## TODO: grab all entries, not just first 50 - traders = await self.get_user_data('TradesHistory', data) + traders = await self.kraken_endpoint('TradesHistory', data) positions = {} vols = {} @@ -277,6 +277,33 @@ class Client: return positions, vols + async def submit_limit( + self, + oid: str, + symbol: str, + price: float, + action: str, + size: str, +# account: str, + reqid: int = None, + ) -> int: + """Place an order and return integer request id provided by client. + + """ + # Build order data from kraken + data = { + "userref": 1, + "ordertype": "limit", + "type": action, + "volume": size, + "pair": symbol, + "price": price, + "validate": True + } + resp = await self.kraken_endpoint('AddOrder', data) + print(resp) + return reqid + async def symbol_info( self, pair: Optional[str] = None, @@ -447,61 +474,60 @@ async def handle_order_requests( async for request_msg in ems_order_stream: log.info(f'Received order request {request_msg}') - # action = request_msg['action'] + action = request_msg['action'] - # if action in {'buy', 'sell'}: + if action in {'buy', 'sell'}: - # account = request_msg['account'] - # if account != 'kraken.spot': - # log.error( - # 'This is a kraken account, \ - # only a `kraken.spot` selection is valid' - # ) - # await ems_order_stream.send(BrokerError( - # oid=request_msg['oid'], - # symbol=request_msg['symbol'], - # reason=f'Kraken only, No account found: `{account}` ?', - # ).dict()) - # continue + account = request_msg['account'] + if account != 'kraken.spot': + log.error( + 'This is a kraken account, \ + only a `kraken.spot` selection is valid' + ) + await ems_order_stream.send(BrokerError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'Kraken only, No account found: `{account}` ?', + ).dict()) + continue - # # validate - # order = BrokerdOrder(**request_msg) + # validate + order = BrokerdOrder(**request_msg) - # # call our client api to submit the order - # ## TODO: look into the submit_limit method, do it write my own? - # reqid = await client.submit_limit( + # call our client api to submit the order + reqid = await client.submit_limit( - # oid=order.oid, - # symbol=order.symbol, - # price=order.price, - # action=order.action, - # size=order.size, - # ## XXX: how do I handle new orders - # reqid=order.reqid, - # ) + oid=order.oid, + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + ## XXX: how do I handle new orders + reqid=order.reqid, + ) - # # deliver ack that order has been submitted to broker routing - # await ems_order_stream.send( - # BrokerdOrderAck( + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( - # # ems order request id - # oid=order.oid, + # ems order request id + oid=order.oid, - # # broker specific request id - # reqid=reqid, + # broker specific request id + reqid=reqid, - # ).dict() - # ) + ).dict() + ) - # elif action == 'cancel': - # msg = BrokerdCancel(**request_msg) + elif action == 'cancel': + msg = BrokerdCancel(**request_msg) - # await client.submit_cancel( - # reqid=msg.reqid - # ) + await client.submit_cancel( + reqid=msg.reqid + ) - # else: - # log.error(f'Unknown order command: {request_msg}') + else: + log.error(f'Unknown order command: {request_msg}') @tractor.context @@ -526,7 +552,7 @@ async def trades_dialogue( msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) all_positions.append(msg.dict()) - open_orders = await client.get_user_data('OpenOrders', {}) + open_orders = await client.kraken_endpoint('OpenOrders', {}) #await tractor.breakpoint() await ctx.started((all_positions, (acc_name,))) From b21bbf50318d3ca7e8447746f22ba9940b80b3e4 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Tue, 8 Feb 2022 13:50:39 -0500 Subject: [PATCH 12/29] valdiate and ack order requests from ems --- piker/brokers/kraken.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 5462cefa..754dc0bb 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -33,6 +33,7 @@ import tractor from pydantic.dataclasses import dataclass from pydantic import BaseModel import wsproto +from itertools import count from .. import config from .._cacheables import open_cached_client @@ -290,9 +291,9 @@ class Client: """Place an order and return integer request id provided by client. """ - # Build order data from kraken + # Build order data for kraken api data = { - "userref": 1, + "userref": reqid, "ordertype": "limit", "type": action, "volume": size, @@ -465,12 +466,14 @@ def normalize_symbol( async def handle_order_requests( - client: 'test',#kraken, + client: Client, ems_order_stream: tractor.MsgStream, ) -> None: - # order_request: dict + request_msg: dict + order: BrokerdOrder + userref_counter = count() async for request_msg in ems_order_stream: log.info(f'Received order request {request_msg}') @@ -484,7 +487,7 @@ async def handle_order_requests( 'This is a kraken account, \ only a `kraken.spot` selection is valid' ) - await ems_order_stream.send(BrokerError( + await ems_order_stream.send(BrokerdError( oid=request_msg['oid'], symbol=request_msg['symbol'], reason=f'Kraken only, No account found: `{account}` ?', @@ -503,7 +506,7 @@ async def handle_order_requests( action=order.action, size=order.size, ## XXX: how do I handle new orders - reqid=order.reqid, + reqid=next(userref_counter), ) # deliver ack that order has been submitted to broker routing From 96dd5c632f3967645e98f97bd43c16e875a5ddc0 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Wed, 9 Feb 2022 13:50:42 -0500 Subject: [PATCH 13/29] basic order submission and cancelling with kraken --- piker/brokers/kraken.py | 75 +++++++++++++++++++++++++++++------------ 1 file changed, 53 insertions(+), 22 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 754dc0bb..90bdbaac 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -243,18 +243,17 @@ class Client: uri_path = f'/0/private/{method}' data['nonce'] = str(int(1000*time.time())) resp = await self._private(method, data, uri_path) - err = resp['error'] - if err: - print(err) - return resp['result'] + return resp async def get_positions( self, data: Dict[str, Any] = {} ) -> Dict[str, Any]: - balances = await self.kraken_endpoint('Balance', data) + resp = await self.kraken_endpoint('Balance', data) + balances = resp['result'] ## TODO: grab all entries, not just first 50 - traders = await self.kraken_endpoint('TradesHistory', data) + resp = await self.kraken_endpoint('TradesHistory', data) + traders = resp['result'] positions = {} vols = {} @@ -284,7 +283,7 @@ class Client: symbol: str, price: float, action: str, - size: str, + size: float, # account: str, reqid: int = None, ) -> int: @@ -296,14 +295,26 @@ class Client: "userref": reqid, "ordertype": "limit", "type": action, - "volume": size, + "volume": str(size), "pair": symbol, - "price": price, - "validate": True + "price": str(price), + # set to True test AddOrder call without a real submission + "validate": False } resp = await self.kraken_endpoint('AddOrder', data) + return resp + + async def submit_cancel( + self, + reqid: str, + ) -> None: + """Send cancel request for order id ``reqid``. + + """ + # txid is a transaction id given by kraken + data = {"txid": reqid} + resp = await self.kraken_endpoint('CancelOrder', data) print(resp) - return reqid async def symbol_info( self, @@ -495,10 +506,11 @@ async def handle_order_requests( continue # validate + temp_id = next(userref_counter) order = BrokerdOrder(**request_msg) # call our client api to submit the order - reqid = await client.submit_limit( + resp = await client.submit_limit( oid=order.oid, symbol=order.symbol, @@ -506,21 +518,40 @@ async def handle_order_requests( action=order.action, size=order.size, ## XXX: how do I handle new orders - reqid=next(userref_counter), + reqid=temp_id, ) - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( + err = resp['error'] + if err: + log.error(f'Failed to submit order') + await ems_order_stream.send( + BrokerdError( + oid=order.oid, + reqid=temp_id, + symbol=order.symbol, + reason="Failed order submission", + broker_details=resp + ).dict() + ) + else: + ## TODO: handle multiple cancels + ## txid is an array of strings + reqid = resp['result']['txid'][0] + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( - # ems order request id - oid=order.oid, + # ems order request id + oid=order.oid, - # broker specific request id - reqid=reqid, + # broker specific request id + reqid=reqid, - ).dict() - ) + # account the made the order + account=order.account + + ).dict() + ) elif action == 'cancel': msg = BrokerdCancel(**request_msg) From 03d2eddce341123973e2f814958273c3a695aa92 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Wed, 9 Feb 2022 14:47:29 -0500 Subject: [PATCH 14/29] order submission and cancellation working --- piker/brokers/kraken.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 90bdbaac..395d24f1 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -314,7 +314,7 @@ class Client: # txid is a transaction id given by kraken data = {"txid": reqid} resp = await self.kraken_endpoint('CancelOrder', data) - print(resp) + return resp async def symbol_info( self, @@ -554,12 +554,33 @@ async def handle_order_requests( ) elif action == 'cancel': - msg = BrokerdCancel(**request_msg) + msg = BrokerdCancel(**request_msg) - await client.submit_cancel( + resp = await client.submit_cancel( reqid=msg.reqid ) + # Check to make sure there was no error returned by + # the kraken endpoint. Assert one order was cancelled + assert resp['error'] == [] + assert resp['result']['count'] == 1 + + try: + pending = resp['result']['pending'] + # Check to make sure the cancellation is NOT pending, + # then send the confirmation to the ems order stream + except KeyError: + await ems_order_stream.send( + BrokerdStatus( + reqid=msg.reqid, + account=msg.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Order cancelled', + broker_details={'name': 'kraken'} + ).dict() + ) + else: log.error(f'Unknown order command: {request_msg}') From 0c905920e211ecf77bc7e2ede0d4dbde2186afef Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Wed, 9 Feb 2022 21:30:39 -0500 Subject: [PATCH 15/29] connect to krakens openOrders websocket --- piker/brokers/kraken.py | 176 +++++++++++++++++++++++++++++++++++----- piker/data/_web_bs.py | 18 +++- 2 files changed, 168 insertions(+), 26 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 395d24f1..2098d25e 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -475,6 +475,21 @@ def normalize_symbol( return ticker.lower() +def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]: + """Create a request subscription packet dict. + + ## TODO: point to the auth urls + https://docs.kraken.com/websockets/#message-subscribe + + """ + # eg. specific logic for this in kraken's sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + return { + 'event': 'subscribe', + 'subscription': data, + } + + async def handle_order_requests( client: Client, @@ -556,30 +571,35 @@ async def handle_order_requests( elif action == 'cancel': msg = BrokerdCancel(**request_msg) + # Send order cancellation to kraken resp = await client.submit_cancel( reqid=msg.reqid ) - # Check to make sure there was no error returned by - # the kraken endpoint. Assert one order was cancelled - assert resp['error'] == [] - assert resp['result']['count'] == 1 - try: - pending = resp['result']['pending'] - # Check to make sure the cancellation is NOT pending, - # then send the confirmation to the ems order stream - except KeyError: - await ems_order_stream.send( - BrokerdStatus( - reqid=msg.reqid, - account=msg.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Order cancelled', - broker_details={'name': 'kraken'} - ).dict() - ) + # Check to make sure there was no error returned by + # the kraken endpoint. Assert one order was cancelled + assert resp['error'] == [] + assert resp['result']['count'] == 1 + + ## TODO: Change this code using .get + try: + pending = resp['result']['pending'] + # Check to make sure the cancellation is NOT pending, + # then send the confirmation to the ems order stream + except KeyError: + await ems_order_stream.send( + BrokerdStatus( + reqid=msg.reqid, + account=msg.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Order cancelled', + broker_details={'name': 'kraken'} + ).dict() + ) + except AssertionError: + log.error(f'Order cancel was not successful') else: log.error(f'Unknown order command: {request_msg}') @@ -594,6 +614,46 @@ async def trades_dialogue( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) + # Generate + + @asynccontextmanager + async def subscribe(ws: wsproto.WSConnection, token: str): + ## TODO: Fix docs and points to right urls + # XXX: setup subs + # https://docs.kraken.com/websockets/#message-subscribe + # specific logic for this in kraken's shitty sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + trades_sub = make_auth_sub( + {'name': 'openOrders', 'token': token} + ) + + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. + await ws.send_msg(trades_sub) + + ## trade data (aka L1) + #l1_sub = make_sub( + # list(ws_pairs.values()), + # {'name': 'spread'} # 'depth': 10} + #) + + ## pull a first quote and deliver + #await ws.send_msg(l1_sub) + + yield + + # unsub from all pairs on teardown + await ws.send_msg({ + 'event': 'unsubscribe', + 'subscription': ['openOrders'], + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + # Authenticated block async with get_client() as client: acc_name = 'kraken.' + client._name @@ -606,7 +666,7 @@ async def trades_dialogue( if float(vols[ticker]) != 0: msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) all_positions.append(msg.dict()) - + open_orders = await client.kraken_endpoint('OpenOrders', {}) #await tractor.breakpoint() @@ -614,13 +674,85 @@ async def trades_dialogue( #await trio.sleep_forever() + # Get websocket token for authenticated data stream + # Assert that a token was actually received + resp = await client.kraken_endpoint('GetWebSocketsToken', {}) + assert resp['error'] == [] + token = resp['result']['token'] + async with ( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): ## TODO: maybe add multiple accounts n.start_soon(handle_order_requests, client, ems_stream) - + async with open_autorecon_ws( + 'wss://ws-auth.kraken.com/', + fixture=subscribe, + token=token, + ) as ws: + + while True: + with trio.move_on_after(5) as cs: + msg = await ws.recv_msg() + print(msg) + + ## pull a first quote and deliver + #msg_gen = stream_messages(ws) + + ## TODO: use ``anext()`` when it lands in 3.10! + #typ, ohlc_last = await msg_gen.__anext__() + + #topic, quote = normalize(ohlc_last) + + #first_quote = {topic: quote} + #task_status.started((init_msgs, first_quote)) + + ## lol, only "closes" when they're margin squeezing clients ;P + #feed_is_live.set() + + ## keep start of last interval for volume tracking + #last_interval_start = ohlc_last.etime + + ## start streaming + #async for typ, ohlc in msg_gen: + + # if typ == 'ohlc': + + # # TODO: can get rid of all this by using + # # ``trades`` subscription... + + # # generate tick values to match time & sales pane: + # # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m + # volume = ohlc.volume + + # # new OHLC sample interval + # if ohlc.etime > last_interval_start: + # last_interval_start = ohlc.etime + # tick_volume = volume + + # else: + # # this is the tick volume *within the interval* + # tick_volume = volume - ohlc_last.volume + + # ohlc_last = ohlc + # last = ohlc.close + + # if tick_volume: + # ohlc.ticks.append({ + # 'type': 'trade', + # 'price': last, + # 'size': tick_volume, + # }) + + # topic, quote = normalize(ohlc) + + # elif typ == 'l1': + # quote = ohlc + # topic = quote['symbol'].lower() + + # await send_chan.send({topic: quote}) + async def stream_messages(ws): @@ -832,7 +964,7 @@ async def stream_quotes( # XXX: do we need to ack the unsub? # await ws.recv_msg() - # see the tips on reonnection logic: + # see the tips on reconnection logic: # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds async with open_autorecon_ws( 'wss://ws.kraken.com/', diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index d2a15e06..820d5054 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -53,11 +53,13 @@ class NoBsWs: def __init__( self, url: str, + token: str, stack: AsyncExitStack, fixture: Callable, serializer: ModuleType = json, ): self.url = url + self.token = token self.fixture = fixture self._stack = stack self._ws: 'WebSocketConnection' = None # noqa @@ -81,9 +83,15 @@ class NoBsWs: trio_websocket.open_websocket_url(self.url) ) # rerun user code fixture - ret = await self._stack.enter_async_context( - self.fixture(self) - ) + if self.token == '': + ret = await self._stack.enter_async_context( + self.fixture(self) + ) + else: + ret = await self._stack.enter_async_context( + self.fixture(self, self.token) + ) + assert ret is None log.info(f'Connection success: {self.url}') @@ -127,12 +135,14 @@ async def open_autorecon_ws( # TODO: proper type annot smh fixture: Callable, + # used for authenticated websockets + token: str = '', ) -> AsyncGenerator[tuple[...], NoBsWs]: """Apparently we can QoS for all sorts of reasons..so catch em. """ async with AsyncExitStack() as stack: - ws = NoBsWs(url, stack, fixture=fixture) + ws = NoBsWs(url, token, stack, fixture=fixture) await ws._connect() try: From 0122669dd4b19ba24f9fd47858d0d9a8eaa0a5bc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 11 Feb 2022 11:20:13 -0500 Subject: [PATCH 16/29] Factor out ws msg hearbeat and error handling Move the core ws message handling into `stream_messages()` and call that from 2 new stream processors: `process_data_feed_msgs()` and `process_order_msgs()`. Add comments for hints on how to implement the order msg parsing as well as `pprint` received msgs to console for now. --- piker/brokers/kraken.py | 172 +++++++++++++++++++--------------------- 1 file changed, 81 insertions(+), 91 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 2098d25e..25fd24f5 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -40,17 +40,17 @@ from .._cacheables import open_cached_client from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray -from ..data._web_bs import open_autorecon_ws +from ..data._web_bs import open_autorecon_ws, NoBsWs from ..clearing._messages import ( BrokerdPosition, BrokerdOrder, BrokerdStatus, - BrokerdOrderAck, BrokerdError, BrokerdCancel, BrokerdFill + BrokerdOrderAck, BrokerdError, BrokerdCancel, + BrokerdFill, ) import urllib.parse import hashlib import hmac import base64 -import pandas as pd log = get_logger(__name__) @@ -157,7 +157,7 @@ def get_config() -> dict[str, Any]: def get_kraken_signature( urlpath: str, - data: Dict[str, Any], + data: Dict[str, Any], secret: str ) -> str: postdata = urllib.parse.urlencode(data) @@ -171,9 +171,9 @@ def get_kraken_signature( class InvalidKey(ValueError): """EAPI:Invalid key - This error is returned when the API key used for the call is - either expired or disabled, please review the API key in your - Settings -> API tab of account management or generate a new one + This error is returned when the API key used for the call is + either expired or disabled, please review the API key in your + Settings -> API tab of account management or generate a new one and update your application.""" @@ -686,76 +686,27 @@ async def trades_dialogue( ): ## TODO: maybe add multiple accounts n.start_soon(handle_order_requests, client, ems_stream) + async with open_autorecon_ws( 'wss://ws-auth.kraken.com/', fixture=subscribe, token=token, ) as ws: - - while True: - with trio.move_on_after(5) as cs: - msg = await ws.recv_msg() - print(msg) - - ## pull a first quote and deliver - #msg_gen = stream_messages(ws) - - ## TODO: use ``anext()`` when it lands in 3.10! - #typ, ohlc_last = await msg_gen.__anext__() - - #topic, quote = normalize(ohlc_last) - - #first_quote = {topic: quote} - #task_status.started((init_msgs, first_quote)) - - ## lol, only "closes" when they're margin squeezing clients ;P - #feed_is_live.set() - - ## keep start of last interval for volume tracking - #last_interval_start = ohlc_last.etime - - ## start streaming - #async for typ, ohlc in msg_gen: - - # if typ == 'ohlc': - - # # TODO: can get rid of all this by using - # # ``trades`` subscription... - - # # generate tick values to match time & sales pane: - # # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - # volume = ohlc.volume - - # # new OHLC sample interval - # if ohlc.etime > last_interval_start: - # last_interval_start = ohlc.etime - # tick_volume = volume - - # else: - # # this is the tick volume *within the interval* - # tick_volume = volume - ohlc_last.volume - - # ohlc_last = ohlc - # last = ohlc.close - - # if tick_volume: - # ohlc.ticks.append({ - # 'type': 'trade', - # 'price': last, - # 'size': tick_volume, - # }) - - # topic, quote = normalize(ohlc) - - # elif typ == 'l1': - # quote = ohlc - # topic = quote['symbol'].lower() - - # await send_chan.send({topic: quote}) + from pprint import pprint + async for msg in process_order_msgs(ws): + pprint(msg) -async def stream_messages(ws): +async def stream_messages( + ws: NoBsWs, +): + ''' + Message stream parser and heartbeat handler. + Deliver ws subscription messages as well as handle heartbeat logic + though a single async generator. + + ''' too_slow_count = last_hb = 0 while True: @@ -793,39 +744,76 @@ async def stream_messages(ws): if err: raise BrokerError(err) else: - chan_id, *payload_array, chan_name, pair = msg + yield msg - if 'ohlc' in chan_name: - yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) +async def process_data_feed_msgs( + ws: NoBsWs, +): + ''' + Parse and pack data feed messages. - elif 'spread' in chan_name: + ''' + async for msg in stream_messages(ws): - bid, ask, ts, bsize, asize = map(float, payload_array[0]) + chan_id, *payload_array, chan_name, pair = msg - # TODO: really makes you think IB has a horrible API... - quote = { - 'symbol': pair.replace('/', ''), - 'ticks': [ - {'type': 'bid', 'price': bid, 'size': bsize}, - {'type': 'bsize', 'price': bid, 'size': bsize}, + if 'ohlc' in chan_name: - {'type': 'ask', 'price': ask, 'size': asize}, - {'type': 'asize', 'price': ask, 'size': asize}, - ], - } - yield 'l1', quote + yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) - # elif 'book' in msg[-2]: - # chan_id, *payload_array, chan_name, pair = msg - # print(msg) + elif 'spread' in chan_name: - else: - print(f'UNHANDLED MSG: {msg}') + bid, ask, ts, bsize, asize = map(float, payload_array[0]) + + # TODO: really makes you think IB has a horrible API... + quote = { + 'symbol': pair.replace('/', ''), + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, + + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize}, + ], + } + yield 'l1', quote + + # elif 'book' in msg[-2]: + # chan_id, *payload_array, chan_name, pair = msg + # print(msg) + + else: + print(f'UNHANDLED MSG: {msg}') + yield msg + + +async def process_order_msgs( + ws: NoBsWs, +): + ''' + Parse and pack data feed messages. + + ''' + async for msg in stream_messages(ws): + + # TODO: write your order event parser here! + # HINT: create a ``pydantic.BaseModel`` to parse and validate + # and then in the caller recast to our native ``BrokerdX`` msg types. + + # form of order msgs: + # [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544', + # 'status': 'canceled', 'vol_exec': '0.00000000', 'cost': + # '0.00000000', 'fee': '0.00000000', 'avg_price': + # '0.00000000', 'userref': 1, 'cancel_reason': 'User + # requested'}}], 'openOrders', {'sequence': 4}] + + yield msg def normalize( ohlc: OHLC, + ) -> dict: quote = asdict(ohlc) quote['broker_ts'] = quote['time'] @@ -966,13 +954,14 @@ async def stream_quotes( # see the tips on reconnection logic: # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds + ws: NoBsWs async with open_autorecon_ws( 'wss://ws.kraken.com/', fixture=subscribe, ) as ws: # pull a first quote and deliver - msg_gen = stream_messages(ws) + msg_gen = process_data_feed_msgs(ws) # TODO: use ``anext()`` when it lands in 3.10! typ, ohlc_last = await msg_gen.__anext__() @@ -1031,6 +1020,7 @@ async def stream_quotes( @tractor.context async def open_symbol_search( ctx: tractor.Context, + ) -> Client: async with open_cached_client('kraken') as client: From 6c54c81f01cf92324e496692f3bf1d204b0d523b Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Mon, 14 Feb 2022 17:02:37 -0500 Subject: [PATCH 17/29] get stashed changes --- piker/brokers/kraken.py | 144 +++++++++++++++++++++++++++++----------- 1 file changed, 106 insertions(+), 38 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 25fd24f5..366d476f 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -119,6 +119,19 @@ class Pair(BaseModel): ordermin: float # minimum order volume for pair +class Order(BaseModel): + """Order class that helps parse and validate order stream""" + txid: str # kraken order transaction id + action: str # buy or sell + ordertype: str # limit order ##TODO: Do I need this? + pair: str # order pair + price: str # price of asset + vol: str # vol of asset + status: str # order status + opentm: str # time of order + timeinforce: str # e.g GTC, GTD + + @dataclass class OHLC: """Description of the flattened OHLC quote format. @@ -494,6 +507,8 @@ async def handle_order_requests( client: Client, ems_order_stream: tractor.MsgStream, + ws: NoBsWs, + token: str, ) -> None: @@ -523,50 +538,65 @@ async def handle_order_requests( # validate temp_id = next(userref_counter) order = BrokerdOrder(**request_msg) + def slashinsert(str): + midPoint = len(str)//2 + return str[:midPoint] + '/' + str[midPoint:] + # Send order via websocket + order_msg = { + "event": "addOrder", + "ordertype": "limit", + "pair": slashinsert(order.symbol.upper()), + "price": str(order.price), + "token": token, + "type": order.action, + "volume": str(order.size) + } + + + await ws.send_msg(order_msg) # call our client api to submit the order - resp = await client.submit_limit( + #resp = await client.submit_limit( + # oid=order.oid, + # symbol=order.symbol, + # price=order.price, + # action=order.action, + # size=order.size, + # ## XXX: how do I handle new orders + # reqid=temp_id, + #) - oid=order.oid, - symbol=order.symbol, - price=order.price, - action=order.action, - size=order.size, - ## XXX: how do I handle new orders - reqid=temp_id, - ) + #err = resp['error'] + #if err: + # log.error(f'Failed to submit order') + # await ems_order_stream.send( + # BrokerdError( + # oid=order.oid, + # reqid=temp_id, + # symbol=order.symbol, + # reason="Failed order submission", + # broker_details=resp + # ).dict() + # ) + #else: + # ## TODO: handle multiple cancels + # ## txid is an array of strings + # reqid = resp['result']['txid'][0] + # # deliver ack that order has been submitted to broker routing + # await ems_order_stream.send( + # BrokerdOrderAck( - err = resp['error'] - if err: - log.error(f'Failed to submit order') - await ems_order_stream.send( - BrokerdError( - oid=order.oid, - reqid=temp_id, - symbol=order.symbol, - reason="Failed order submission", - broker_details=resp - ).dict() - ) - else: - ## TODO: handle multiple cancels - ## txid is an array of strings - reqid = resp['result']['txid'][0] - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( + # # ems order request id + # oid=order.oid, - # ems order request id - oid=order.oid, + # # broker specific request id + # reqid=reqid, - # broker specific request id - reqid=reqid, + # # account the made the order + # account=order.account - # account the made the order - account=order.account - - ).dict() - ) + # ).dict() + # ) elif action == 'cancel': msg = BrokerdCancel(**request_msg) @@ -685,16 +715,31 @@ async def trades_dialogue( trio.open_nursery() as n, ): ## TODO: maybe add multiple accounts - n.start_soon(handle_order_requests, client, ems_stream) + #n.start_soon(handle_order_requests, client, ems_stream) async with open_autorecon_ws( 'wss://ws-auth.kraken.com/', fixture=subscribe, token=token, ) as ws: + n.start_soon(handle_order_requests, client, ems_stream, ws, token) from pprint import pprint async for msg in process_order_msgs(ws): pprint(msg) + #await ems_order_stream.send( + # BrokerdOrderAck( + + # # ems order request id + # oid=order.oid, + + # # broker specific request id + # reqid=reqid, + + # # account the made the order + # account=order.account + + # ).dict() + #) async def stream_messages( @@ -801,6 +846,29 @@ async def process_order_msgs( # HINT: create a ``pydantic.BaseModel`` to parse and validate # and then in the caller recast to our native ``BrokerdX`` msg types. + # check that we are on openOrders stream XXX: Maybe do something with sequence + assert msg[1] == 'openOrders' + orders = msg[0] + + for order in orders: + txid = list(order.keys())[0] + order_msg = Order( + txid=txid, + action=order[txid]['descr']['type'], + ordertype=order[txid]['descr']['ordertype'], + pair=order[txid]['descr']['pair'], + price=order[txid]['descr']['price'], + vol=order[txid]['vol'], + status=order[txid]['status'], + opentm=order[txid]['opentm'], + timeinforce=order[txid]['timeinforce'] + ) + print(order_msg) + + + + + print(msg[0][0].keys()) # form of order msgs: # [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544', # 'status': 'canceled', 'vol_exec': '0.00000000', 'cost': From d826a66c8c6cdc84793e0afaaaa13ff176efc80c Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Thu, 17 Feb 2022 16:59:50 -0500 Subject: [PATCH 18/29] use a mapping from userref to oid for order ack --- piker/brokers/kraken.py | 156 +++++++++++++++++++++++++++++++--------- 1 file changed, 122 insertions(+), 34 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 366d476f..cee39fdb 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -130,6 +130,7 @@ class Order(BaseModel): status: str # order status opentm: str # time of order timeinforce: str # e.g GTC, GTD + userref: str # for a mapping to oids @dataclass @@ -509,6 +510,7 @@ async def handle_order_requests( ems_order_stream: tractor.MsgStream, ws: NoBsWs, token: str, + userref_oid_map: dict, ) -> None: @@ -541,17 +543,23 @@ async def handle_order_requests( def slashinsert(str): midPoint = len(str)//2 return str[:midPoint] + '/' + str[midPoint:] + # Send order via websocket order_msg = { - "event": "addOrder", - "ordertype": "limit", - "pair": slashinsert(order.symbol.upper()), - "price": str(order.price), - "token": token, - "type": order.action, - "volume": str(order.size) - } - + "event": "addOrder", + "ordertype": "limit", + "pair": slashinsert(order.symbol.upper()), + "price": str(order.price), + "token": token, + "type": order.action, + "volume": str(order.size), + "userref": str(temp_id) + } + + # add oid userref mapping + userref_oid_map[str(temp_id)] = { + 'oid': order.oid, 'account': order.account + } await ws.send_msg(order_msg) @@ -716,16 +724,70 @@ async def trades_dialogue( ): ## TODO: maybe add multiple accounts #n.start_soon(handle_order_requests, client, ems_stream) + + # Mapping from userref passed to kraken and oid from piker + userref_oid_map = {} async with open_autorecon_ws( 'wss://ws-auth.kraken.com/', fixture=subscribe, token=token, ) as ws: - n.start_soon(handle_order_requests, client, ems_stream, ws, token) + n.start_soon( + handle_order_requests, + client, + ems_stream, + ws, + token, + userref_oid_map + ) from pprint import pprint + pending_orders = [] async for msg in process_order_msgs(ws): pprint(msg) + for order in msg: + ## TODO: Maybe do a better + if type(order) == dict: + for pending_order in pending_orders: + if pending_order.txid == order['txid'] and order['status'] == 'open': + await ems_stream.send( + BrokerdOrderAck( + + # ems order request id + oid=userref_oid_map[pending_order.userref]['oid'], + + # broker specific request id + reqid=order['txid'], + + # account the made the order + account=userref_oid_map[ + pending_order.userref + ]['account'] + + ).dict() + ) + + elif order.status == 'pending': + pending_orders.append(order) + + + #if not pending_oder and order.status == 'open': + # await ems_stream.send( + # BrokerdOrder( + # action=order.action, + # oid='', + # ## TODO: support multi accounts? + # account='kraken.spot', + # time_ns=int(float(order.opentm) * 10**9), + # reqid=order.txid, + # symbol=order.pair.replace('/', '').lower(),# + \ + # #'.kraken', + # price=float(order.price), + # size=float(order.vol) + # ).dict() + # ) + + #await ems_order_stream.send( # BrokerdOrderAck( @@ -840,35 +902,61 @@ async def process_order_msgs( Parse and pack data feed messages. ''' + sequence_counter = 0 async for msg in stream_messages(ws): # TODO: write your order event parser here! # HINT: create a ``pydantic.BaseModel`` to parse and validate # and then in the caller recast to our native ``BrokerdX`` msg types. - - # check that we are on openOrders stream XXX: Maybe do something with sequence - assert msg[1] == 'openOrders' - orders = msg[0] - - for order in orders: - txid = list(order.keys())[0] - order_msg = Order( - txid=txid, - action=order[txid]['descr']['type'], - ordertype=order[txid]['descr']['ordertype'], - pair=order[txid]['descr']['pair'], - price=order[txid]['descr']['price'], - vol=order[txid]['vol'], - status=order[txid]['status'], - opentm=order[txid]['opentm'], - timeinforce=order[txid]['timeinforce'] - ) - print(order_msg) - - - - print(msg[0][0].keys()) + try: + # check that we are on openOrders stream and that msgs are arriving + # in sequence with kraken + assert msg[1] == 'openOrders' + assert msg[2]['sequence'] > sequence_counter + sequence_counter += 1 + raw_msgs = msg[0] + # TODO: get length and start list + order_msgs = [] + + try: + # check if its a new order or an update msg + for order in raw_msgs: + txid = list(order.keys())[0] + order_msg = Order( + txid=txid, + action=order[txid]['descr']['type'], + ordertype=order[txid]['descr']['ordertype'], + pair=order[txid]['descr']['pair'], + price=order[txid]['descr']['price'], + vol=order[txid]['vol'], + status=order[txid]['status'], + opentm=order[txid]['opentm'], + timeinforce=order[txid]['timeinforce'], + userref=order[txid]['userref'] + ) + order_msgs.append(order_msg) + + yield order_msgs + + except KeyError: + for order in raw_msgs: + txid = list(order.keys())[0] + ## TODO: maybe use a pydantic.BaseModel + order_msg = { + 'txid': txid, + 'status': order[txid]['status'], + 'userref': order[txid]['userref'] + } + order_msgs.append(order_msg) + + yield order_msgs + + + except AssertionError: + print(f'UNHANDLED MSG: {msg}') + yield msg + # form of order msgs: # [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544', # 'status': 'canceled', 'vol_exec': '0.00000000', 'cost': @@ -876,7 +964,7 @@ async def process_order_msgs( # '0.00000000', 'userref': 1, 'cancel_reason': 'User # requested'}}], 'openOrders', {'sequence': 4}] - yield msg + # yield msg def normalize( From 46948e0a8b53b171d06c1283eed8ae752a387604 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Thu, 17 Feb 2022 17:40:28 -0500 Subject: [PATCH 19/29] add order cancel support over websockets --- piker/brokers/kraken.py | 75 ++++++++++++++++++++++++++--------------- 1 file changed, 47 insertions(+), 28 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index cee39fdb..ba041d06 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -609,35 +609,43 @@ async def handle_order_requests( elif action == 'cancel': msg = BrokerdCancel(**request_msg) - # Send order cancellation to kraken - resp = await client.submit_cancel( - reqid=msg.reqid - ) + cancel_msg = { + "event": "cancelOrder", + "token": token, + "txid": [msg.reqid] + } - try: - # Check to make sure there was no error returned by - # the kraken endpoint. Assert one order was cancelled - assert resp['error'] == [] - assert resp['result']['count'] == 1 + await ws.send_msg(cancel_msg) - ## TODO: Change this code using .get - try: - pending = resp['result']['pending'] - # Check to make sure the cancellation is NOT pending, - # then send the confirmation to the ems order stream - except KeyError: - await ems_order_stream.send( - BrokerdStatus( - reqid=msg.reqid, - account=msg.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Order cancelled', - broker_details={'name': 'kraken'} - ).dict() - ) - except AssertionError: - log.error(f'Order cancel was not successful') + ## Send order cancellation to kraken + #resp = await client.submit_cancel( + # reqid=msg.reqid + #) + + #try: + # # Check to make sure there was no error returned by + # # the kraken endpoint. Assert one order was cancelled + # assert resp['error'] == [] + # assert resp['result']['count'] == 1 + + # ## TODO: Change this code using .get + # try: + # pending = resp['result']['pending'] + # # Check to make sure the cancellation is NOT pending, + # # then send the confirmation to the ems order stream + # except KeyError: + # await ems_order_stream.send( + # BrokerdStatus( + # reqid=msg.reqid, + # account=msg.account, + # time_ns=time.time_ns(), + # status='cancelled', + # reason='Order cancelled', + # broker_details={'name': 'kraken'} + # ).dict() + # ) + #except AssertionError: + # log.error(f'Order cancel was not successful') else: log.error(f'Unknown order command: {request_msg}') @@ -746,8 +754,19 @@ async def trades_dialogue( async for msg in process_order_msgs(ws): pprint(msg) for order in msg: - ## TODO: Maybe do a better + ## TODO: Maybe do a better check and handle accounts if type(order) == dict: + if order['status'] == 'canceled': + await ems_stream.send( + BrokerdStatus( + account='kraken.spot', + reqid=order['txid'], + time_ns=time.time_ns(), + status='cancelled', + reason='Order cancelled', + broker_details={'name': 'kraken'} + ).dict() + ) for pending_order in pending_orders: if pending_order.txid == order['txid'] and order['status'] == 'open': await ems_stream.send( From b1bff1be8550b1c2f26776183c6cfcc2b0c36f4b Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Fri, 18 Feb 2022 22:24:33 -0500 Subject: [PATCH 20/29] remove ws support for orders, use rest api instead for easy oid association --- piker/brokers/kraken.py | 267 ++++++++++++++-------------------------- 1 file changed, 93 insertions(+), 174 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index ba041d06..b239c019 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -508,9 +508,9 @@ async def handle_order_requests( client: Client, ems_order_stream: tractor.MsgStream, - ws: NoBsWs, - token: str, - userref_oid_map: dict, + #ws: NoBsWs, + #token: str, + #userref_oid_map: dict, ) -> None: @@ -540,112 +540,111 @@ async def handle_order_requests( # validate temp_id = next(userref_counter) order = BrokerdOrder(**request_msg) - def slashinsert(str): - midPoint = len(str)//2 - return str[:midPoint] + '/' + str[midPoint:] + #def slashinsert(str): + # midPoint = len(str)//2 + # return str[:midPoint] + '/' + str[midPoint:] - # Send order via websocket - order_msg = { - "event": "addOrder", - "ordertype": "limit", - "pair": slashinsert(order.symbol.upper()), - "price": str(order.price), - "token": token, - "type": order.action, - "volume": str(order.size), - "userref": str(temp_id) - } + ## Send order via websocket + #order_msg = { + # "event": "addOrder", + # "ordertype": "limit", + # "pair": slashinsert(order.symbol.upper()), + # "price": str(order.price), + # "token": token, + # "type": order.action, + # "volume": str(order.size), + # "userref": str(temp_id) + #} - # add oid userref mapping - userref_oid_map[str(temp_id)] = { - 'oid': order.oid, 'account': order.account - } + ## add oid userref mapping + #userref_oid_map[str(temp_id)] = { + # 'oid': order.oid, 'account': order.account + #} - await ws.send_msg(order_msg) + #await ws.send_msg(order_msg) # call our client api to submit the order - #resp = await client.submit_limit( - # oid=order.oid, - # symbol=order.symbol, - # price=order.price, - # action=order.action, - # size=order.size, - # ## XXX: how do I handle new orders - # reqid=temp_id, - #) + resp = await client.submit_limit( + oid=order.oid, + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + reqid=temp_id, + ) - #err = resp['error'] - #if err: - # log.error(f'Failed to submit order') - # await ems_order_stream.send( - # BrokerdError( - # oid=order.oid, - # reqid=temp_id, - # symbol=order.symbol, - # reason="Failed order submission", - # broker_details=resp - # ).dict() - # ) - #else: - # ## TODO: handle multiple cancels - # ## txid is an array of strings - # reqid = resp['result']['txid'][0] - # # deliver ack that order has been submitted to broker routing - # await ems_order_stream.send( - # BrokerdOrderAck( + err = resp['error'] + if err: + log.error(f'Failed to submit order') + await ems_order_stream.send( + BrokerdError( + oid=order.oid, + reqid=temp_id, + symbol=order.symbol, + reason="Failed order submission", + broker_details=resp + ).dict() + ) + else: + ## TODO: handle multiple cancels + ## txid is an array of strings + reqid = resp['result']['txid'][0] + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( - # # ems order request id - # oid=order.oid, + # ems order request id + oid=order.oid, - # # broker specific request id - # reqid=reqid, + # broker specific request id + reqid=reqid, - # # account the made the order - # account=order.account + # account the made the order + account=order.account - # ).dict() - # ) + ).dict() + ) elif action == 'cancel': msg = BrokerdCancel(**request_msg) - cancel_msg = { - "event": "cancelOrder", - "token": token, - "txid": [msg.reqid] - } + #cancel_msg = { + # "event": "cancelOrder", + # "token": token, + # "txid": [msg.reqid] + #} - await ws.send_msg(cancel_msg) + #await ws.send_msg(cancel_msg) - ## Send order cancellation to kraken - #resp = await client.submit_cancel( - # reqid=msg.reqid - #) + # Send order cancellation to kraken + resp = await client.submit_cancel( + reqid=msg.reqid + ) - #try: - # # Check to make sure there was no error returned by - # # the kraken endpoint. Assert one order was cancelled - # assert resp['error'] == [] - # assert resp['result']['count'] == 1 + try: + # Check to make sure there was no error returned by + # the kraken endpoint. Assert one order was cancelled + assert resp['error'] == [] + assert resp['result']['count'] == 1 - # ## TODO: Change this code using .get - # try: - # pending = resp['result']['pending'] - # # Check to make sure the cancellation is NOT pending, - # # then send the confirmation to the ems order stream - # except KeyError: - # await ems_order_stream.send( - # BrokerdStatus( - # reqid=msg.reqid, - # account=msg.account, - # time_ns=time.time_ns(), - # status='cancelled', - # reason='Order cancelled', - # broker_details={'name': 'kraken'} - # ).dict() - # ) - #except AssertionError: - # log.error(f'Order cancel was not successful') + ## TODO: Change this code using .get + try: + pending = resp['result']['pending'] + # Check to make sure the cancellation is NOT pending, + # then send the confirmation to the ems order stream + except KeyError: + await ems_order_stream.send( + BrokerdStatus( + reqid=msg.reqid, + account=msg.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Order cancelled', + broker_details={'name': 'kraken'} + ).dict() + ) + except AssertionError: + log.error(f'Order cancel was not successful') else: log.error(f'Unknown order command: {request_msg}') @@ -713,13 +712,13 @@ async def trades_dialogue( msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) all_positions.append(msg.dict()) + ## TODO: create a new ems message schema for open orders open_orders = await client.kraken_endpoint('OpenOrders', {}) + print(open_orders) #await tractor.breakpoint() await ctx.started((all_positions, (acc_name,))) - #await trio.sleep_forever() - # Get websocket token for authenticated data stream # Assert that a token was actually received resp = await client.kraken_endpoint('GetWebSocketsToken', {}) @@ -731,96 +730,16 @@ async def trades_dialogue( trio.open_nursery() as n, ): ## TODO: maybe add multiple accounts - #n.start_soon(handle_order_requests, client, ems_stream) - - # Mapping from userref passed to kraken and oid from piker - userref_oid_map = {} + n.start_soon(handle_order_requests, client, ems_stream) async with open_autorecon_ws( 'wss://ws-auth.kraken.com/', fixture=subscribe, token=token, ) as ws: - n.start_soon( - handle_order_requests, - client, - ems_stream, - ws, - token, - userref_oid_map - ) from pprint import pprint - pending_orders = [] async for msg in process_order_msgs(ws): pprint(msg) - for order in msg: - ## TODO: Maybe do a better check and handle accounts - if type(order) == dict: - if order['status'] == 'canceled': - await ems_stream.send( - BrokerdStatus( - account='kraken.spot', - reqid=order['txid'], - time_ns=time.time_ns(), - status='cancelled', - reason='Order cancelled', - broker_details={'name': 'kraken'} - ).dict() - ) - for pending_order in pending_orders: - if pending_order.txid == order['txid'] and order['status'] == 'open': - await ems_stream.send( - BrokerdOrderAck( - - # ems order request id - oid=userref_oid_map[pending_order.userref]['oid'], - - # broker specific request id - reqid=order['txid'], - - # account the made the order - account=userref_oid_map[ - pending_order.userref - ]['account'] - - ).dict() - ) - - elif order.status == 'pending': - pending_orders.append(order) - - - #if not pending_oder and order.status == 'open': - # await ems_stream.send( - # BrokerdOrder( - # action=order.action, - # oid='', - # ## TODO: support multi accounts? - # account='kraken.spot', - # time_ns=int(float(order.opentm) * 10**9), - # reqid=order.txid, - # symbol=order.pair.replace('/', '').lower(),# + \ - # #'.kraken', - # price=float(order.price), - # size=float(order.vol) - # ).dict() - # ) - - - #await ems_order_stream.send( - # BrokerdOrderAck( - - # # ems order request id - # oid=order.oid, - - # # broker specific request id - # reqid=reqid, - - # # account the made the order - # account=order.account - - # ).dict() - #) async def stream_messages( From ee0be13af100c5c1b9ec658f5ae530cf6ad3f9d1 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Sat, 19 Feb 2022 00:14:58 -0500 Subject: [PATCH 21/29] repurpose ws code for ownTrades stream, get trade authentication going --- piker/brokers/kraken.py | 148 +++++++++++++++++----------------------- 1 file changed, 61 insertions(+), 87 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index b239c019..7c75d735 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -119,18 +119,13 @@ class Pair(BaseModel): ordermin: float # minimum order volume for pair -class Order(BaseModel): +class Trade(BaseModel): """Order class that helps parse and validate order stream""" - txid: str # kraken order transaction id + reqid: str # kraken order transaction id action: str # buy or sell - ordertype: str # limit order ##TODO: Do I need this? - pair: str # order pair price: str # price of asset - vol: str # vol of asset - status: str # order status - opentm: str # time of order - timeinforce: str # e.g GTC, GTD - userref: str # for a mapping to oids + size: str # vol of asset + broker_time: str # e.g GTC, GTD @dataclass @@ -540,29 +535,7 @@ async def handle_order_requests( # validate temp_id = next(userref_counter) order = BrokerdOrder(**request_msg) - #def slashinsert(str): - # midPoint = len(str)//2 - # return str[:midPoint] + '/' + str[midPoint:] - - ## Send order via websocket - #order_msg = { - # "event": "addOrder", - # "ordertype": "limit", - # "pair": slashinsert(order.symbol.upper()), - # "price": str(order.price), - # "token": token, - # "type": order.action, - # "volume": str(order.size), - # "userref": str(temp_id) - #} - - ## add oid userref mapping - #userref_oid_map[str(temp_id)] = { - # 'oid': order.oid, 'account': order.account - #} - - #await ws.send_msg(order_msg) - + # call our client api to submit the order resp = await client.submit_limit( oid=order.oid, @@ -608,14 +581,6 @@ async def handle_order_requests( elif action == 'cancel': msg = BrokerdCancel(**request_msg) - #cancel_msg = { - # "event": "cancelOrder", - # "token": token, - # "txid": [msg.reqid] - #} - - #await ws.send_msg(cancel_msg) - # Send order cancellation to kraken resp = await client.submit_cancel( reqid=msg.reqid @@ -669,7 +634,7 @@ async def trades_dialogue( # specific logic for this in kraken's shitty sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 trades_sub = make_auth_sub( - {'name': 'openOrders', 'token': token} + {'name': 'ownTrades', 'token': token} ) # TODO: we want to eventually allow unsubs which should @@ -692,7 +657,7 @@ async def trades_dialogue( # unsub from all pairs on teardown await ws.send_msg({ 'event': 'unsubscribe', - 'subscription': ['openOrders'], + 'subscription': ['ownTrades'], }) # XXX: do we need to ack the unsub? @@ -732,14 +697,50 @@ async def trades_dialogue( ## TODO: maybe add multiple accounts n.start_soon(handle_order_requests, client, ems_stream) + # Process trades msg stream of ws async with open_autorecon_ws( 'wss://ws-auth.kraken.com/', fixture=subscribe, token=token, ) as ws: from pprint import pprint - async for msg in process_order_msgs(ws): + async for msg in process_trade_msgs(ws): pprint(msg) + for trade in msg: + # check the type of packaged message + assert type(trade) == Trade + # prepare and send a status update for line update + trade_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), + + account='kraken.spot', + status='executed', + filled=float(trade.size), + reason='Order filled by kraken', + # remaining='' ## TODO: not sure what to do here. + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + } + ) + + await ems_stream.send(trade_msg.dict()) + + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=trade.reqid, + time_ns=time.time_ns(), + + action=trade.action, + size=float(trade.size), + price=float(trade.price), + ## TODO: maybe capture more msg data i.e fees? + broker_details={'name': 'kraken'}, + broker_time=float(trade.broker_time) + ) + + await ems_stream.send(fill_msg.dict()) async def stream_messages( @@ -833,7 +834,7 @@ async def process_data_feed_msgs( yield msg -async def process_order_msgs( +async def process_trade_msgs( ws: NoBsWs, ): ''' @@ -848,62 +849,35 @@ async def process_order_msgs( # and then in the caller recast to our native ``BrokerdX`` msg types. try: - # check that we are on openOrders stream and that msgs are arriving - # in sequence with kraken - assert msg[1] == 'openOrders' + # check that we are on the ownTrades stream and that msgs are + # arriving in sequence with kraken + assert msg[1] == 'ownTrades' assert msg[2]['sequence'] > sequence_counter sequence_counter += 1 raw_msgs = msg[0] # TODO: get length and start list - order_msgs = [] + trade_msgs = [] - try: + # Check that we are only processing new trades + if msg[2]['sequence'] != 1: # check if its a new order or an update msg - for order in raw_msgs: - txid = list(order.keys())[0] - order_msg = Order( - txid=txid, - action=order[txid]['descr']['type'], - ordertype=order[txid]['descr']['ordertype'], - pair=order[txid]['descr']['pair'], - price=order[txid]['descr']['price'], - vol=order[txid]['vol'], - status=order[txid]['status'], - opentm=order[txid]['opentm'], - timeinforce=order[txid]['timeinforce'], - userref=order[txid]['userref'] + for trade_msg in raw_msgs: + trade = list(trade_msg.values())[0] + order_msg = Trade( + reqid=trade['ordertxid'], + action=trade['type'], + price=trade['price'], + size=trade['vol'], + broker_time=trade['time'] ) - order_msgs.append(order_msg) - - yield order_msgs - - except KeyError: - for order in raw_msgs: - txid = list(order.keys())[0] - ## TODO: maybe use a pydantic.BaseModel - order_msg = { - 'txid': txid, - 'status': order[txid]['status'], - 'userref': order[txid]['userref'] - } - order_msgs.append(order_msg) - - yield order_msgs + trade_msgs.append(order_msg) + yield trade_msgs except AssertionError: print(f'UNHANDLED MSG: {msg}') yield msg - # form of order msgs: - # [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544', - # 'status': 'canceled', 'vol_exec': '0.00000000', 'cost': - # '0.00000000', 'fee': '0.00000000', 'avg_price': - # '0.00000000', 'userref': 1, 'cancel_reason': 'User - # requested'}}], 'openOrders', {'sequence': 4}] - - # yield msg - def normalize( ohlc: OHLC, From a3345dbba22c3972d89f427dcf65c5a5b2067147 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Tue, 1 Mar 2022 15:30:21 -0500 Subject: [PATCH 22/29] cleaned up code and added loop to grab all trades for position calcs --- piker/brokers/kraken.py | 61 +++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 33 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 7c75d735..74119e88 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -120,7 +120,7 @@ class Pair(BaseModel): class Trade(BaseModel): - """Order class that helps parse and validate order stream""" + """Trade class that helps parse and validate ownTrades stream""" reqid: str # kraken order transaction id action: str # buy or sell price: str # price of asset @@ -258,17 +258,37 @@ class Client: self, data: Dict[str, Any] = {} ) -> Dict[str, Any]: - resp = await self.kraken_endpoint('Balance', data) - balances = resp['result'] - ## TODO: grab all entries, not just first 50 - resp = await self.kraken_endpoint('TradesHistory', data) - traders = resp['result'] + data['ofs'] = 0 positions = {} vols = {} - + # Grab all trade history + while True: + resp = await self.kraken_endpoint('TradesHistory', data) + # grab the first 50 trades + if data['ofs'] == 0: + trades = resp['result']['trades'] + # load the next 50 trades using dict constructor + # for speed + elif data['ofs'] == 50: + trades = dict(trades, **resp['result']['trades']) + # catch the of the trades + elif resp['result']['trades'] == {}: + count = resp['result']['count'] + break + # update existing dict if num trades exceeds 100 + else: + trades.update(resp['result']['trades']) + # increment the offset counter + data['ofs'] += 50 + # To avoid exceeding API rate limit in case of a lot of trades + time.sleep(1) + + # make sure you grabbed all the trades + assert count == len(trades.values()) + # positions ## TODO: Make sure to add option to include fees in positions calc - for trade in traders['trades'].values(): + for trade in trades.values(): sign = -1 if trade['type'] == 'sell' else 1 try: positions[trade['pair']] += sign * float(trade['cost']) @@ -438,15 +458,10 @@ class Client: async def get_client() -> Client: client = Client() - ## TODO: maybe add conditional based on section section = get_config() client._name = section['key_descr'] client._api_key = section['api_key'] client._secret = section['secret'] - ## TODO: Add a client attribute to hold this info - #data = { - # # add non-nonce and non-ofs vars - #} # at startup, load all symbols locally for fast search await client.cache_symbols() @@ -628,7 +643,6 @@ async def trades_dialogue( @asynccontextmanager async def subscribe(ws: wsproto.WSConnection, token: str): - ## TODO: Fix docs and points to right urls # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe # specific logic for this in kraken's shitty sync client: @@ -643,15 +657,6 @@ async def trades_dialogue( # locked. await ws.send_msg(trades_sub) - ## trade data (aka L1) - #l1_sub = make_sub( - # list(ws_pairs.values()), - # {'name': 'spread'} # 'depth': 10} - #) - - ## pull a first quote and deliver - #await ws.send_msg(l1_sub) - yield # unsub from all pairs on teardown @@ -677,11 +682,6 @@ async def trades_dialogue( msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) all_positions.append(msg.dict()) - ## TODO: create a new ems message schema for open orders - open_orders = await client.kraken_endpoint('OpenOrders', {}) - print(open_orders) - #await tractor.breakpoint() - await ctx.started((all_positions, (acc_name,))) # Get websocket token for authenticated data stream @@ -844,10 +844,6 @@ async def process_trade_msgs( sequence_counter = 0 async for msg in stream_messages(ws): - # TODO: write your order event parser here! - # HINT: create a ``pydantic.BaseModel`` to parse and validate - # and then in the caller recast to our native ``BrokerdX`` msg types. - try: # check that we are on the ownTrades stream and that msgs are # arriving in sequence with kraken @@ -855,7 +851,6 @@ async def process_trade_msgs( assert msg[2]['sequence'] > sequence_counter sequence_counter += 1 raw_msgs = msg[0] - # TODO: get length and start list trade_msgs = [] # Check that we are only processing new trades From 617bf3e0dae206a6788ee93d168baa492f123fd6 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Tue, 1 Mar 2022 15:38:20 -0500 Subject: [PATCH 23/29] fix typo and get rid of pprint of ws stream --- piker/brokers/kraken.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 74119e88..6e9e55b3 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -271,7 +271,7 @@ class Client: # for speed elif data['ofs'] == 50: trades = dict(trades, **resp['result']['trades']) - # catch the of the trades + # catch the end of the trades elif resp['result']['trades'] == {}: count = resp['result']['count'] break @@ -454,6 +454,7 @@ class Client: raise SymbolNotFound(json['error'][0] + f': {symbol}') + @asynccontextmanager async def get_client() -> Client: client = Client() @@ -703,9 +704,7 @@ async def trades_dialogue( fixture=subscribe, token=token, ) as ws: - from pprint import pprint async for msg in process_trade_msgs(ws): - pprint(msg) for trade in msg: # check the type of packaged message assert type(trade) == Trade From fd0acd21fb27998438e6794abee934030af26094 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Sun, 6 Mar 2022 15:16:42 -0500 Subject: [PATCH 24/29] refactory based on github comments, change doc string style --- piker/brokers/kraken.py | 150 ++++++++++++++++++++++++---------------- 1 file changed, 91 insertions(+), 59 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 6e9e55b3..90b40b2a 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -14,13 +14,13 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' Kraken backend. -""" +''' from contextlib import asynccontextmanager from dataclasses import asdict, field -from typing import List, Dict, Any, Tuple, Optional, AsyncIterator +from typing import Dict, List, Tuple, Any, Optional, AsyncIterator import time from trio_typing import TaskStatus @@ -34,6 +34,10 @@ from pydantic.dataclasses import dataclass from pydantic import BaseModel import wsproto from itertools import count +import urllib.parse +import hashlib +import hmac +import base64 from .. import config from .._cacheables import open_cached_client @@ -47,11 +51,6 @@ from ..clearing._messages import ( BrokerdFill, ) -import urllib.parse -import hashlib -import hmac -import base64 - log = get_logger(__name__) @@ -120,7 +119,10 @@ class Pair(BaseModel): class Trade(BaseModel): - """Trade class that helps parse and validate ownTrades stream""" + ''' + Trade class that helps parse and validate ownTrades stream + + ''' reqid: str # kraken order transaction id action: str # buy or sell price: str # price of asset @@ -130,11 +132,13 @@ class Trade(BaseModel): @dataclass class OHLC: - """Description of the flattened OHLC quote format. + ''' + Description of the flattened OHLC quote format. For schema details see: https://docs.kraken.com/websockets/#message-ohlc - """ + + ''' chan_id: int # internal kraken id chan_name: str # eg. ohlc-1 (name-interval) pair: str # fx pair @@ -179,16 +183,24 @@ def get_kraken_signature( class InvalidKey(ValueError): - """EAPI:Invalid key - This error is returned when the API key used for the call is - either expired or disabled, please review the API key in your - Settings -> API tab of account management or generate a new one - and update your application.""" + ''' + EAPI:Invalid key + This error is returned when the API key used for the call is + either expired or disabled, please review the API key in your + Settings -> API tab of account management or generate a new one + and update your application. + + ''' class Client: - def __init__(self) -> None: + def __init__( + self, + name: str = '', + api_key: str = '', + secret: str = '' + ) -> None: self._sesh = asks.Session(connections=4) self._sesh.base_location = _url self._sesh.headers.update({ @@ -196,9 +208,9 @@ class Client: 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' }) self._pairs: list[str] = [] - self._name = '' - self._api_key = '' - self._secret = '' + self._name = name + self._api_key = api_key + self._secret = secret @property def pairs(self) -> Dict[str, Any]: @@ -244,26 +256,26 @@ class Client: ) return resproc(resp, log) - async def kraken_endpoint( + async def endpoint( self, method: str, data: Dict[str, Any] ) -> Dict[str, Any]: uri_path = f'/0/private/{method}' data['nonce'] = str(int(1000*time.time())) - resp = await self._private(method, data, uri_path) - return resp + return await self._private(method, data, uri_path) async def get_positions( self, data: Dict[str, Any] = {} - ) -> Dict[str, Any]: + ) -> (Dict[str, Any], Dict[str, Any]): data['ofs'] = 0 positions = {} vols = {} # Grab all trade history + # https://docs.kraken.com/rest/#operation/getTradeHistory while True: - resp = await self.kraken_endpoint('TradesHistory', data) + resp = await self.endpoint('TradesHistory', data) # grab the first 50 trades if data['ofs'] == 0: trades = resp['result']['trades'] @@ -281,22 +293,28 @@ class Client: # increment the offset counter data['ofs'] += 50 # To avoid exceeding API rate limit in case of a lot of trades - time.sleep(1) + await trio.sleep(1) # make sure you grabbed all the trades assert count == len(trades.values()) # positions - ## TODO: Make sure to add option to include fees in positions calc + # TODO: Make sure to add option to include fees in positions calc for trade in trades.values(): sign = -1 if trade['type'] == 'sell' else 1 + # This catch is for populating the dict with new values + # as the plus assigment will fail if there no value + # tied to the key try: positions[trade['pair']] += sign * float(trade['cost']) vols[trade['pair']] += sign * float(trade['vol']) except KeyError: positions[trade['pair']] = sign * float(trade['cost']) vols[trade['pair']] = sign * float(trade['vol']) - + # This cycles through the summed trades of an asset and then + # normalizes the price with the current volume of the asset + # you are holding. If you have no more of the asset, the balance + # is 0, then it sets the position to 0. for pair in positions.keys(): asset_balance = vols[pair] if asset_balance == 0: @@ -316,9 +334,10 @@ class Client: # account: str, reqid: int = None, ) -> int: - """Place an order and return integer request id provided by client. + ''' + Place an order and return integer request id provided by client. - """ + ''' # Build order data for kraken api data = { "userref": reqid, @@ -330,20 +349,18 @@ class Client: # set to True test AddOrder call without a real submission "validate": False } - resp = await self.kraken_endpoint('AddOrder', data) - return resp + return await self.endpoint('AddOrder', data) async def submit_cancel( self, reqid: str, ) -> None: - """Send cancel request for order id ``reqid``. + ''' + Send cancel request for order id ``reqid``. - """ + ''' # txid is a transaction id given by kraken - data = {"txid": reqid} - resp = await self.kraken_endpoint('CancelOrder', data) - return resp + return await self.endpoint('CancelOrder', {"txid": reqid}) async def symbol_info( self, @@ -457,12 +474,13 @@ class Client: @asynccontextmanager async def get_client() -> Client: - client = Client() section = get_config() - client._name = section['key_descr'] - client._api_key = section['api_key'] - client._secret = section['secret'] + client = Client( + name=section['key_descr'], + api_key=section['api_key'], + secret=section['secret'] + ) # at startup, load all symbols locally for fast search await client.cache_symbols() @@ -490,6 +508,8 @@ def pack_position( def normalize_symbol( ticker: str ) -> str: + # This is to convert symbol names from what kraken + # uses to the traditional 3x3 pair symbol syntax symlen = len(ticker) if symlen == 6: return ticker.lower() @@ -501,12 +521,13 @@ def normalize_symbol( def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]: - """Create a request subscription packet dict. + ''' + Create a request subscription packet dict. ## TODO: point to the auth urls https://docs.kraken.com/websockets/#message-subscribe - """ + ''' # eg. specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 return { @@ -519,9 +540,6 @@ async def handle_order_requests( client: Client, ems_order_stream: tractor.MsgStream, - #ws: NoBsWs, - #token: str, - #userref_oid_map: dict, ) -> None: @@ -575,8 +593,8 @@ async def handle_order_requests( ).dict() ) else: - ## TODO: handle multiple cancels - ## txid is an array of strings + # TODO: handle multiple cancels + # txid is an array of strings reqid = resp['result']['txid'][0] # deliver ack that order has been submitted to broker routing await ems_order_stream.send( @@ -608,7 +626,7 @@ async def handle_order_requests( assert resp['error'] == [] assert resp['result']['count'] == 1 - ## TODO: Change this code using .get + # TODO: Change this code using .get try: pending = resp['result']['pending'] # Check to make sure the cancellation is NOT pending, @@ -626,6 +644,15 @@ async def handle_order_requests( ) except AssertionError: log.error(f'Order cancel was not successful') + await ems_order_stream.send( + BrokerdError( + oid=order.oid, + reqid=temp_id, + symbol=order.symbol, + reason="Failed order cancel", + broker_details=resp + ).dict() + ) else: log.error(f'Unknown order command: {request_msg}') @@ -669,7 +696,6 @@ async def trades_dialogue( # XXX: do we need to ack the unsub? # await ws.recv_msg() - # Authenticated block async with get_client() as client: acc_name = 'kraken.' + client._name @@ -687,7 +713,7 @@ async def trades_dialogue( # Get websocket token for authenticated data stream # Assert that a token was actually received - resp = await client.kraken_endpoint('GetWebSocketsToken', {}) + resp = await client.endpoint('GetWebSocketsToken', {}) assert resp['error'] == [] token = resp['result']['token'] @@ -717,7 +743,7 @@ async def trades_dialogue( status='executed', filled=float(trade.size), reason='Order filled by kraken', - # remaining='' ## TODO: not sure what to do here. + # remaining='' # TODO: not sure what to do here. broker_details={ 'name': 'kraken', 'broker_time': trade.broker_time @@ -734,7 +760,7 @@ async def trades_dialogue( action=trade.action, size=float(trade.size), price=float(trade.price), - ## TODO: maybe capture more msg data i.e fees? + # TODO: maybe capture more msg data i.e fees? broker_details={'name': 'kraken'}, broker_time=float(trade.broker_time) ) @@ -846,6 +872,8 @@ async def process_trade_msgs( try: # check that we are on the ownTrades stream and that msgs are # arriving in sequence with kraken + # For clarification the kraken ws api docs for this stream: + # https://docs.kraken.com/websockets/#message-ownTrades assert msg[1] == 'ownTrades' assert msg[2]['sequence'] > sequence_counter sequence_counter += 1 @@ -894,11 +922,12 @@ def normalize( def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: - """Create a request subscription packet dict. + ''' + Create a request subscription packet dict. https://docs.kraken.com/websockets/#message-subscribe - """ + ''' # eg. specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 return { @@ -916,8 +945,9 @@ async def backfill_bars( task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: - """Fill historical bars into shared mem / storage afap. - """ + ''' + Fill historical bars into shared mem / storage afap. + ''' with trio.CancelScope() as cs: async with open_cached_client('kraken') as client: bars = await client.bars(symbol=sym) @@ -939,10 +969,12 @@ async def stream_quotes( task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, ) -> None: - """Subscribe for ohlc stream of quotes for ``pairs``. + ''' + Subscribe for ohlc stream of quotes for ``pairs``. ``pairs`` must be formatted /. - """ + + ''' # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) From 1525c645cef0ce1c98deee8835adf2973a5d2928 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Sun, 20 Mar 2022 13:52:45 -0400 Subject: [PATCH 25/29] refactor get_positions into get_trades, and refactor pack_position with postion calc logic --- piker/brokers/kraken.py | 101 +++++++++++++++++++--------------------- 1 file changed, 48 insertions(+), 53 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 90b40b2a..96f8101c 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -265,15 +265,14 @@ class Client: data['nonce'] = str(int(1000*time.time())) return await self._private(method, data, uri_path) - async def get_positions( + async def get_trades( self, data: Dict[str, Any] = {} - ) -> (Dict[str, Any], Dict[str, Any]): + ) -> Dict[str, Any]: data['ofs'] = 0 - positions = {} - vols = {} # Grab all trade history # https://docs.kraken.com/rest/#operation/getTradeHistory + # Kraken uses 'ofs' to refer to the offset while True: resp = await self.endpoint('TradesHistory', data) # grab the first 50 trades @@ -298,31 +297,7 @@ class Client: # make sure you grabbed all the trades assert count == len(trades.values()) - # positions - # TODO: Make sure to add option to include fees in positions calc - for trade in trades.values(): - sign = -1 if trade['type'] == 'sell' else 1 - # This catch is for populating the dict with new values - # as the plus assigment will fail if there no value - # tied to the key - try: - positions[trade['pair']] += sign * float(trade['cost']) - vols[trade['pair']] += sign * float(trade['vol']) - except KeyError: - positions[trade['pair']] = sign * float(trade['cost']) - vols[trade['pair']] = sign * float(trade['vol']) - # This cycles through the summed trades of an asset and then - # normalizes the price with the current volume of the asset - # you are holding. If you have no more of the asset, the balance - # is 0, then it sets the position to 0. - for pair in positions.keys(): - asset_balance = vols[pair] - if asset_balance == 0: - positions[pair] = 0 - else: - positions[pair] /= asset_balance - - return positions, vols + return trades async def submit_limit( self, @@ -488,21 +463,49 @@ async def get_client() -> Client: yield client -def pack_position( +def pack_positions( acc: str, - symkey: str, - pos: float, - vol: float -) -> dict[str, Any]: + trades: dict +) -> list[Any]: + positions: dict[str, float] = {} + vols: dict[str, float] = {} + costs: dict[str, float] = {} + position_msgs: list[Any] = [] - return BrokerdPosition( - broker='kraken', - account=acc, - symbol=symkey, - currency=symkey[-3:], - size=float(vol), - avg_price=float(pos), - ) + for trade in trades.values(): + sign = -1 if trade['type'] == 'sell' else 1 + # This catch is for populating the dict with new values + # as the plus assigment will fail if there no value + # tied to the key + pair = trade['pair'] + vol = float(trade['vol']) + # This is for the initial addition of a pair so the + # += operation does not fail. + vols[pair] = vols.setdefault(pair, 0) + costs[pair] = costs.setdefault(pair, 0) + positions[pair] = positions.setdefault(pair, 0) + vols[pair] += sign * vol + costs[pair] += sign * float(trade['cost']) + if vols[pair] != 0: + positions[pair] = costs[pair] / vols[pair] + else: + positions[pair] = 0 + + for ticker, pos in positions.items(): + norm_sym = normalize_symbol(ticker) + vol = float(vols[ticker]) + if vol != 0: + msg = BrokerdPosition( + broker='kraken', + account=acc, + symbol=norm_sym, + currency=norm_sym[-3:], + size=vol, + avg_price=float(pos), + ) + position_msgs.append(msg.dict()) + + return position_msgs def normalize_symbol( @@ -667,8 +670,6 @@ async def trades_dialogue( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - # Generate - @asynccontextmanager async def subscribe(ws: wsproto.WSConnection, token: str): # XXX: setup subs @@ -699,17 +700,11 @@ async def trades_dialogue( # Authenticated block async with get_client() as client: acc_name = 'kraken.' + client._name - positions, vols = await client.get_positions() + trades = await client.get_trades() - all_positions = [] + position_msgs = pack_positions(acc_name, trades) - for ticker, pos in positions.items(): - norm_sym = normalize_symbol(ticker) - if float(vols[ticker]) != 0: - msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) - all_positions.append(msg.dict()) - - await ctx.started((all_positions, (acc_name,))) + await ctx.started((position_msgs, (acc_name,))) # Get websocket token for authenticated data stream # Assert that a token was actually received From cb8e97a142815378ec247a1cdca4ff18ac3c78ce Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Wed, 23 Mar 2022 10:34:53 -0400 Subject: [PATCH 26/29] address latest comments, refactor the pack position function --- piker/brokers/kraken.py | 41 +++++++++++++++-------------------------- 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 96f8101c..4a8d65d5 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -306,7 +306,6 @@ class Client: price: float, action: str, size: float, -# account: str, reqid: int = None, ) -> int: ''' @@ -474,36 +473,26 @@ def pack_positions( for trade in trades.values(): sign = -1 if trade['type'] == 'sell' else 1 - # This catch is for populating the dict with new values - # as the plus assigment will fail if there no value - # tied to the key pair = trade['pair'] vol = float(trade['vol']) - # This is for the initial addition of a pair so the - # += operation does not fail. - vols[pair] = vols.setdefault(pair, 0) - costs[pair] = costs.setdefault(pair, 0) - positions[pair] = positions.setdefault(pair, 0) - vols[pair] += sign * vol - costs[pair] += sign * float(trade['cost']) - if vols[pair] != 0: - positions[pair] = costs[pair] / vols[pair] - else: - positions[pair] = 0 + vols[pair] = vols.get(pair, 0) + sign * vol + costs[pair] = costs.get(pair, 0) + sign * float(trade['cost']) + positions[pair] = costs[pair] / vols[pair] if vols[pair] else 0 for ticker, pos in positions.items(): - norm_sym = normalize_symbol(ticker) vol = float(vols[ticker]) - if vol != 0: - msg = BrokerdPosition( - broker='kraken', - account=acc, - symbol=norm_sym, - currency=norm_sym[-3:], - size=vol, - avg_price=float(pos), - ) - position_msgs.append(msg.dict()) + if not vol: + continue + norm_sym = normalize_symbol(ticker) + msg = BrokerdPosition( + broker='kraken', + account=acc, + symbol=norm_sym, + currency=norm_sym[-3:], + size=vol, + avg_price=float(pos), + ) + position_msgs.append(msg.dict()) return position_msgs From 2baa1b460571191f504eee845004f4c89b9c0548 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Mon, 28 Mar 2022 18:28:19 -0400 Subject: [PATCH 27/29] fix hang when kraken is not in config --- piker/brokers/kraken.py | 133 ++++++++++++++++++++++------------------ 1 file changed, 74 insertions(+), 59 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 4a8d65d5..1a35c760 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -450,11 +450,14 @@ class Client: async def get_client() -> Client: section = get_config() - client = Client( - name=section['key_descr'], - api_key=section['api_key'], - secret=section['secret'] - ) + if section: + client = Client( + name=section['key_descr'], + api_key=section['api_key'], + secret=section['secret'] + ) + else: + client = Client() # at startup, load all symbols locally for fast search await client.cache_symbols() @@ -688,68 +691,80 @@ async def trades_dialogue( # Authenticated block async with get_client() as client: - acc_name = 'kraken.' + client._name - trades = await client.get_trades() + if client._api_key: + acc_name = 'kraken.' + client._name + trades = await client.get_trades() - position_msgs = pack_positions(acc_name, trades) + position_msgs = pack_positions(acc_name, trades) - await ctx.started((position_msgs, (acc_name,))) + await ctx.started((position_msgs, (acc_name,))) - # Get websocket token for authenticated data stream - # Assert that a token was actually received - resp = await client.endpoint('GetWebSocketsToken', {}) - assert resp['error'] == [] - token = resp['result']['token'] + # Get websocket token for authenticated data stream + # Assert that a token was actually received + resp = await client.endpoint('GetWebSocketsToken', {}) + assert resp['error'] == [] + token = resp['result']['token'] - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - ## TODO: maybe add multiple accounts - n.start_soon(handle_order_requests, client, ems_stream) + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + ## TODO: maybe add multiple accounts + n.start_soon(handle_order_requests, client, ems_stream) - # Process trades msg stream of ws - async with open_autorecon_ws( - 'wss://ws-auth.kraken.com/', - fixture=subscribe, - token=token, - ) as ws: - async for msg in process_trade_msgs(ws): - for trade in msg: - # check the type of packaged message - assert type(trade) == Trade - # prepare and send a status update for line update - trade_msg = BrokerdStatus( - reqid=trade.reqid, - time_ns=time.time_ns(), + # Process trades msg stream of ws + async with open_autorecon_ws( + 'wss://ws-auth.kraken.com/', + fixture=subscribe, + token=token, + ) as ws: + async for msg in process_trade_msgs(ws): + for trade in msg: + # check the type of packaged message + assert type(trade) == Trade + # prepare and send a status update for line update + trade_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), - account='kraken.spot', - status='executed', - filled=float(trade.size), - reason='Order filled by kraken', - # remaining='' # TODO: not sure what to do here. - broker_details={ - 'name': 'kraken', - 'broker_time': trade.broker_time - } - ) - - await ems_stream.send(trade_msg.dict()) + account='kraken.spot', + status='executed', + filled=float(trade.size), + reason='Order filled by kraken', + # remaining='' # TODO: not sure what to do here. + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + } + ) + + await ems_stream.send(trade_msg.dict()) - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=trade.reqid, - time_ns=time.time_ns(), + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=trade.reqid, + time_ns=time.time_ns(), - action=trade.action, - size=float(trade.size), - price=float(trade.price), - # TODO: maybe capture more msg data i.e fees? - broker_details={'name': 'kraken'}, - broker_time=float(trade.broker_time) - ) - - await ems_stream.send(fill_msg.dict()) + action=trade.action, + size=float(trade.size), + price=float(trade.price), + # TODO: maybe capture more msg data i.e fees? + broker_details={'name': 'kraken'}, + broker_time=float(trade.broker_time) + ) + + await ems_stream.send(fill_msg.dict()) + + else: + log.error('Missing Kraken API key: Trades WS connection failed') + await ctx.started(({}, {'paper',})) + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + ## TODO: maybe add multiple accounts + n.start_soon(handle_order_requests, client, ems_stream) async def stream_messages( From c2e654aae28d06a3776a3ebdf19b7319833a1bcc Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Thu, 7 Apr 2022 13:03:53 -0400 Subject: [PATCH 28/29] change logic order for handling no config case --- piker/brokers/kraken.py | 143 ++++++++++++++++++++++------------------ 1 file changed, 78 insertions(+), 65 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 1a35c760..27e0cfe7 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -45,6 +45,7 @@ from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray from ..data._web_bs import open_autorecon_ws, NoBsWs +from ..clearing._paper_engine import PaperBoi from ..clearing._messages import ( BrokerdPosition, BrokerdOrder, BrokerdStatus, BrokerdOrderAck, BrokerdError, BrokerdCancel, @@ -691,71 +692,7 @@ async def trades_dialogue( # Authenticated block async with get_client() as client: - if client._api_key: - acc_name = 'kraken.' + client._name - trades = await client.get_trades() - - position_msgs = pack_positions(acc_name, trades) - - await ctx.started((position_msgs, (acc_name,))) - - # Get websocket token for authenticated data stream - # Assert that a token was actually received - resp = await client.endpoint('GetWebSocketsToken', {}) - assert resp['error'] == [] - token = resp['result']['token'] - - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - ## TODO: maybe add multiple accounts - n.start_soon(handle_order_requests, client, ems_stream) - - # Process trades msg stream of ws - async with open_autorecon_ws( - 'wss://ws-auth.kraken.com/', - fixture=subscribe, - token=token, - ) as ws: - async for msg in process_trade_msgs(ws): - for trade in msg: - # check the type of packaged message - assert type(trade) == Trade - # prepare and send a status update for line update - trade_msg = BrokerdStatus( - reqid=trade.reqid, - time_ns=time.time_ns(), - - account='kraken.spot', - status='executed', - filled=float(trade.size), - reason='Order filled by kraken', - # remaining='' # TODO: not sure what to do here. - broker_details={ - 'name': 'kraken', - 'broker_time': trade.broker_time - } - ) - - await ems_stream.send(trade_msg.dict()) - - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=trade.reqid, - time_ns=time.time_ns(), - - action=trade.action, - size=float(trade.size), - price=float(trade.price), - # TODO: maybe capture more msg data i.e fees? - broker_details={'name': 'kraken'}, - broker_time=float(trade.broker_time) - ) - - await ems_stream.send(fill_msg.dict()) - - else: + if not client._api_key: log.error('Missing Kraken API key: Trades WS connection failed') await ctx.started(({}, {'paper',})) @@ -763,9 +700,85 @@ async def trades_dialogue( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): + + client = PaperBoi( + 'kraken', + ems_stream, + _buys={}, + _sells={}, + + _reqids={}, + + # TODO: load paper positions from ``positions.toml`` + _positions={}, + ) + ## TODO: maybe add multiple accounts n.start_soon(handle_order_requests, client, ems_stream) + acc_name = 'kraken.' + client._name + trades = await client.get_trades() + + position_msgs = pack_positions(acc_name, trades) + + await ctx.started((position_msgs, (acc_name,))) + + # Get websocket token for authenticated data stream + # Assert that a token was actually received + resp = await client.endpoint('GetWebSocketsToken', {}) + assert resp['error'] == [] + token = resp['result']['token'] + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + ## TODO: maybe add multiple accounts + n.start_soon(handle_order_requests, client, ems_stream) + + # Process trades msg stream of ws + async with open_autorecon_ws( + 'wss://ws-auth.kraken.com/', + fixture=subscribe, + token=token, + ) as ws: + async for msg in process_trade_msgs(ws): + for trade in msg: + # check the type of packaged message + assert type(trade) == Trade + # prepare and send a status update for line update + trade_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), + + account='kraken.spot', + status='executed', + filled=float(trade.size), + reason='Order filled by kraken', + # remaining='' # TODO: not sure what to do here. + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + } + ) + + await ems_stream.send(trade_msg.dict()) + + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=trade.reqid, + time_ns=time.time_ns(), + + action=trade.action, + size=float(trade.size), + price=float(trade.price), + # TODO: maybe capture more msg data i.e fees? + broker_details={'name': 'kraken'}, + broker_time=float(trade.broker_time) + ) + + await ems_stream.send(fill_msg.dict()) + async def stream_messages( ws: NoBsWs, From cb970cef469fcd9e02eb4faf54892a83d25ee8ab Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Fri, 8 Apr 2022 19:25:24 -0400 Subject: [PATCH 29/29] dark order gui patch, add filled status message --- piker/brokers/kraken.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 27e0cfe7..18475886 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -764,6 +764,23 @@ async def trades_dialogue( await ems_stream.send(trade_msg.dict()) + filled_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), + + account='kraken.spot', + status='filled', + filled=float(trade.size), + reason='Order filled by kraken', + # remaining='' # TODO: not sure what to do here. + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + } + ) + + await ems_stream.send(filled_msg.dict()) + # send a fill msg for gui update fill_msg = BrokerdFill( reqid=trade.reqid,