Compare commits

...

9 Commits

Author SHA1 Message Date
Konstantine Tsafatinos 628f2a6473 added the bones for the handle_order_requests func 2022-01-30 15:50:36 -05:00
Konstantine Tsafatinos 8b7f605416 save progress on kraken to test out unit_select_fixes 2022-01-30 15:50:36 -05:00
Konstantine Tsafatinos ca8ef26ea5 get positions working for kraken 2022-01-30 15:50:36 -05:00
Konstantine Tsafatinos 4a3515541d get positions from trades 2022-01-30 15:50:36 -05:00
Konstantine Tsafatinos ecd53459f6 Store changes for rebase, positions prototype 2022-01-30 15:50:36 -05:00
Konstantine Tsafatinos e5a3b8643f Add balance to the ledger 2022-01-30 15:50:36 -05:00
Konstantine Tsafatinos 369fd45c8e Add get_ledger function; parses raw ledger from kraken api 2022-01-30 15:50:36 -05:00
Konstantine Tsafatinos 3ac48656a9 wrap api method calls with uri and nonce value 2022-01-30 15:50:36 -05:00
Konstantine Tsafatinos 0c537a67a8 get kraken authentication and retrieve balances 2022-01-30 15:50:36 -05:00
4 changed files with 266 additions and 6 deletions

View File

@ -8,8 +8,8 @@ expires_at = 1616095326.355846
[kraken] [kraken]
key_descr = "api_0" key_descr = "api_0"
public_key = "" api_key = ""
private_key = "" secret = ""
[ib] [ib]
host = "127.0.0.1" host = "127.0.0.1"

View File

@ -20,7 +20,7 @@ Kraken backend.
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import asdict, field from dataclasses import asdict, field
from typing import List, Dict, Any, Tuple, Optional from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
import time import time
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -34,11 +34,23 @@ from pydantic.dataclasses import dataclass
from pydantic import BaseModel from pydantic import BaseModel
import wsproto import wsproto
from .. import config
from .._cacheables import open_cached_client from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws from ..data._web_bs import open_autorecon_ws
from ..clearing._messages import (
BrokerdPosition, BrokerdOrder, BrokerdStatus,
BrokerdOrderAck, BrokerdError, BrokerdCancel, BrokerdFill
)
import urllib.parse
import hashlib
import hmac
import base64
import pandas as pd
log = get_logger(__name__) log = get_logger(__name__)
@ -129,6 +141,41 @@ class OHLC:
ticks: List[Any] = field(default_factory=list) ticks: List[Any] = field(default_factory=list)
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('kraken')
if section is None:
log.warning(f'No config section found for kraken in {path}')
return {}
return section
def get_kraken_signature(
urlpath: str,
data: Dict[str, Any],
secret: str
) -> str:
postdata = urllib.parse.urlencode(data)
encoded = (str(data['nonce']) + postdata).encode()
message = urlpath.encode() + hashlib.sha256(encoded).digest()
mac = hmac.new(base64.b64decode(secret), message, hashlib.sha512)
sigdigest = base64.b64encode(mac.digest())
return sigdigest.decode()
class InvalidKey(ValueError):
"""EAPI:Invalid key
This error is returned when the API key used for the call is
either expired or disabled, please review the API key in your
Settings -> API tab of account management or generate a new one
and update your application."""
class Client: class Client:
def __init__(self) -> None: def __init__(self) -> None:
@ -139,6 +186,9 @@ class Client:
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
}) })
self._pairs: list[str] = [] self._pairs: list[str] = []
self._name = ''
self._api_key = ''
self._secret = ''
@property @property
def pairs(self) -> Dict[str, Any]: def pairs(self) -> Dict[str, Any]:
@ -162,6 +212,71 @@ class Client:
) )
return resproc(resp, log) return resproc(resp, log)
async def _private(
self,
method: str,
data: dict,
uri_path: str
) -> Dict[str, Any]:
headers = {
'Content-Type':
'application/x-www-form-urlencoded',
'API-Key':
self._api_key,
'API-Sign':
get_kraken_signature(uri_path, data, self._secret)
}
resp = await self._sesh.post(
path=f'/private/{method}',
data=data,
headers=headers,
timeout=float('inf')
)
return resproc(resp, log)
async def get_user_data(
self,
method: str,
data: Dict[str, Any]
) -> Dict[str, Any]:
uri_path = f'/0/private/{method}'
data['nonce'] = str(int(1000*time.time()))
resp = await self._private(method, data, uri_path)
err = resp['error']
if err:
print(err)
return resp['result']
async def get_positions(
self,
data: Dict[str, Any] = {}
) -> Dict[str, Any]:
balances = await self.get_user_data('Balance', data)
## TODO: grab all entries, not just first 50
traders = await self.get_user_data('TradesHistory', data)
positions = {}
vols = {}
# positions
## TODO: Make sure to add option to include fees in positions calc
for trade in traders['trades'].values():
sign = -1 if trade['type'] == 'sell' else 1
try:
positions[trade['pair']] += sign * float(trade['cost'])
vols[trade['pair']] += sign * float(trade['vol'])
except KeyError:
positions[trade['pair']] = sign * float(trade['cost'])
vols[trade['pair']] = sign * float(trade['vol'])
for pair in positions.keys():
asset_balance = vols[pair]
if asset_balance == 0:
positions[pair] = 0
else:
positions[pair] /= asset_balance
return positions, vols
async def symbol_info( async def symbol_info(
self, self,
pair: Optional[str] = None, pair: Optional[str] = None,
@ -275,12 +390,155 @@ class Client:
async def get_client() -> Client: async def get_client() -> Client:
client = Client() client = Client()
## TODO: maybe add conditional based on section
section = get_config()
client._name = section['key_descr']
client._api_key = section['api_key']
client._secret = section['secret']
## TODO: Add a client attribute to hold this info
#data = {
# # add non-nonce and non-ofs vars
#}
# at startup, load all symbols locally for fast search # at startup, load all symbols locally for fast search
await client.cache_symbols() await client.cache_symbols()
yield client yield client
def pack_position(
acc: str,
symkey: str,
pos: float,
vol: float
) -> dict[str, Any]:
return BrokerdPosition(
broker='kraken',
account=acc,
symbol=symkey,
currency=symkey[-3:],
size=float(vol),
avg_price=float(pos),
)
def normalize_symbol(
ticker: str
) -> str:
symlen = len(ticker)
if symlen == 6:
return ticker.lower()
else:
for sym in ['XXBT', 'XXMR', 'ZEUR']:
if sym in ticker:
ticker = ticker.replace(sym, sym[1:])
return ticker.lower()
async def handle_order_requests(
client: #kraken,
ems_order_stream: tractor.MsgStream,
) -> None:
# order_request: dict
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
action = request_msg['action']
if action in {'buy', 'sell'}:
account = request_msg['account']
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(BrokerError(
oid=request_msg['oid']
symbol=request_msg['symbol']
reason=f'Kraken only, No account found: `{account}` ?',
).dict())
continue
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
## TODO: look into the submit_limit method, do it write my own?
reqid = await client.submit_limit(
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
## XXX: how do I handle new orders
reqid=order.reqid,
)
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
await client.submit_cancel(
reqid=msg.reqid
)
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
) -> AsyncIterator[Dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
# Authenticated block
async with get_client() as client:
acc_name = 'kraken.' + client._name
positions, vols = await client.get_positions()
all_positions = []
for ticker, pos in positions.items():
norm_sym = normalize_symbol(ticker)
if float(vols[ticker]) != 0:
msg = pack_position(acc_name, norm_sym, pos, vols[ticker])
all_positions.append(msg.dict())
open_orders = await client.get_user_data('OpenOrders', {})
await tractor.breakpoint()
await ctx.started((all_positions, (acc_name,)))
await trio.sleep_forever()
# async with (
# ctx.open_stream() as ems_stream,
# trio.open_nursery as n,
# ):
async def stream_messages(ws): async def stream_messages(ws):
too_slow_count = last_hb = 0 too_slow_count = last_hb = 0

View File

@ -489,7 +489,8 @@ async def open_brokerd_trades_dialogue(
finally: finally:
# parent context must have been closed # parent context must have been closed
# remove from cache so next client will respawn if needed # remove from cache so next client will respawn if needed
_router.relays.pop(broker) ## TODO: Maybe add a warning
_router.relays.pop(broker, None)
@tractor.context @tractor.context

View File

@ -389,7 +389,7 @@ async def handle_order_requests(
account = request_msg['account'] account = request_msg['account']
if account != 'paper': if account != 'paper':
log.error( log.error(
'On a paper account, only a `paper` selection is valid' 'This is a paper account, only a `paper` selection is valid'
) )
await ems_order_stream.send(BrokerdError( await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'], oid=request_msg['oid'],
@ -463,7 +463,8 @@ async def trades_dialogue(
): ):
# TODO: load paper positions per broker from .toml config file # TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]`` # and pass as symbol to position data mapping: ``dict[str, dict]``
await ctx.started(({}, ['paper'])) # await ctx.started(all_positions)
await ctx.started(({}, {'paper',}))
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,