From 4a3515541da4e1e84c1e9295bb319bad6212a8e8 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Thu, 28 Oct 2021 15:52:02 -0400 Subject: [PATCH] get positions from trades --- piker/brokers/kraken.py | 185 +++++++++++++--------------------------- 1 file changed, 57 insertions(+), 128 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 68cd8a6c..825026d0 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -20,7 +20,7 @@ Kraken backend. """ from contextlib import asynccontextmanager from dataclasses import asdict, field -from typing import List, Dict, Any, Tuple, Optional +from typing import List, Dict, Any, Tuple, Optional, AsyncIterator import time from trio_typing import TaskStatus @@ -34,12 +34,13 @@ from pydantic.dataclasses import dataclass from pydantic import BaseModel import wsproto -from . import config +from .. import config from .._cacheables import open_cached_client from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray from ..data._web_bs import open_autorecon_ws +from ..clearing._messages import BrokerdPosition import urllib.parse import hashlib @@ -229,41 +230,35 @@ class Client: print(err) return resp['result'] - async def get_ledger( + async def get_positions( self, - data: Dict[str, Any] - ) -> pd.DataFrame: - ledgers = await self.get_user_data('Ledgers', data) - num_entries = int(ledgers['count']) - crypto_transactions = np.empty((num_entries, 5), dtype=object) - if num_entries // 50 < 0 or num_entries == 50: - # NOTE: Omitting the following values from the kraken ledger: - # -> 'refid', 'type', 'subtype', 'aclass' - for i, entry in enumerate(ledgers['ledger'].items()): - crypto_transactions[i] = [ - entry[1]['time'], - entry[1]['amount'], - entry[1]['fee'], - entry[1]['asset'], - entry[1]['balance'] - ] - else: - for n in range(num_entries // 50 + 1): - data['ofs'] = n * 50 - ledgers = await self.get_user_data('Ledgers', data) - for i, entry in enumerate(ledgers['ledger'].items()): - crypto_transactions[i + n * 50] = [ - entry[1]['time'], - entry[1]['amount'], - entry[1]['fee'], - entry[1]['asset'], - entry[1]['balance'] - ] - ledger = pd.DataFrame( - columns = ['time', 'amount', 'fee', 'asset', 'balance'], - data = crypto_transactions - ) - return ledger + data: Dict[str, Any] = {} + ) -> Dict[str, Any]: + balances = await self.get_user_data('Balance', data) + ## TODO: grab all entries, not just first 50 + traders = await self.get_user_data('TradesHistory', data) + positions = {} + vols = {} + + # positions + ## TODO: Make sure to add option to include fees in positions calc + for trade in traders['trades'].values(): + sign = -1 if trade['type'] == 'sell' else 1 + try: + positions[trade['pair']] += sign * float(trade['cost']) + vols[trade['pair']] += sign * float(trade['vol']) + except KeyError: + positions[trade['pair']] = sign * float(trade['cost']) + vols[trade['pair']] = sign * float(trade['vol']) + + for pair in positions.keys(): + asset_balance = vols[pair] + if asset_balance == 0: + positions[pair] = 0 + else: + positions[pair] /= asset_balance + + return positions async def symbol_info( self, @@ -385,80 +380,9 @@ async def get_client() -> Client: data = { # add non-nonce and non-ofs vars } + # positions = await client.get_positions(data) - balances = await client.get_user_data('Balance', data) - traders = await client.get_user_data('TradesHistory', data) - ledger = await client.get_ledger(data) - - # positions - ## TODO: Make sure to add option with fees - n, m = ledger.shape - ledger['time'] = ledger['time'].apply(lambda x: int(x)) - assets = set(ledger['asset']) - trade_times = set(ledger['time']) - trades = {} - positions = {} - # for index, row in ledger.iterrows(): - # if index == n: - # break - # asset = row['asset'] - # ## TODO: Look into a way to generalize this - # if asset != 'ZEUR' and ledger.loc[index+1, 'asset'] == 'ZEUR': - # try: - # trades[asset]['amounts'].append(float(ledger.loc[index, 'amount'])) - # trades[asset]['prices'].append(float(ledger.loc[index+1, 'amount'])) - # except KeyError: - # trades[asset] = { - # 'amounts': [float(ledger.loc[index+1, 'amount'])], - # 'prices': [float(ledger.loc[index+1, 'amount'])] - # } - - ## TODO: Look into a way to generalize this for any fiat - ## TODO: Figure out how to handle coin for coin trades - for trade_time in trade_times: - trade = ledger[ledger['time'] == trade_time] - coin = trade[trade['asset'] != 'ZEUR'] - fiat = trade[trade['asset'] == 'ZEUR'] - if len(coin) == 0 or len(coin) > 1: - continue - asset = list(coin.loc[:, 'asset'])[0] - amount = np.sum(coin['amount'].apply(lambda x: float(x))) - if amount > 0: - sign = -1 - price = sign * np.sum(fiat['amount'].apply(lambda x: float(x))) - try: - trades[asset]['trade_amounts'].append(amount) - trades[asset]['trade_prices'].append(price) - trades[asset]['enter_amounts'].append(amount) - except KeyError: - trades[asset] = { - 'trade_amounts': [amount], - 'trade_prices': [price], - 'enter_amounts': [amount] - } - else: - price = 0 - # continue - try: - trades[asset]['trade_amounts'].append(amount) - trades[asset]['trade_prices'].append(price) - except KeyError: - trades[asset] = { - 'trade_amounts': [amount], - 'trade_prices': [price], - 'enter_amounts': [] - } - - - for asset in assets: - if asset == 'ZEUR': - continue - t_amounts = np.array(trades[asset]['trade_amounts']) - t_prices = np.array(trades[asset]['trade_prices']) - e_amounts = np.array(trades[asset]['enter_amounts']) - positions[asset] = np.dot(np.divide(t_prices, t_amounts), t_amounts) / np.sum(e_amounts) - - await tractor.breakpoint() + # await tractor.breakpoint() # at startup, load all symbols locally for fast search await client.cache_symbols() @@ -466,27 +390,32 @@ async def get_client() -> Client: yield client -# @tractor.context -# async def trades_dialogue( -# ctx: tractor.Context, -# loglevel: str = None, -# ) -> AsyncIterator[Dict[str, Any]]: -# -# # XXX: required to propagate ``tractor`` loglevel to piker logging -# get_console_log(loglevel or tractor.current_actor().loglevel) -# -# # deliver positions to subscriber before anything else -# positions = await _trio_run_client_method(method='positions') -# -# all_positions = {} -# -# for pos in positions: -# msg = pack_position(pos) -# all_positions[msg.symbol] = msg.dict() -# -# await ctx.started(all_positions) +@tractor.context +async def trades_dialogue( + ctx: tractor.Context, + loglevel: str = None, +) -> AsyncIterator[Dict[str, Any]]: + + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + # deliver positions to subscriber before anything else + # positions = await _trio_run_client_method(method='positions') + + global _accounts2clients + + positions = await client.get_positions() + + await tractor.breakpoint() + all_positions = {} + + for pos in positions: + msg = pack_position(pos) + all_positions[msg.symbol] = msg.dict() + + await ctx.started(all_positions) async def stream_messages(ws):