Compare commits
	
		
			9 Commits 
		
	
	
		
			310_plus
			...
			gb_kraken_
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						628f2a6473 | |
| 
							
							
								 | 
						8b7f605416 | |
| 
							
							
								 | 
						ca8ef26ea5 | |
| 
							
							
								 | 
						4a3515541d | |
| 
							
							
								 | 
						ecd53459f6 | |
| 
							
							
								 | 
						e5a3b8643f | |
| 
							
							
								 | 
						369fd45c8e | |
| 
							
							
								 | 
						3ac48656a9 | |
| 
							
							
								 | 
						0c537a67a8 | 
| 
						 | 
				
			
			@ -8,8 +8,8 @@ expires_at = 1616095326.355846
 | 
			
		|||
 | 
			
		||||
[kraken]
 | 
			
		||||
key_descr = "api_0"
 | 
			
		||||
public_key = ""
 | 
			
		||||
private_key = ""
 | 
			
		||||
api_key = ""
 | 
			
		||||
secret = ""
 | 
			
		||||
 | 
			
		||||
[ib]
 | 
			
		||||
host = "127.0.0.1"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -20,7 +20,7 @@ Kraken backend.
 | 
			
		|||
"""
 | 
			
		||||
from contextlib import asynccontextmanager
 | 
			
		||||
from dataclasses import asdict, field
 | 
			
		||||
from typing import List, Dict, Any, Tuple, Optional
 | 
			
		||||
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
from trio_typing import TaskStatus
 | 
			
		||||
| 
						 | 
				
			
			@ -34,11 +34,23 @@ from pydantic.dataclasses import dataclass
 | 
			
		|||
from pydantic import BaseModel
 | 
			
		||||
import wsproto
 | 
			
		||||
 | 
			
		||||
from .. import config
 | 
			
		||||
from .._cacheables import open_cached_client
 | 
			
		||||
from ._util import resproc, SymbolNotFound, BrokerError
 | 
			
		||||
from ..log import get_logger, get_console_log
 | 
			
		||||
from ..data import ShmArray
 | 
			
		||||
from ..data._web_bs import open_autorecon_ws
 | 
			
		||||
from ..clearing._messages import (
 | 
			
		||||
    BrokerdPosition, BrokerdOrder, BrokerdStatus,
 | 
			
		||||
    BrokerdOrderAck, BrokerdError, BrokerdCancel, BrokerdFill
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
import urllib.parse
 | 
			
		||||
import hashlib
 | 
			
		||||
import hmac
 | 
			
		||||
import base64
 | 
			
		||||
import pandas as pd
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = get_logger(__name__)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -129,6 +141,41 @@ class OHLC:
 | 
			
		|||
    ticks: List[Any] = field(default_factory=list)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_config() -> dict[str, Any]:
 | 
			
		||||
 | 
			
		||||
    conf, path = config.load()
 | 
			
		||||
 | 
			
		||||
    section = conf.get('kraken')
 | 
			
		||||
 | 
			
		||||
    if section is None:
 | 
			
		||||
        log.warning(f'No config section found for kraken in {path}')
 | 
			
		||||
        return {}
 | 
			
		||||
 | 
			
		||||
    return section
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_kraken_signature(
 | 
			
		||||
    urlpath: str,
 | 
			
		||||
    data: Dict[str, Any], 
 | 
			
		||||
    secret: str
 | 
			
		||||
) -> str:
 | 
			
		||||
    postdata = urllib.parse.urlencode(data)
 | 
			
		||||
    encoded = (str(data['nonce']) + postdata).encode()
 | 
			
		||||
    message = urlpath.encode() + hashlib.sha256(encoded).digest()
 | 
			
		||||
 | 
			
		||||
    mac = hmac.new(base64.b64decode(secret), message, hashlib.sha512)
 | 
			
		||||
    sigdigest = base64.b64encode(mac.digest())
 | 
			
		||||
    return sigdigest.decode()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class InvalidKey(ValueError):
 | 
			
		||||
    """EAPI:Invalid key
 | 
			
		||||
        This error is returned when the API key used for the call is 
 | 
			
		||||
        either expired or disabled, please review the API key in your 
 | 
			
		||||
        Settings -> API tab of account management or generate a new one 
 | 
			
		||||
        and update your application."""
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Client:
 | 
			
		||||
 | 
			
		||||
    def __init__(self) -> None:
 | 
			
		||||
| 
						 | 
				
			
			@ -139,6 +186,9 @@ class Client:
 | 
			
		|||
                'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
 | 
			
		||||
        })
 | 
			
		||||
        self._pairs: list[str] = []
 | 
			
		||||
        self._name = ''
 | 
			
		||||
        self._api_key = ''
 | 
			
		||||
        self._secret = ''
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def pairs(self) -> Dict[str, Any]:
 | 
			
		||||
| 
						 | 
				
			
			@ -162,6 +212,71 @@ class Client:
 | 
			
		|||
        )
 | 
			
		||||
        return resproc(resp, log)
 | 
			
		||||
 | 
			
		||||
    async def _private(
 | 
			
		||||
        self,
 | 
			
		||||
        method: str,
 | 
			
		||||
        data: dict,
 | 
			
		||||
        uri_path: str
 | 
			
		||||
    ) -> Dict[str, Any]:
 | 
			
		||||
        headers = {
 | 
			
		||||
            'Content-Type': 
 | 
			
		||||
                'application/x-www-form-urlencoded',
 | 
			
		||||
            'API-Key':
 | 
			
		||||
                self._api_key,
 | 
			
		||||
            'API-Sign':
 | 
			
		||||
                get_kraken_signature(uri_path, data, self._secret)
 | 
			
		||||
        }
 | 
			
		||||
        resp = await self._sesh.post(
 | 
			
		||||
            path=f'/private/{method}',
 | 
			
		||||
            data=data,
 | 
			
		||||
            headers=headers,
 | 
			
		||||
            timeout=float('inf')
 | 
			
		||||
        )
 | 
			
		||||
        return resproc(resp, log)
 | 
			
		||||
 | 
			
		||||
    async def get_user_data(
 | 
			
		||||
        self,
 | 
			
		||||
        method: str,
 | 
			
		||||
        data: Dict[str, Any]
 | 
			
		||||
    ) -> Dict[str, Any]:
 | 
			
		||||
        uri_path = f'/0/private/{method}'
 | 
			
		||||
        data['nonce'] = str(int(1000*time.time()))
 | 
			
		||||
        resp = await self._private(method, data, uri_path)
 | 
			
		||||
        err = resp['error']
 | 
			
		||||
        if err:
 | 
			
		||||
            print(err)
 | 
			
		||||
        return resp['result']
 | 
			
		||||
 | 
			
		||||
    async def get_positions(
 | 
			
		||||
        self,
 | 
			
		||||
        data: Dict[str, Any] = {}
 | 
			
		||||
    ) -> Dict[str, Any]:
 | 
			
		||||
        balances = await self.get_user_data('Balance', data)
 | 
			
		||||
        ## TODO: grab all entries, not just first 50
 | 
			
		||||
        traders = await self.get_user_data('TradesHistory', data)
 | 
			
		||||
        positions = {}
 | 
			
		||||
        vols = {}
 | 
			
		||||
        
 | 
			
		||||
        # positions
 | 
			
		||||
        ## TODO: Make sure to add option to include fees in positions calc
 | 
			
		||||
        for trade in traders['trades'].values():
 | 
			
		||||
            sign = -1 if trade['type'] == 'sell' else 1
 | 
			
		||||
            try:
 | 
			
		||||
                positions[trade['pair']] += sign * float(trade['cost'])
 | 
			
		||||
                vols[trade['pair']] += sign * float(trade['vol'])
 | 
			
		||||
            except KeyError:
 | 
			
		||||
                positions[trade['pair']] = sign * float(trade['cost'])
 | 
			
		||||
                vols[trade['pair']] = sign * float(trade['vol'])
 | 
			
		||||
    
 | 
			
		||||
        for pair in positions.keys():
 | 
			
		||||
            asset_balance = vols[pair]
 | 
			
		||||
            if asset_balance == 0:
 | 
			
		||||
                positions[pair] = 0
 | 
			
		||||
            else:
 | 
			
		||||
                positions[pair] /= asset_balance
 | 
			
		||||
 | 
			
		||||
        return positions, vols
 | 
			
		||||
 | 
			
		||||
    async def symbol_info(
 | 
			
		||||
        self,
 | 
			
		||||
        pair: Optional[str] = None,
 | 
			
		||||
| 
						 | 
				
			
			@ -275,12 +390,155 @@ class Client:
 | 
			
		|||
async def get_client() -> Client:
 | 
			
		||||
    client = Client()
 | 
			
		||||
 | 
			
		||||
    ## TODO: maybe add conditional based on section
 | 
			
		||||
    section = get_config()
 | 
			
		||||
    client._name = section['key_descr']
 | 
			
		||||
    client._api_key = section['api_key']
 | 
			
		||||
    client._secret = section['secret']
 | 
			
		||||
    ## TODO: Add a client attribute to hold this info
 | 
			
		||||
    #data = {
 | 
			
		||||
    #    # add non-nonce and non-ofs vars
 | 
			
		||||
    #}
 | 
			
		||||
 | 
			
		||||
    # at startup, load all symbols locally for fast search
 | 
			
		||||
    await client.cache_symbols()
 | 
			
		||||
 | 
			
		||||
    yield client
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def pack_position(
 | 
			
		||||
    acc: str,
 | 
			
		||||
    symkey: str,
 | 
			
		||||
    pos: float,
 | 
			
		||||
    vol: float
 | 
			
		||||
) -> dict[str, Any]:
 | 
			
		||||
 | 
			
		||||
    return BrokerdPosition(
 | 
			
		||||
        broker='kraken',
 | 
			
		||||
        account=acc,
 | 
			
		||||
        symbol=symkey,
 | 
			
		||||
        currency=symkey[-3:],
 | 
			
		||||
        size=float(vol),
 | 
			
		||||
        avg_price=float(pos),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def normalize_symbol(
 | 
			
		||||
    ticker: str
 | 
			
		||||
) -> str:
 | 
			
		||||
    symlen = len(ticker)
 | 
			
		||||
    if symlen == 6:
 | 
			
		||||
        return ticker.lower()
 | 
			
		||||
    else:
 | 
			
		||||
        for sym in ['XXBT', 'XXMR', 'ZEUR']:
 | 
			
		||||
            if sym in ticker:
 | 
			
		||||
                ticker = ticker.replace(sym, sym[1:])
 | 
			
		||||
        return ticker.lower()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def handle_order_requests(
 | 
			
		||||
 | 
			
		||||
        client: #kraken,
 | 
			
		||||
        ems_order_stream: tractor.MsgStream,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
 | 
			
		||||
    # order_request: dict
 | 
			
		||||
    async for request_msg in ems_order_stream:
 | 
			
		||||
        log.info(f'Received order request {request_msg}')
 | 
			
		||||
 | 
			
		||||
        action = request_msg['action']
 | 
			
		||||
 | 
			
		||||
        if action in {'buy', 'sell'}:
 | 
			
		||||
 | 
			
		||||
            account = request_msg['account']
 | 
			
		||||
            if account != 'kraken.spot':
 | 
			
		||||
                log.error(
 | 
			
		||||
                    'This is a kraken account, \
 | 
			
		||||
                    only a `kraken.spot` selection is valid'
 | 
			
		||||
                )
 | 
			
		||||
                await ems_order_stream.send(BrokerError(
 | 
			
		||||
                    oid=request_msg['oid']
 | 
			
		||||
                    symbol=request_msg['symbol']
 | 
			
		||||
                    reason=f'Kraken only, No account found: `{account}` ?',
 | 
			
		||||
                ).dict())
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            # validate
 | 
			
		||||
            order = BrokerdOrder(**request_msg)
 | 
			
		||||
 | 
			
		||||
            # call our client api to submit the order
 | 
			
		||||
            ## TODO: look into the submit_limit method, do it write my own?
 | 
			
		||||
            reqid = await client.submit_limit(
 | 
			
		||||
 | 
			
		||||
                oid=order.oid,
 | 
			
		||||
                symbol=order.symbol,
 | 
			
		||||
                price=order.price,
 | 
			
		||||
                action=order.action,
 | 
			
		||||
                size=order.size,
 | 
			
		||||
                ## XXX: how do I handle new orders
 | 
			
		||||
                reqid=order.reqid,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # deliver ack that order has been submitted to broker routing
 | 
			
		||||
            await ems_order_stream.send(
 | 
			
		||||
                BrokerdOrderAck(
 | 
			
		||||
 | 
			
		||||
                    # ems order request id
 | 
			
		||||
                    oid=order.oid,
 | 
			
		||||
 | 
			
		||||
                    # broker specific request id
 | 
			
		||||
                    reqid=reqid,
 | 
			
		||||
 | 
			
		||||
                ).dict()
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        elif action == 'cancel':
 | 
			
		||||
             msg = BrokerdCancel(**request_msg)
 | 
			
		||||
 | 
			
		||||
             await client.submit_cancel(
 | 
			
		||||
                reqid=msg.reqid
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            log.error(f'Unknown order command: {request_msg}')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def trades_dialogue(
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
    loglevel: str = None,
 | 
			
		||||
) -> AsyncIterator[Dict[str, Any]]:
 | 
			
		||||
 | 
			
		||||
    # XXX: required to propagate ``tractor`` loglevel to piker logging
 | 
			
		||||
    get_console_log(loglevel or tractor.current_actor().loglevel)
 | 
			
		||||
 | 
			
		||||
    # Authenticated block
 | 
			
		||||
    async with get_client() as client:
 | 
			
		||||
        acc_name = 'kraken.' + client._name
 | 
			
		||||
        positions, vols = await client.get_positions()
 | 
			
		||||
 | 
			
		||||
        all_positions = []
 | 
			
		||||
 | 
			
		||||
        for ticker, pos in positions.items():
 | 
			
		||||
            norm_sym = normalize_symbol(ticker)
 | 
			
		||||
            if float(vols[ticker]) != 0:
 | 
			
		||||
                msg = pack_position(acc_name, norm_sym, pos, vols[ticker])
 | 
			
		||||
                all_positions.append(msg.dict())
 | 
			
		||||
        
 | 
			
		||||
        open_orders = await client.get_user_data('OpenOrders', {})
 | 
			
		||||
        await tractor.breakpoint()
 | 
			
		||||
 | 
			
		||||
        await ctx.started((all_positions, (acc_name,)))
 | 
			
		||||
 | 
			
		||||
        await trio.sleep_forever()
 | 
			
		||||
 | 
			
		||||
       # async with (
 | 
			
		||||
       #     ctx.open_stream() as ems_stream,
 | 
			
		||||
       #     trio.open_nursery as n,
 | 
			
		||||
       # ):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def stream_messages(ws):
 | 
			
		||||
 | 
			
		||||
    too_slow_count = last_hb = 0
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -489,7 +489,8 @@ async def open_brokerd_trades_dialogue(
 | 
			
		|||
        finally:
 | 
			
		||||
            # parent context must have been closed
 | 
			
		||||
            # remove from cache so next client will respawn if needed
 | 
			
		||||
            _router.relays.pop(broker)
 | 
			
		||||
            ## TODO: Maybe add a warning
 | 
			
		||||
            _router.relays.pop(broker, None)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -389,7 +389,7 @@ async def handle_order_requests(
 | 
			
		|||
            account = request_msg['account']
 | 
			
		||||
            if account != 'paper':
 | 
			
		||||
                log.error(
 | 
			
		||||
                    'On a paper account, only a `paper` selection is valid'
 | 
			
		||||
                    'This is a paper account, only a `paper` selection is valid'
 | 
			
		||||
                )
 | 
			
		||||
                await ems_order_stream.send(BrokerdError(
 | 
			
		||||
                    oid=request_msg['oid'],
 | 
			
		||||
| 
						 | 
				
			
			@ -463,7 +463,8 @@ async def trades_dialogue(
 | 
			
		|||
    ):
 | 
			
		||||
        # TODO: load paper positions per broker from .toml config file
 | 
			
		||||
        # and pass as symbol to position data mapping: ``dict[str, dict]``
 | 
			
		||||
        await ctx.started(({}, ['paper']))
 | 
			
		||||
        # await ctx.started(all_positions)
 | 
			
		||||
        await ctx.started(({}, {'paper',}))
 | 
			
		||||
 | 
			
		||||
        async with (
 | 
			
		||||
            ctx.open_stream() as ems_stream,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue