get positions from trades

kraken_gb
Konstantine Tsafatinos 2021-10-28 15:52:02 -04:00 committed by Tyler Goodlet
parent 0bab95eaa6
commit 20a5ffdc2b
1 changed files with 57 additions and 128 deletions

View File

@ -20,7 +20,7 @@ Kraken backend.
"""
from contextlib import asynccontextmanager
from dataclasses import asdict, field
from typing import List, Dict, Any, Tuple, Optional
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
import time
from trio_typing import TaskStatus
@ -34,12 +34,13 @@ from pydantic.dataclasses import dataclass
from pydantic import BaseModel
import wsproto
from . import config
from .. import config
from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws
from ..clearing._messages import BrokerdPosition
import urllib.parse
import hashlib
@ -229,41 +230,35 @@ class Client:
print(err)
return resp['result']
async def get_ledger(
async def get_positions(
self,
data: Dict[str, Any]
) -> pd.DataFrame:
ledgers = await self.get_user_data('Ledgers', data)
num_entries = int(ledgers['count'])
crypto_transactions = np.empty((num_entries, 5), dtype=object)
if num_entries // 50 < 0 or num_entries == 50:
# NOTE: Omitting the following values from the kraken ledger:
# -> 'refid', 'type', 'subtype', 'aclass'
for i, entry in enumerate(ledgers['ledger'].items()):
crypto_transactions[i] = [
entry[1]['time'],
entry[1]['amount'],
entry[1]['fee'],
entry[1]['asset'],
entry[1]['balance']
]
else:
for n in range(num_entries // 50 + 1):
data['ofs'] = n * 50
ledgers = await self.get_user_data('Ledgers', data)
for i, entry in enumerate(ledgers['ledger'].items()):
crypto_transactions[i + n * 50] = [
entry[1]['time'],
entry[1]['amount'],
entry[1]['fee'],
entry[1]['asset'],
entry[1]['balance']
]
ledger = pd.DataFrame(
columns = ['time', 'amount', 'fee', 'asset', 'balance'],
data = crypto_transactions
)
return ledger
data: Dict[str, Any] = {}
) -> Dict[str, Any]:
balances = await self.get_user_data('Balance', data)
## TODO: grab all entries, not just first 50
traders = await self.get_user_data('TradesHistory', data)
positions = {}
vols = {}
# positions
## TODO: Make sure to add option to include fees in positions calc
for trade in traders['trades'].values():
sign = -1 if trade['type'] == 'sell' else 1
try:
positions[trade['pair']] += sign * float(trade['cost'])
vols[trade['pair']] += sign * float(trade['vol'])
except KeyError:
positions[trade['pair']] = sign * float(trade['cost'])
vols[trade['pair']] = sign * float(trade['vol'])
for pair in positions.keys():
asset_balance = vols[pair]
if asset_balance == 0:
positions[pair] = 0
else:
positions[pair] /= asset_balance
return positions
async def symbol_info(
self,
@ -385,80 +380,9 @@ async def get_client() -> Client:
data = {
# add non-nonce and non-ofs vars
}
# positions = await client.get_positions(data)
balances = await client.get_user_data('Balance', data)
traders = await client.get_user_data('TradesHistory', data)
ledger = await client.get_ledger(data)
# positions
## TODO: Make sure to add option with fees
n, m = ledger.shape
ledger['time'] = ledger['time'].apply(lambda x: int(x))
assets = set(ledger['asset'])
trade_times = set(ledger['time'])
trades = {}
positions = {}
# for index, row in ledger.iterrows():
# if index == n:
# break
# asset = row['asset']
# ## TODO: Look into a way to generalize this
# if asset != 'ZEUR' and ledger.loc[index+1, 'asset'] == 'ZEUR':
# try:
# trades[asset]['amounts'].append(float(ledger.loc[index, 'amount']))
# trades[asset]['prices'].append(float(ledger.loc[index+1, 'amount']))
# except KeyError:
# trades[asset] = {
# 'amounts': [float(ledger.loc[index+1, 'amount'])],
# 'prices': [float(ledger.loc[index+1, 'amount'])]
# }
## TODO: Look into a way to generalize this for any fiat
## TODO: Figure out how to handle coin for coin trades
for trade_time in trade_times:
trade = ledger[ledger['time'] == trade_time]
coin = trade[trade['asset'] != 'ZEUR']
fiat = trade[trade['asset'] == 'ZEUR']
if len(coin) == 0 or len(coin) > 1:
continue
asset = list(coin.loc[:, 'asset'])[0]
amount = np.sum(coin['amount'].apply(lambda x: float(x)))
if amount > 0:
sign = -1
price = sign * np.sum(fiat['amount'].apply(lambda x: float(x)))
try:
trades[asset]['trade_amounts'].append(amount)
trades[asset]['trade_prices'].append(price)
trades[asset]['enter_amounts'].append(amount)
except KeyError:
trades[asset] = {
'trade_amounts': [amount],
'trade_prices': [price],
'enter_amounts': [amount]
}
else:
price = 0
# continue
try:
trades[asset]['trade_amounts'].append(amount)
trades[asset]['trade_prices'].append(price)
except KeyError:
trades[asset] = {
'trade_amounts': [amount],
'trade_prices': [price],
'enter_amounts': []
}
for asset in assets:
if asset == 'ZEUR':
continue
t_amounts = np.array(trades[asset]['trade_amounts'])
t_prices = np.array(trades[asset]['trade_prices'])
e_amounts = np.array(trades[asset]['enter_amounts'])
positions[asset] = np.dot(np.divide(t_prices, t_amounts), t_amounts) / np.sum(e_amounts)
await tractor.breakpoint()
# await tractor.breakpoint()
# at startup, load all symbols locally for fast search
await client.cache_symbols()
@ -466,27 +390,32 @@ async def get_client() -> Client:
yield client
# @tractor.context
# async def trades_dialogue(
# ctx: tractor.Context,
# loglevel: str = None,
# ) -> AsyncIterator[Dict[str, Any]]:
#
# # XXX: required to propagate ``tractor`` loglevel to piker logging
# get_console_log(loglevel or tractor.current_actor().loglevel)
#
# # deliver positions to subscriber before anything else
# positions = await _trio_run_client_method(method='positions')
#
# all_positions = {}
#
# for pos in positions:
# msg = pack_position(pos)
# all_positions[msg.symbol] = msg.dict()
#
# await ctx.started(all_positions)
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
) -> AsyncIterator[Dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
# deliver positions to subscriber before anything else
# positions = await _trio_run_client_method(method='positions')
global _accounts2clients
positions = await client.get_positions()
await tractor.breakpoint()
all_positions = {}
for pos in positions:
msg = pack_position(pos)
all_positions[msg.symbol] = msg.dict()
await ctx.started(all_positions)
async def stream_messages(ws):