commit
						d26fea70c7
					
				| 
						 | 
				
			
			@ -8,8 +8,8 @@ expires_at = 1616095326.355846
 | 
			
		|||
 | 
			
		||||
[kraken]
 | 
			
		||||
key_descr = "api_0"
 | 
			
		||||
public_key = ""
 | 
			
		||||
private_key = ""
 | 
			
		||||
api_key = ""
 | 
			
		||||
secret = ""
 | 
			
		||||
 | 
			
		||||
[ib]
 | 
			
		||||
host = "127.0.0.1"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -14,13 +14,13 @@
 | 
			
		|||
# You should have received a copy of the GNU Affero General Public License
 | 
			
		||||
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
'''
 | 
			
		||||
Kraken backend.
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
'''
 | 
			
		||||
from contextlib import asynccontextmanager
 | 
			
		||||
from dataclasses import asdict, field
 | 
			
		||||
from typing import List, Dict, Any, Tuple, Optional
 | 
			
		||||
from typing import Dict, List, Tuple, Any, Optional, AsyncIterator
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
from trio_typing import TaskStatus
 | 
			
		||||
| 
						 | 
				
			
			@ -33,12 +33,25 @@ import tractor
 | 
			
		|||
from pydantic.dataclasses import dataclass
 | 
			
		||||
from pydantic import BaseModel
 | 
			
		||||
import wsproto
 | 
			
		||||
from itertools import count
 | 
			
		||||
import urllib.parse
 | 
			
		||||
import hashlib
 | 
			
		||||
import hmac
 | 
			
		||||
import base64
 | 
			
		||||
 | 
			
		||||
from .. import config
 | 
			
		||||
from .._cacheables import open_cached_client
 | 
			
		||||
from ._util import resproc, SymbolNotFound, BrokerError
 | 
			
		||||
from ..log import get_logger, get_console_log
 | 
			
		||||
from ..data import ShmArray
 | 
			
		||||
from ..data._web_bs import open_autorecon_ws
 | 
			
		||||
from ..data._web_bs import open_autorecon_ws, NoBsWs
 | 
			
		||||
from ..clearing._paper_engine import PaperBoi
 | 
			
		||||
from ..clearing._messages import (
 | 
			
		||||
    BrokerdPosition, BrokerdOrder, BrokerdStatus,
 | 
			
		||||
    BrokerdOrderAck, BrokerdError, BrokerdCancel,
 | 
			
		||||
    BrokerdFill,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = get_logger(__name__)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -106,13 +119,27 @@ class Pair(BaseModel):
 | 
			
		|||
    ordermin: float  # minimum order volume for pair
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Trade(BaseModel):
 | 
			
		||||
    '''
 | 
			
		||||
    Trade class that helps parse and validate ownTrades stream
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    reqid: str # kraken order transaction id
 | 
			
		||||
    action: str # buy or sell
 | 
			
		||||
    price: str # price of asset
 | 
			
		||||
    size: str # vol of asset
 | 
			
		||||
    broker_time: str # e.g GTC, GTD
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@dataclass
 | 
			
		||||
class OHLC:
 | 
			
		||||
    """Description of the flattened OHLC quote format.
 | 
			
		||||
    '''
 | 
			
		||||
    Description of the flattened OHLC quote format.
 | 
			
		||||
 | 
			
		||||
    For schema details see:
 | 
			
		||||
        https://docs.kraken.com/websockets/#message-ohlc
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    chan_id: int  # internal kraken id
 | 
			
		||||
    chan_name: str  # eg. ohlc-1  (name-interval)
 | 
			
		||||
    pair: str  # fx pair
 | 
			
		||||
| 
						 | 
				
			
			@ -129,9 +156,52 @@ class OHLC:
 | 
			
		|||
    ticks: List[Any] = field(default_factory=list)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_config() -> dict[str, Any]:
 | 
			
		||||
 | 
			
		||||
    conf, path = config.load()
 | 
			
		||||
 | 
			
		||||
    section = conf.get('kraken')
 | 
			
		||||
 | 
			
		||||
    if section is None:
 | 
			
		||||
        log.warning(f'No config section found for kraken in {path}')
 | 
			
		||||
        return {}
 | 
			
		||||
 | 
			
		||||
    return section
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_kraken_signature(
 | 
			
		||||
    urlpath: str,
 | 
			
		||||
    data: Dict[str, Any],
 | 
			
		||||
    secret: str
 | 
			
		||||
) -> str:
 | 
			
		||||
    postdata = urllib.parse.urlencode(data)
 | 
			
		||||
    encoded = (str(data['nonce']) + postdata).encode()
 | 
			
		||||
    message = urlpath.encode() + hashlib.sha256(encoded).digest()
 | 
			
		||||
 | 
			
		||||
    mac = hmac.new(base64.b64decode(secret), message, hashlib.sha512)
 | 
			
		||||
    sigdigest = base64.b64encode(mac.digest())
 | 
			
		||||
    return sigdigest.decode()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class InvalidKey(ValueError):
 | 
			
		||||
    '''
 | 
			
		||||
    EAPI:Invalid key
 | 
			
		||||
    This error is returned when the API key used for the call is
 | 
			
		||||
    either expired or disabled, please review the API key in your
 | 
			
		||||
    Settings -> API tab of account management or generate a new one
 | 
			
		||||
    and update your application.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Client:
 | 
			
		||||
 | 
			
		||||
    def __init__(self) -> None:
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        name: str = '',
 | 
			
		||||
        api_key: str = '',
 | 
			
		||||
        secret: str = ''
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        self._sesh = asks.Session(connections=4)
 | 
			
		||||
        self._sesh.base_location = _url
 | 
			
		||||
        self._sesh.headers.update({
 | 
			
		||||
| 
						 | 
				
			
			@ -139,6 +209,9 @@ class Client:
 | 
			
		|||
                'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
 | 
			
		||||
        })
 | 
			
		||||
        self._pairs: list[str] = []
 | 
			
		||||
        self._name = name
 | 
			
		||||
        self._api_key = api_key
 | 
			
		||||
        self._secret = secret
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def pairs(self) -> Dict[str, Any]:
 | 
			
		||||
| 
						 | 
				
			
			@ -162,6 +235,108 @@ class Client:
 | 
			
		|||
        )
 | 
			
		||||
        return resproc(resp, log)
 | 
			
		||||
 | 
			
		||||
    async def _private(
 | 
			
		||||
        self,
 | 
			
		||||
        method: str,
 | 
			
		||||
        data: dict,
 | 
			
		||||
        uri_path: str
 | 
			
		||||
    ) -> Dict[str, Any]:
 | 
			
		||||
        headers = {
 | 
			
		||||
            'Content-Type': 
 | 
			
		||||
                'application/x-www-form-urlencoded',
 | 
			
		||||
            'API-Key':
 | 
			
		||||
                self._api_key,
 | 
			
		||||
            'API-Sign':
 | 
			
		||||
                get_kraken_signature(uri_path, data, self._secret)
 | 
			
		||||
        }
 | 
			
		||||
        resp = await self._sesh.post(
 | 
			
		||||
            path=f'/private/{method}',
 | 
			
		||||
            data=data,
 | 
			
		||||
            headers=headers,
 | 
			
		||||
            timeout=float('inf')
 | 
			
		||||
        )
 | 
			
		||||
        return resproc(resp, log)
 | 
			
		||||
 | 
			
		||||
    async def endpoint(
 | 
			
		||||
        self,
 | 
			
		||||
        method: str,
 | 
			
		||||
        data: Dict[str, Any]
 | 
			
		||||
    ) -> Dict[str, Any]:
 | 
			
		||||
        uri_path = f'/0/private/{method}'
 | 
			
		||||
        data['nonce'] = str(int(1000*time.time()))
 | 
			
		||||
        return await self._private(method, data, uri_path)
 | 
			
		||||
 | 
			
		||||
    async def get_trades(
 | 
			
		||||
        self,
 | 
			
		||||
        data: Dict[str, Any] = {}
 | 
			
		||||
    ) -> Dict[str, Any]:
 | 
			
		||||
        data['ofs'] = 0
 | 
			
		||||
        # Grab all trade history
 | 
			
		||||
        # https://docs.kraken.com/rest/#operation/getTradeHistory
 | 
			
		||||
        # Kraken uses 'ofs' to refer to the offset
 | 
			
		||||
        while True:
 | 
			
		||||
            resp = await self.endpoint('TradesHistory', data)
 | 
			
		||||
            # grab the first 50 trades
 | 
			
		||||
            if data['ofs'] == 0:
 | 
			
		||||
                trades = resp['result']['trades']
 | 
			
		||||
            # load the next 50 trades using dict constructor
 | 
			
		||||
            # for speed
 | 
			
		||||
            elif data['ofs'] == 50:
 | 
			
		||||
                trades = dict(trades, **resp['result']['trades'])
 | 
			
		||||
            # catch the end of the trades
 | 
			
		||||
            elif resp['result']['trades'] == {}:
 | 
			
		||||
                count = resp['result']['count']
 | 
			
		||||
                break
 | 
			
		||||
            # update existing dict if num trades exceeds 100
 | 
			
		||||
            else: 
 | 
			
		||||
                trades.update(resp['result']['trades'])
 | 
			
		||||
            # increment the offset counter
 | 
			
		||||
            data['ofs'] += 50
 | 
			
		||||
            # To avoid exceeding API rate limit in case of a lot of trades
 | 
			
		||||
            await trio.sleep(1)
 | 
			
		||||
 | 
			
		||||
        # make sure you grabbed all the trades
 | 
			
		||||
        assert count == len(trades.values())
 | 
			
		||||
 | 
			
		||||
        return trades
 | 
			
		||||
 | 
			
		||||
    async def submit_limit(
 | 
			
		||||
        self,
 | 
			
		||||
        oid: str,
 | 
			
		||||
        symbol: str,
 | 
			
		||||
        price: float,
 | 
			
		||||
        action: str,
 | 
			
		||||
        size: float,
 | 
			
		||||
        reqid: int = None,
 | 
			
		||||
    ) -> int:
 | 
			
		||||
        '''
 | 
			
		||||
        Place an order and return integer request id provided by client.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        # Build order data for kraken api
 | 
			
		||||
        data = {
 | 
			
		||||
            "userref": reqid,
 | 
			
		||||
            "ordertype": "limit",
 | 
			
		||||
            "type": action,
 | 
			
		||||
            "volume": str(size),
 | 
			
		||||
            "pair": symbol,
 | 
			
		||||
            "price": str(price),
 | 
			
		||||
            # set to True test AddOrder call without a real submission
 | 
			
		||||
            "validate": False
 | 
			
		||||
        }
 | 
			
		||||
        return await self.endpoint('AddOrder', data)
 | 
			
		||||
 | 
			
		||||
    async def submit_cancel(
 | 
			
		||||
        self,
 | 
			
		||||
        reqid: str,
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        '''
 | 
			
		||||
        Send cancel request for order id ``reqid``.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        # txid is a transaction id given by kraken
 | 
			
		||||
        return await self.endpoint('CancelOrder', {"txid": reqid})
 | 
			
		||||
 | 
			
		||||
    async def symbol_info(
 | 
			
		||||
        self,
 | 
			
		||||
        pair: Optional[str] = None,
 | 
			
		||||
| 
						 | 
				
			
			@ -271,9 +446,19 @@ class Client:
 | 
			
		|||
            raise SymbolNotFound(json['error'][0] + f': {symbol}')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@asynccontextmanager
 | 
			
		||||
async def get_client() -> Client:
 | 
			
		||||
    client = Client()
 | 
			
		||||
 | 
			
		||||
    section = get_config()
 | 
			
		||||
    if section: 
 | 
			
		||||
        client = Client(
 | 
			
		||||
            name=section['key_descr'],
 | 
			
		||||
            api_key=section['api_key'],
 | 
			
		||||
            secret=section['secret']
 | 
			
		||||
        )
 | 
			
		||||
    else:
 | 
			
		||||
        client = Client()
 | 
			
		||||
 | 
			
		||||
    # at startup, load all symbols locally for fast search
 | 
			
		||||
    await client.cache_symbols()
 | 
			
		||||
| 
						 | 
				
			
			@ -281,8 +466,347 @@ async def get_client() -> Client:
 | 
			
		|||
    yield client
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def stream_messages(ws):
 | 
			
		||||
def pack_positions(
 | 
			
		||||
    acc: str,
 | 
			
		||||
    trades: dict
 | 
			
		||||
) -> list[Any]:
 | 
			
		||||
    positions: dict[str, float] = {}
 | 
			
		||||
    vols: dict[str, float] = {}
 | 
			
		||||
    costs: dict[str, float] = {}
 | 
			
		||||
    position_msgs: list[Any] = []
 | 
			
		||||
 | 
			
		||||
    for trade in trades.values():
 | 
			
		||||
        sign = -1 if trade['type'] == 'sell' else 1
 | 
			
		||||
        pair = trade['pair']
 | 
			
		||||
        vol = float(trade['vol'])
 | 
			
		||||
        vols[pair] = vols.get(pair, 0) + sign * vol
 | 
			
		||||
        costs[pair] = costs.get(pair, 0) + sign * float(trade['cost'])
 | 
			
		||||
        positions[pair] = costs[pair] / vols[pair] if vols[pair] else 0
 | 
			
		||||
 | 
			
		||||
    for ticker, pos in positions.items():
 | 
			
		||||
        vol = float(vols[ticker])
 | 
			
		||||
        if not vol:
 | 
			
		||||
            continue
 | 
			
		||||
        norm_sym = normalize_symbol(ticker)
 | 
			
		||||
        msg = BrokerdPosition(
 | 
			
		||||
            broker='kraken',
 | 
			
		||||
            account=acc,
 | 
			
		||||
            symbol=norm_sym,
 | 
			
		||||
            currency=norm_sym[-3:],
 | 
			
		||||
            size=vol,
 | 
			
		||||
            avg_price=float(pos),
 | 
			
		||||
        )
 | 
			
		||||
        position_msgs.append(msg.dict())
 | 
			
		||||
 | 
			
		||||
    return position_msgs
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def normalize_symbol(
 | 
			
		||||
    ticker: str
 | 
			
		||||
) -> str:
 | 
			
		||||
    # This is to convert symbol names from what kraken
 | 
			
		||||
    # uses to the traditional 3x3 pair symbol syntax
 | 
			
		||||
    symlen = len(ticker)
 | 
			
		||||
    if symlen == 6:
 | 
			
		||||
        return ticker.lower()
 | 
			
		||||
    else:
 | 
			
		||||
        for sym in ['XXBT', 'XXMR', 'ZEUR']:
 | 
			
		||||
            if sym in ticker:
 | 
			
		||||
                ticker = ticker.replace(sym, sym[1:])
 | 
			
		||||
        return ticker.lower()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]:
 | 
			
		||||
    '''
 | 
			
		||||
    Create a request subscription packet dict.
 | 
			
		||||
 | 
			
		||||
    ## TODO: point to the auth urls
 | 
			
		||||
    https://docs.kraken.com/websockets/#message-subscribe
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    # eg. specific logic for this in kraken's sync client:
 | 
			
		||||
    # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
 | 
			
		||||
    return {
 | 
			
		||||
        'event': 'subscribe',
 | 
			
		||||
        'subscription': data,
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def handle_order_requests(
 | 
			
		||||
 | 
			
		||||
        client: Client,
 | 
			
		||||
        ems_order_stream: tractor.MsgStream,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
 | 
			
		||||
    request_msg: dict
 | 
			
		||||
    order: BrokerdOrder
 | 
			
		||||
    userref_counter = count()
 | 
			
		||||
    async for request_msg in ems_order_stream:
 | 
			
		||||
        log.info(f'Received order request {request_msg}')
 | 
			
		||||
 | 
			
		||||
        action = request_msg['action']
 | 
			
		||||
 | 
			
		||||
        if action in {'buy', 'sell'}:
 | 
			
		||||
 | 
			
		||||
            account = request_msg['account']
 | 
			
		||||
            if account != 'kraken.spot':
 | 
			
		||||
                log.error(
 | 
			
		||||
                    'This is a kraken account, \
 | 
			
		||||
                    only a `kraken.spot` selection is valid'
 | 
			
		||||
                )
 | 
			
		||||
                await ems_order_stream.send(BrokerdError(
 | 
			
		||||
                    oid=request_msg['oid'],
 | 
			
		||||
                    symbol=request_msg['symbol'],
 | 
			
		||||
                    reason=f'Kraken only, No account found: `{account}` ?',
 | 
			
		||||
                ).dict())
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            # validate
 | 
			
		||||
            temp_id = next(userref_counter)
 | 
			
		||||
            order = BrokerdOrder(**request_msg)
 | 
			
		||||
            
 | 
			
		||||
            # call our client api to submit the order
 | 
			
		||||
            resp = await client.submit_limit(
 | 
			
		||||
                oid=order.oid,
 | 
			
		||||
                symbol=order.symbol,
 | 
			
		||||
                price=order.price,
 | 
			
		||||
                action=order.action,
 | 
			
		||||
                size=order.size,
 | 
			
		||||
                reqid=temp_id,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            err = resp['error']
 | 
			
		||||
            if err:
 | 
			
		||||
                log.error(f'Failed to submit order')
 | 
			
		||||
                await ems_order_stream.send(
 | 
			
		||||
                    BrokerdError(
 | 
			
		||||
                        oid=order.oid,
 | 
			
		||||
                        reqid=temp_id,
 | 
			
		||||
                        symbol=order.symbol,
 | 
			
		||||
                        reason="Failed order submission",
 | 
			
		||||
                        broker_details=resp
 | 
			
		||||
                    ).dict()
 | 
			
		||||
                )
 | 
			
		||||
            else:
 | 
			
		||||
                # TODO: handle multiple cancels
 | 
			
		||||
                #       txid is an array of strings
 | 
			
		||||
                reqid = resp['result']['txid'][0]
 | 
			
		||||
                # deliver ack that order has been submitted to broker routing
 | 
			
		||||
                await ems_order_stream.send(
 | 
			
		||||
                    BrokerdOrderAck(
 | 
			
		||||
 | 
			
		||||
                        # ems order request id
 | 
			
		||||
                        oid=order.oid,
 | 
			
		||||
 | 
			
		||||
                        # broker specific request id
 | 
			
		||||
                        reqid=reqid,
 | 
			
		||||
 | 
			
		||||
                        # account the made the order
 | 
			
		||||
                        account=order.account
 | 
			
		||||
 | 
			
		||||
                    ).dict()
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        elif action == 'cancel':
 | 
			
		||||
            msg = BrokerdCancel(**request_msg)
 | 
			
		||||
 | 
			
		||||
            # Send order cancellation to kraken
 | 
			
		||||
            resp = await client.submit_cancel(
 | 
			
		||||
                reqid=msg.reqid
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                # Check to make sure there was no error returned by
 | 
			
		||||
                # the kraken endpoint. Assert one order was cancelled
 | 
			
		||||
                assert resp['error'] == []
 | 
			
		||||
                assert resp['result']['count'] == 1
 | 
			
		||||
 | 
			
		||||
                # TODO: Change this code using .get
 | 
			
		||||
                try:
 | 
			
		||||
                    pending = resp['result']['pending']
 | 
			
		||||
                # Check to make sure the cancellation is NOT pending,
 | 
			
		||||
                # then send the confirmation to the ems order stream
 | 
			
		||||
                except KeyError:
 | 
			
		||||
                    await ems_order_stream.send(
 | 
			
		||||
                        BrokerdStatus(
 | 
			
		||||
                            reqid=msg.reqid,
 | 
			
		||||
                            account=msg.account,
 | 
			
		||||
                            time_ns=time.time_ns(),
 | 
			
		||||
                            status='cancelled',
 | 
			
		||||
                            reason='Order cancelled',
 | 
			
		||||
                            broker_details={'name': 'kraken'}
 | 
			
		||||
                        ).dict()
 | 
			
		||||
                    )
 | 
			
		||||
            except AssertionError:
 | 
			
		||||
                log.error(f'Order cancel was not successful')
 | 
			
		||||
                await ems_order_stream.send(
 | 
			
		||||
                    BrokerdError(
 | 
			
		||||
                        oid=order.oid,
 | 
			
		||||
                        reqid=temp_id,
 | 
			
		||||
                        symbol=order.symbol,
 | 
			
		||||
                        reason="Failed order cancel",
 | 
			
		||||
                        broker_details=resp
 | 
			
		||||
                    ).dict()
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            log.error(f'Unknown order command: {request_msg}')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def trades_dialogue(
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
    loglevel: str = None,
 | 
			
		||||
) -> AsyncIterator[Dict[str, Any]]:
 | 
			
		||||
 | 
			
		||||
    # XXX: required to propagate ``tractor`` loglevel to piker logging
 | 
			
		||||
    get_console_log(loglevel or tractor.current_actor().loglevel)
 | 
			
		||||
 | 
			
		||||
    @asynccontextmanager
 | 
			
		||||
    async def subscribe(ws: wsproto.WSConnection, token: str):
 | 
			
		||||
        # XXX: setup subs
 | 
			
		||||
        # https://docs.kraken.com/websockets/#message-subscribe
 | 
			
		||||
        # specific logic for this in kraken's shitty sync client:
 | 
			
		||||
        # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
 | 
			
		||||
        trades_sub = make_auth_sub(
 | 
			
		||||
            {'name': 'ownTrades', 'token': token}
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # TODO: we want to eventually allow unsubs which should
 | 
			
		||||
        # be completely fine to request from a separate task
 | 
			
		||||
        # since internally the ws methods appear to be FIFO
 | 
			
		||||
        # locked.
 | 
			
		||||
        await ws.send_msg(trades_sub)
 | 
			
		||||
 | 
			
		||||
        yield
 | 
			
		||||
 | 
			
		||||
        # unsub from all pairs on teardown
 | 
			
		||||
        await ws.send_msg({
 | 
			
		||||
            'event': 'unsubscribe',
 | 
			
		||||
            'subscription': ['ownTrades'],
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
        # XXX: do we need to ack the unsub?
 | 
			
		||||
        # await ws.recv_msg()
 | 
			
		||||
 | 
			
		||||
    # Authenticated block
 | 
			
		||||
    async with get_client() as client:
 | 
			
		||||
        if not client._api_key:
 | 
			
		||||
            log.error('Missing Kraken API key: Trades WS connection failed')
 | 
			
		||||
            await ctx.started(({}, {'paper',}))
 | 
			
		||||
 | 
			
		||||
            async with (
 | 
			
		||||
                ctx.open_stream() as ems_stream,
 | 
			
		||||
                trio.open_nursery() as n,
 | 
			
		||||
            ):
 | 
			
		||||
 | 
			
		||||
                client = PaperBoi(
 | 
			
		||||
                    'kraken',
 | 
			
		||||
                    ems_stream,
 | 
			
		||||
                    _buys={},
 | 
			
		||||
                    _sells={},
 | 
			
		||||
 | 
			
		||||
                    _reqids={},
 | 
			
		||||
 | 
			
		||||
                    # TODO: load paper positions from ``positions.toml``
 | 
			
		||||
                    _positions={},
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                ## TODO: maybe add multiple accounts
 | 
			
		||||
                n.start_soon(handle_order_requests, client, ems_stream)
 | 
			
		||||
 | 
			
		||||
        acc_name = 'kraken.' + client._name
 | 
			
		||||
        trades = await client.get_trades()
 | 
			
		||||
 | 
			
		||||
        position_msgs = pack_positions(acc_name, trades)
 | 
			
		||||
 | 
			
		||||
        await ctx.started((position_msgs, (acc_name,)))
 | 
			
		||||
 | 
			
		||||
        # Get websocket token for authenticated data stream
 | 
			
		||||
        # Assert that a token was actually received 
 | 
			
		||||
        resp = await client.endpoint('GetWebSocketsToken', {})
 | 
			
		||||
        assert resp['error'] == []
 | 
			
		||||
        token = resp['result']['token']
 | 
			
		||||
 | 
			
		||||
        async with (
 | 
			
		||||
            ctx.open_stream() as ems_stream,
 | 
			
		||||
            trio.open_nursery() as n,
 | 
			
		||||
        ):
 | 
			
		||||
            ## TODO: maybe add multiple accounts
 | 
			
		||||
            n.start_soon(handle_order_requests, client, ems_stream)
 | 
			
		||||
 | 
			
		||||
            # Process trades msg stream of ws
 | 
			
		||||
            async with open_autorecon_ws(
 | 
			
		||||
                'wss://ws-auth.kraken.com/',
 | 
			
		||||
                fixture=subscribe,
 | 
			
		||||
                token=token,
 | 
			
		||||
            ) as ws:
 | 
			
		||||
                async for msg in process_trade_msgs(ws):
 | 
			
		||||
                    for trade in msg:
 | 
			
		||||
                        # check the type of packaged message
 | 
			
		||||
                        assert type(trade) == Trade
 | 
			
		||||
                        # prepare and send a status update for line update
 | 
			
		||||
                        trade_msg = BrokerdStatus(
 | 
			
		||||
                            reqid=trade.reqid,
 | 
			
		||||
                            time_ns=time.time_ns(),
 | 
			
		||||
 | 
			
		||||
                            account='kraken.spot',
 | 
			
		||||
                            status='executed',
 | 
			
		||||
                            filled=float(trade.size),
 | 
			
		||||
                            reason='Order filled by kraken',
 | 
			
		||||
                            # remaining='' # TODO: not sure what to do here.
 | 
			
		||||
                            broker_details={
 | 
			
		||||
                                'name': 'kraken',
 | 
			
		||||
                                'broker_time': trade.broker_time
 | 
			
		||||
                            }
 | 
			
		||||
                        )
 | 
			
		||||
                        
 | 
			
		||||
                        await ems_stream.send(trade_msg.dict())
 | 
			
		||||
 | 
			
		||||
                        filled_msg = BrokerdStatus(
 | 
			
		||||
                            reqid=trade.reqid,
 | 
			
		||||
                            time_ns=time.time_ns(),
 | 
			
		||||
 | 
			
		||||
                            account='kraken.spot',
 | 
			
		||||
                            status='filled',
 | 
			
		||||
                            filled=float(trade.size),
 | 
			
		||||
                            reason='Order filled by kraken',
 | 
			
		||||
                            # remaining='' # TODO: not sure what to do here.
 | 
			
		||||
                            broker_details={
 | 
			
		||||
                                'name': 'kraken',
 | 
			
		||||
                                'broker_time': trade.broker_time
 | 
			
		||||
                            }
 | 
			
		||||
                        )
 | 
			
		||||
                        
 | 
			
		||||
                        await ems_stream.send(filled_msg.dict())
 | 
			
		||||
 | 
			
		||||
                        # send a fill msg for gui update
 | 
			
		||||
                        fill_msg = BrokerdFill(
 | 
			
		||||
                            reqid=trade.reqid,
 | 
			
		||||
                            time_ns=time.time_ns(),
 | 
			
		||||
 | 
			
		||||
                            action=trade.action,
 | 
			
		||||
                            size=float(trade.size),
 | 
			
		||||
                            price=float(trade.price),
 | 
			
		||||
                            # TODO: maybe capture more msg data i.e fees?
 | 
			
		||||
                            broker_details={'name': 'kraken'},
 | 
			
		||||
                            broker_time=float(trade.broker_time)
 | 
			
		||||
                        )
 | 
			
		||||
                        
 | 
			
		||||
                        await ems_stream.send(fill_msg.dict())
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def stream_messages(
 | 
			
		||||
    ws: NoBsWs,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Message stream parser and heartbeat handler.
 | 
			
		||||
 | 
			
		||||
    Deliver ws subscription messages as well as handle heartbeat logic
 | 
			
		||||
    though a single async generator.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    too_slow_count = last_hb = 0
 | 
			
		||||
 | 
			
		||||
    while True:
 | 
			
		||||
| 
						 | 
				
			
			@ -320,39 +844,95 @@ async def stream_messages(ws):
 | 
			
		|||
            if err:
 | 
			
		||||
                raise BrokerError(err)
 | 
			
		||||
        else:
 | 
			
		||||
            chan_id, *payload_array, chan_name, pair = msg
 | 
			
		||||
            yield msg
 | 
			
		||||
 | 
			
		||||
            if 'ohlc' in chan_name:
 | 
			
		||||
 | 
			
		||||
                yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
 | 
			
		||||
async def process_data_feed_msgs(
 | 
			
		||||
    ws: NoBsWs,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Parse and pack data feed messages.
 | 
			
		||||
 | 
			
		||||
            elif 'spread' in chan_name:
 | 
			
		||||
    '''
 | 
			
		||||
    async for msg in stream_messages(ws):
 | 
			
		||||
 | 
			
		||||
                bid, ask, ts, bsize, asize = map(float, payload_array[0])
 | 
			
		||||
        chan_id, *payload_array, chan_name, pair = msg
 | 
			
		||||
 | 
			
		||||
                # TODO: really makes you think IB has a horrible API...
 | 
			
		||||
                quote = {
 | 
			
		||||
                    'symbol': pair.replace('/', ''),
 | 
			
		||||
                    'ticks': [
 | 
			
		||||
                        {'type': 'bid', 'price': bid, 'size': bsize},
 | 
			
		||||
                        {'type': 'bsize', 'price': bid, 'size': bsize},
 | 
			
		||||
        if 'ohlc' in chan_name:
 | 
			
		||||
 | 
			
		||||
                        {'type': 'ask', 'price': ask, 'size': asize},
 | 
			
		||||
                        {'type': 'asize', 'price': ask, 'size': asize},
 | 
			
		||||
                    ],
 | 
			
		||||
                }
 | 
			
		||||
                yield 'l1', quote
 | 
			
		||||
            yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
 | 
			
		||||
 | 
			
		||||
            # elif 'book' in msg[-2]:
 | 
			
		||||
            #     chan_id, *payload_array, chan_name, pair = msg
 | 
			
		||||
            #     print(msg)
 | 
			
		||||
        elif 'spread' in chan_name:
 | 
			
		||||
 | 
			
		||||
            else:
 | 
			
		||||
                print(f'UNHANDLED MSG: {msg}')
 | 
			
		||||
            bid, ask, ts, bsize, asize = map(float, payload_array[0])
 | 
			
		||||
 | 
			
		||||
            # TODO: really makes you think IB has a horrible API...
 | 
			
		||||
            quote = {
 | 
			
		||||
                'symbol': pair.replace('/', ''),
 | 
			
		||||
                'ticks': [
 | 
			
		||||
                    {'type': 'bid', 'price': bid, 'size': bsize},
 | 
			
		||||
                    {'type': 'bsize', 'price': bid, 'size': bsize},
 | 
			
		||||
 | 
			
		||||
                    {'type': 'ask', 'price': ask, 'size': asize},
 | 
			
		||||
                    {'type': 'asize', 'price': ask, 'size': asize},
 | 
			
		||||
                ],
 | 
			
		||||
            }
 | 
			
		||||
            yield 'l1', quote
 | 
			
		||||
 | 
			
		||||
        # elif 'book' in msg[-2]:
 | 
			
		||||
        #     chan_id, *payload_array, chan_name, pair = msg
 | 
			
		||||
        #     print(msg)
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            print(f'UNHANDLED MSG: {msg}')
 | 
			
		||||
            yield msg
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def process_trade_msgs(
 | 
			
		||||
    ws: NoBsWs,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Parse and pack data feed messages.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    sequence_counter = 0
 | 
			
		||||
    async for msg in stream_messages(ws):
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            # check that we are on the ownTrades stream and that msgs are 
 | 
			
		||||
            # arriving in sequence with kraken
 | 
			
		||||
            # For clarification the kraken ws api docs for this stream: 
 | 
			
		||||
            #   https://docs.kraken.com/websockets/#message-ownTrades
 | 
			
		||||
            assert msg[1] == 'ownTrades'
 | 
			
		||||
            assert msg[2]['sequence'] > sequence_counter
 | 
			
		||||
            sequence_counter += 1
 | 
			
		||||
            raw_msgs = msg[0]
 | 
			
		||||
            trade_msgs = []
 | 
			
		||||
 | 
			
		||||
            # Check that we are only processing new trades
 | 
			
		||||
            if msg[2]['sequence'] != 1:
 | 
			
		||||
                # check if its a new order or an update msg
 | 
			
		||||
                for trade_msg in raw_msgs:
 | 
			
		||||
                    trade = list(trade_msg.values())[0]
 | 
			
		||||
                    order_msg = Trade(
 | 
			
		||||
                        reqid=trade['ordertxid'],
 | 
			
		||||
                        action=trade['type'],
 | 
			
		||||
                        price=trade['price'],
 | 
			
		||||
                        size=trade['vol'],
 | 
			
		||||
                        broker_time=trade['time']
 | 
			
		||||
                    )
 | 
			
		||||
                    trade_msgs.append(order_msg)
 | 
			
		||||
 | 
			
		||||
            yield trade_msgs
 | 
			
		||||
 | 
			
		||||
        except AssertionError:
 | 
			
		||||
            print(f'UNHANDLED MSG: {msg}')
 | 
			
		||||
            yield msg
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def normalize(
 | 
			
		||||
    ohlc: OHLC,
 | 
			
		||||
 | 
			
		||||
) -> dict:
 | 
			
		||||
    quote = asdict(ohlc)
 | 
			
		||||
    quote['broker_ts'] = quote['time']
 | 
			
		||||
| 
						 | 
				
			
			@ -371,11 +951,12 @@ def normalize(
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
 | 
			
		||||
    """Create a request subscription packet dict.
 | 
			
		||||
    '''
 | 
			
		||||
    Create a request subscription packet dict.
 | 
			
		||||
 | 
			
		||||
    https://docs.kraken.com/websockets/#message-subscribe
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    '''
 | 
			
		||||
    # eg. specific logic for this in kraken's sync client:
 | 
			
		||||
    # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
 | 
			
		||||
    return {
 | 
			
		||||
| 
						 | 
				
			
			@ -393,8 +974,9 @@ async def backfill_bars(
 | 
			
		|||
    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    """Fill historical bars into shared mem / storage afap.
 | 
			
		||||
    """
 | 
			
		||||
    '''
 | 
			
		||||
    Fill historical bars into shared mem / storage afap.
 | 
			
		||||
    '''
 | 
			
		||||
    with trio.CancelScope() as cs:
 | 
			
		||||
        async with open_cached_client('kraken') as client:
 | 
			
		||||
            bars = await client.bars(symbol=sym)
 | 
			
		||||
| 
						 | 
				
			
			@ -416,10 +998,12 @@ async def stream_quotes(
 | 
			
		|||
    task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    """Subscribe for ohlc stream of quotes for ``pairs``.
 | 
			
		||||
    '''
 | 
			
		||||
    Subscribe for ohlc stream of quotes for ``pairs``.
 | 
			
		||||
 | 
			
		||||
    ``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    # XXX: required to propagate ``tractor`` loglevel to piker logging
 | 
			
		||||
    get_console_log(loglevel or tractor.current_actor().loglevel)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -491,15 +1075,16 @@ async def stream_quotes(
 | 
			
		|||
            # XXX: do we need to ack the unsub?
 | 
			
		||||
            # await ws.recv_msg()
 | 
			
		||||
 | 
			
		||||
        # see the tips on reonnection logic:
 | 
			
		||||
        # see the tips on reconnection logic:
 | 
			
		||||
        # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
 | 
			
		||||
        ws: NoBsWs
 | 
			
		||||
        async with open_autorecon_ws(
 | 
			
		||||
            'wss://ws.kraken.com/',
 | 
			
		||||
            fixture=subscribe,
 | 
			
		||||
        ) as ws:
 | 
			
		||||
 | 
			
		||||
            # pull a first quote and deliver
 | 
			
		||||
            msg_gen = stream_messages(ws)
 | 
			
		||||
            msg_gen = process_data_feed_msgs(ws)
 | 
			
		||||
 | 
			
		||||
            # TODO: use ``anext()`` when it lands in 3.10!
 | 
			
		||||
            typ, ohlc_last = await msg_gen.__anext__()
 | 
			
		||||
| 
						 | 
				
			
			@ -558,6 +1143,7 @@ async def stream_quotes(
 | 
			
		|||
@tractor.context
 | 
			
		||||
async def open_symbol_search(
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
 | 
			
		||||
) -> Client:
 | 
			
		||||
    async with open_cached_client('kraken') as client:
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -493,7 +493,8 @@ async def open_brokerd_trades_dialogue(
 | 
			
		|||
        finally:
 | 
			
		||||
            # parent context must have been closed
 | 
			
		||||
            # remove from cache so next client will respawn if needed
 | 
			
		||||
            _router.relays.pop(broker)
 | 
			
		||||
            ## TODO: Maybe add a warning
 | 
			
		||||
            _router.relays.pop(broker, None)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -389,7 +389,7 @@ async def handle_order_requests(
 | 
			
		|||
            account = request_msg['account']
 | 
			
		||||
            if account != 'paper':
 | 
			
		||||
                log.error(
 | 
			
		||||
                    'On a paper account, only a `paper` selection is valid'
 | 
			
		||||
                    'This is a paper account, only a `paper` selection is valid'
 | 
			
		||||
                )
 | 
			
		||||
                await ems_order_stream.send(BrokerdError(
 | 
			
		||||
                    oid=request_msg['oid'],
 | 
			
		||||
| 
						 | 
				
			
			@ -463,7 +463,8 @@ async def trades_dialogue(
 | 
			
		|||
    ):
 | 
			
		||||
        # TODO: load paper positions per broker from .toml config file
 | 
			
		||||
        # and pass as symbol to position data mapping: ``dict[str, dict]``
 | 
			
		||||
        await ctx.started(({}, ['paper']))
 | 
			
		||||
        # await ctx.started(all_positions)
 | 
			
		||||
        await ctx.started(({}, {'paper',}))
 | 
			
		||||
 | 
			
		||||
        async with (
 | 
			
		||||
            ctx.open_stream() as ems_stream,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -53,11 +53,13 @@ class NoBsWs:
 | 
			
		|||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        url: str,
 | 
			
		||||
        token: str,
 | 
			
		||||
        stack: AsyncExitStack,
 | 
			
		||||
        fixture: Callable,
 | 
			
		||||
        serializer: ModuleType = json,
 | 
			
		||||
    ):
 | 
			
		||||
        self.url = url
 | 
			
		||||
        self.token = token
 | 
			
		||||
        self.fixture = fixture
 | 
			
		||||
        self._stack = stack
 | 
			
		||||
        self._ws: 'WebSocketConnection' = None  # noqa
 | 
			
		||||
| 
						 | 
				
			
			@ -81,9 +83,15 @@ class NoBsWs:
 | 
			
		|||
                    trio_websocket.open_websocket_url(self.url)
 | 
			
		||||
                )
 | 
			
		||||
                # rerun user code fixture
 | 
			
		||||
                ret = await self._stack.enter_async_context(
 | 
			
		||||
                    self.fixture(self)
 | 
			
		||||
                )
 | 
			
		||||
                if self.token == '':
 | 
			
		||||
                    ret = await self._stack.enter_async_context(
 | 
			
		||||
                        self.fixture(self)
 | 
			
		||||
                    )
 | 
			
		||||
                else:
 | 
			
		||||
                    ret = await self._stack.enter_async_context(
 | 
			
		||||
                        self.fixture(self, self.token)
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                assert ret is None
 | 
			
		||||
 | 
			
		||||
                log.info(f'Connection success: {self.url}')
 | 
			
		||||
| 
						 | 
				
			
			@ -127,12 +135,14 @@ async def open_autorecon_ws(
 | 
			
		|||
 | 
			
		||||
    # TODO: proper type annot smh
 | 
			
		||||
    fixture: Callable,
 | 
			
		||||
    # used for authenticated websockets
 | 
			
		||||
    token: str = '',
 | 
			
		||||
) -> AsyncGenerator[tuple[...],  NoBsWs]:
 | 
			
		||||
    """Apparently we can QoS for all sorts of reasons..so catch em.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    async with AsyncExitStack() as stack:
 | 
			
		||||
        ws = NoBsWs(url, stack, fixture=fixture)
 | 
			
		||||
        ws = NoBsWs(url, token, stack, fixture=fixture)
 | 
			
		||||
        await ws._connect()
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue