Compare commits

..

No commits in common. "628f2a6473f5f8a3fb27547f7daa66902be0dc9e" and "a2698c73b5706b0da49ef78e2f8bee5fcf2703e9" have entirely different histories.

4 changed files with 6 additions and 266 deletions

View File

@ -8,8 +8,8 @@ expires_at = 1616095326.355846
[kraken]
key_descr = "api_0"
api_key = ""
secret = ""
public_key = ""
private_key = ""
[ib]
host = "127.0.0.1"

View File

@ -20,7 +20,7 @@ Kraken backend.
"""
from contextlib import asynccontextmanager
from dataclasses import asdict, field
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
from typing import List, Dict, Any, Tuple, Optional
import time
from trio_typing import TaskStatus
@ -34,23 +34,11 @@ from pydantic.dataclasses import dataclass
from pydantic import BaseModel
import wsproto
from .. import config
from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws
from ..clearing._messages import (
BrokerdPosition, BrokerdOrder, BrokerdStatus,
BrokerdOrderAck, BrokerdError, BrokerdCancel, BrokerdFill
)
import urllib.parse
import hashlib
import hmac
import base64
import pandas as pd
log = get_logger(__name__)
@ -141,41 +129,6 @@ class OHLC:
ticks: List[Any] = field(default_factory=list)
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('kraken')
if section is None:
log.warning(f'No config section found for kraken in {path}')
return {}
return section
def get_kraken_signature(
urlpath: str,
data: Dict[str, Any],
secret: str
) -> str:
postdata = urllib.parse.urlencode(data)
encoded = (str(data['nonce']) + postdata).encode()
message = urlpath.encode() + hashlib.sha256(encoded).digest()
mac = hmac.new(base64.b64decode(secret), message, hashlib.sha512)
sigdigest = base64.b64encode(mac.digest())
return sigdigest.decode()
class InvalidKey(ValueError):
"""EAPI:Invalid key
This error is returned when the API key used for the call is
either expired or disabled, please review the API key in your
Settings -> API tab of account management or generate a new one
and update your application."""
class Client:
def __init__(self) -> None:
@ -186,9 +139,6 @@ class Client:
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
})
self._pairs: list[str] = []
self._name = ''
self._api_key = ''
self._secret = ''
@property
def pairs(self) -> Dict[str, Any]:
@ -212,71 +162,6 @@ class Client:
)
return resproc(resp, log)
async def _private(
self,
method: str,
data: dict,
uri_path: str
) -> Dict[str, Any]:
headers = {
'Content-Type':
'application/x-www-form-urlencoded',
'API-Key':
self._api_key,
'API-Sign':
get_kraken_signature(uri_path, data, self._secret)
}
resp = await self._sesh.post(
path=f'/private/{method}',
data=data,
headers=headers,
timeout=float('inf')
)
return resproc(resp, log)
async def get_user_data(
self,
method: str,
data: Dict[str, Any]
) -> Dict[str, Any]:
uri_path = f'/0/private/{method}'
data['nonce'] = str(int(1000*time.time()))
resp = await self._private(method, data, uri_path)
err = resp['error']
if err:
print(err)
return resp['result']
async def get_positions(
self,
data: Dict[str, Any] = {}
) -> Dict[str, Any]:
balances = await self.get_user_data('Balance', data)
## TODO: grab all entries, not just first 50
traders = await self.get_user_data('TradesHistory', data)
positions = {}
vols = {}
# positions
## TODO: Make sure to add option to include fees in positions calc
for trade in traders['trades'].values():
sign = -1 if trade['type'] == 'sell' else 1
try:
positions[trade['pair']] += sign * float(trade['cost'])
vols[trade['pair']] += sign * float(trade['vol'])
except KeyError:
positions[trade['pair']] = sign * float(trade['cost'])
vols[trade['pair']] = sign * float(trade['vol'])
for pair in positions.keys():
asset_balance = vols[pair]
if asset_balance == 0:
positions[pair] = 0
else:
positions[pair] /= asset_balance
return positions, vols
async def symbol_info(
self,
pair: Optional[str] = None,
@ -390,155 +275,12 @@ class Client:
async def get_client() -> Client:
client = Client()
## TODO: maybe add conditional based on section
section = get_config()
client._name = section['key_descr']
client._api_key = section['api_key']
client._secret = section['secret']
## TODO: Add a client attribute to hold this info
#data = {
# # add non-nonce and non-ofs vars
#}
# at startup, load all symbols locally for fast search
await client.cache_symbols()
yield client
def pack_position(
acc: str,
symkey: str,
pos: float,
vol: float
) -> dict[str, Any]:
return BrokerdPosition(
broker='kraken',
account=acc,
symbol=symkey,
currency=symkey[-3:],
size=float(vol),
avg_price=float(pos),
)
def normalize_symbol(
ticker: str
) -> str:
symlen = len(ticker)
if symlen == 6:
return ticker.lower()
else:
for sym in ['XXBT', 'XXMR', 'ZEUR']:
if sym in ticker:
ticker = ticker.replace(sym, sym[1:])
return ticker.lower()
async def handle_order_requests(
client: #kraken,
ems_order_stream: tractor.MsgStream,
) -> None:
# order_request: dict
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
action = request_msg['action']
if action in {'buy', 'sell'}:
account = request_msg['account']
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(BrokerError(
oid=request_msg['oid']
symbol=request_msg['symbol']
reason=f'Kraken only, No account found: `{account}` ?',
).dict())
continue
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
## TODO: look into the submit_limit method, do it write my own?
reqid = await client.submit_limit(
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
## XXX: how do I handle new orders
reqid=order.reqid,
)
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
await client.submit_cancel(
reqid=msg.reqid
)
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
) -> AsyncIterator[Dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
# Authenticated block
async with get_client() as client:
acc_name = 'kraken.' + client._name
positions, vols = await client.get_positions()
all_positions = []
for ticker, pos in positions.items():
norm_sym = normalize_symbol(ticker)
if float(vols[ticker]) != 0:
msg = pack_position(acc_name, norm_sym, pos, vols[ticker])
all_positions.append(msg.dict())
open_orders = await client.get_user_data('OpenOrders', {})
await tractor.breakpoint()
await ctx.started((all_positions, (acc_name,)))
await trio.sleep_forever()
# async with (
# ctx.open_stream() as ems_stream,
# trio.open_nursery as n,
# ):
async def stream_messages(ws):
too_slow_count = last_hb = 0

View File

@ -489,8 +489,7 @@ async def open_brokerd_trades_dialogue(
finally:
# parent context must have been closed
# remove from cache so next client will respawn if needed
## TODO: Maybe add a warning
_router.relays.pop(broker, None)
_router.relays.pop(broker)
@tractor.context

View File

@ -389,7 +389,7 @@ async def handle_order_requests(
account = request_msg['account']
if account != 'paper':
log.error(
'This is a paper account, only a `paper` selection is valid'
'On a paper account, only a `paper` selection is valid'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
@ -463,8 +463,7 @@ async def trades_dialogue(
):
# TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]``
# await ctx.started(all_positions)
await ctx.started(({}, {'paper',}))
await ctx.started(({}, ['paper']))
async with (
ctx.open_stream() as ems_stream,