get positions from trades

kraken_orders
Konstantine Tsafatinos 2021-10-28 15:52:02 -04:00
parent 0285a847d8
commit ef598444c4
1 changed files with 57 additions and 128 deletions

View File

@ -20,7 +20,7 @@ Kraken backend.
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import asdict, field from dataclasses import asdict, field
from typing import List, Dict, Any, Tuple, Optional from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
import time import time
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -34,12 +34,13 @@ from pydantic.dataclasses import dataclass
from pydantic import BaseModel from pydantic import BaseModel
import wsproto import wsproto
from . import config from .. import config
from .._cacheables import open_cached_client from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws from ..data._web_bs import open_autorecon_ws
from ..clearing._messages import BrokerdPosition
import urllib.parse import urllib.parse
import hashlib import hashlib
@ -229,41 +230,35 @@ class Client:
print(err) print(err)
return resp['result'] return resp['result']
async def get_ledger( async def get_positions(
self, self,
data: Dict[str, Any] data: Dict[str, Any] = {}
) -> pd.DataFrame: ) -> Dict[str, Any]:
ledgers = await self.get_user_data('Ledgers', data) balances = await self.get_user_data('Balance', data)
num_entries = int(ledgers['count']) ## TODO: grab all entries, not just first 50
crypto_transactions = np.empty((num_entries, 5), dtype=object) traders = await self.get_user_data('TradesHistory', data)
if num_entries // 50 < 0 or num_entries == 50: positions = {}
# NOTE: Omitting the following values from the kraken ledger: vols = {}
# -> 'refid', 'type', 'subtype', 'aclass'
for i, entry in enumerate(ledgers['ledger'].items()): # positions
crypto_transactions[i] = [ ## TODO: Make sure to add option to include fees in positions calc
entry[1]['time'], for trade in traders['trades'].values():
entry[1]['amount'], sign = -1 if trade['type'] == 'sell' else 1
entry[1]['fee'], try:
entry[1]['asset'], positions[trade['pair']] += sign * float(trade['cost'])
entry[1]['balance'] vols[trade['pair']] += sign * float(trade['vol'])
] except KeyError:
else: positions[trade['pair']] = sign * float(trade['cost'])
for n in range(num_entries // 50 + 1): vols[trade['pair']] = sign * float(trade['vol'])
data['ofs'] = n * 50
ledgers = await self.get_user_data('Ledgers', data) for pair in positions.keys():
for i, entry in enumerate(ledgers['ledger'].items()): asset_balance = vols[pair]
crypto_transactions[i + n * 50] = [ if asset_balance == 0:
entry[1]['time'], positions[pair] = 0
entry[1]['amount'], else:
entry[1]['fee'], positions[pair] /= asset_balance
entry[1]['asset'],
entry[1]['balance'] return positions
]
ledger = pd.DataFrame(
columns = ['time', 'amount', 'fee', 'asset', 'balance'],
data = crypto_transactions
)
return ledger
async def symbol_info( async def symbol_info(
self, self,
@ -385,80 +380,9 @@ async def get_client() -> Client:
data = { data = {
# add non-nonce and non-ofs vars # add non-nonce and non-ofs vars
} }
# positions = await client.get_positions(data)
balances = await client.get_user_data('Balance', data) # await tractor.breakpoint()
traders = await client.get_user_data('TradesHistory', data)
ledger = await client.get_ledger(data)
# positions
## TODO: Make sure to add option with fees
n, m = ledger.shape
ledger['time'] = ledger['time'].apply(lambda x: int(x))
assets = set(ledger['asset'])
trade_times = set(ledger['time'])
trades = {}
positions = {}
# for index, row in ledger.iterrows():
# if index == n:
# break
# asset = row['asset']
# ## TODO: Look into a way to generalize this
# if asset != 'ZEUR' and ledger.loc[index+1, 'asset'] == 'ZEUR':
# try:
# trades[asset]['amounts'].append(float(ledger.loc[index, 'amount']))
# trades[asset]['prices'].append(float(ledger.loc[index+1, 'amount']))
# except KeyError:
# trades[asset] = {
# 'amounts': [float(ledger.loc[index+1, 'amount'])],
# 'prices': [float(ledger.loc[index+1, 'amount'])]
# }
## TODO: Look into a way to generalize this for any fiat
## TODO: Figure out how to handle coin for coin trades
for trade_time in trade_times:
trade = ledger[ledger['time'] == trade_time]
coin = trade[trade['asset'] != 'ZEUR']
fiat = trade[trade['asset'] == 'ZEUR']
if len(coin) == 0 or len(coin) > 1:
continue
asset = list(coin.loc[:, 'asset'])[0]
amount = np.sum(coin['amount'].apply(lambda x: float(x)))
if amount > 0:
sign = -1
price = sign * np.sum(fiat['amount'].apply(lambda x: float(x)))
try:
trades[asset]['trade_amounts'].append(amount)
trades[asset]['trade_prices'].append(price)
trades[asset]['enter_amounts'].append(amount)
except KeyError:
trades[asset] = {
'trade_amounts': [amount],
'trade_prices': [price],
'enter_amounts': [amount]
}
else:
price = 0
# continue
try:
trades[asset]['trade_amounts'].append(amount)
trades[asset]['trade_prices'].append(price)
except KeyError:
trades[asset] = {
'trade_amounts': [amount],
'trade_prices': [price],
'enter_amounts': []
}
for asset in assets:
if asset == 'ZEUR':
continue
t_amounts = np.array(trades[asset]['trade_amounts'])
t_prices = np.array(trades[asset]['trade_prices'])
e_amounts = np.array(trades[asset]['enter_amounts'])
positions[asset] = np.dot(np.divide(t_prices, t_amounts), t_amounts) / np.sum(e_amounts)
await tractor.breakpoint()
# at startup, load all symbols locally for fast search # at startup, load all symbols locally for fast search
await client.cache_symbols() await client.cache_symbols()
@ -466,27 +390,32 @@ async def get_client() -> Client:
yield client yield client
# @tractor.context @tractor.context
# async def trades_dialogue( async def trades_dialogue(
# ctx: tractor.Context, ctx: tractor.Context,
# loglevel: str = None, loglevel: str = None,
# ) -> AsyncIterator[Dict[str, Any]]: ) -> AsyncIterator[Dict[str, Any]]:
#
# # XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
# get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)
#
# # deliver positions to subscriber before anything else # deliver positions to subscriber before anything else
# positions = await _trio_run_client_method(method='positions') # positions = await _trio_run_client_method(method='positions')
#
# all_positions = {} global _accounts2clients
#
# for pos in positions: positions = await client.get_positions()
# msg = pack_position(pos)
# all_positions[msg.symbol] = msg.dict() await tractor.breakpoint()
#
# await ctx.started(all_positions)
all_positions = {}
for pos in positions:
msg = pack_position(pos)
all_positions[msg.symbol] = msg.dict()
await ctx.started(all_positions)
async def stream_messages(ws): async def stream_messages(ws):