refactory based on github comments, change doc string style

kraken_orders
Konstantine Tsafatinos 2022-03-06 15:16:42 -05:00
parent 617bf3e0da
commit fd0acd21fb
1 changed files with 91 additions and 59 deletions

View File

@ -14,13 +14,13 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" '''
Kraken backend. Kraken backend.
""" '''
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import asdict, field from dataclasses import asdict, field
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator from typing import Dict, List, Tuple, Any, Optional, AsyncIterator
import time import time
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -34,6 +34,10 @@ from pydantic.dataclasses import dataclass
from pydantic import BaseModel from pydantic import BaseModel
import wsproto import wsproto
from itertools import count from itertools import count
import urllib.parse
import hashlib
import hmac
import base64
from .. import config from .. import config
from .._cacheables import open_cached_client from .._cacheables import open_cached_client
@ -47,11 +51,6 @@ from ..clearing._messages import (
BrokerdFill, BrokerdFill,
) )
import urllib.parse
import hashlib
import hmac
import base64
log = get_logger(__name__) log = get_logger(__name__)
@ -120,7 +119,10 @@ class Pair(BaseModel):
class Trade(BaseModel): class Trade(BaseModel):
"""Trade class that helps parse and validate ownTrades stream""" '''
Trade class that helps parse and validate ownTrades stream
'''
reqid: str # kraken order transaction id reqid: str # kraken order transaction id
action: str # buy or sell action: str # buy or sell
price: str # price of asset price: str # price of asset
@ -130,11 +132,13 @@ class Trade(BaseModel):
@dataclass @dataclass
class OHLC: class OHLC:
"""Description of the flattened OHLC quote format. '''
Description of the flattened OHLC quote format.
For schema details see: For schema details see:
https://docs.kraken.com/websockets/#message-ohlc https://docs.kraken.com/websockets/#message-ohlc
"""
'''
chan_id: int # internal kraken id chan_id: int # internal kraken id
chan_name: str # eg. ohlc-1 (name-interval) chan_name: str # eg. ohlc-1 (name-interval)
pair: str # fx pair pair: str # fx pair
@ -179,16 +183,24 @@ def get_kraken_signature(
class InvalidKey(ValueError): class InvalidKey(ValueError):
"""EAPI:Invalid key '''
EAPI:Invalid key
This error is returned when the API key used for the call is This error is returned when the API key used for the call is
either expired or disabled, please review the API key in your either expired or disabled, please review the API key in your
Settings -> API tab of account management or generate a new one Settings -> API tab of account management or generate a new one
and update your application.""" and update your application.
'''
class Client: class Client:
def __init__(self) -> None: def __init__(
self,
name: str = '',
api_key: str = '',
secret: str = ''
) -> None:
self._sesh = asks.Session(connections=4) self._sesh = asks.Session(connections=4)
self._sesh.base_location = _url self._sesh.base_location = _url
self._sesh.headers.update({ self._sesh.headers.update({
@ -196,9 +208,9 @@ class Client:
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
}) })
self._pairs: list[str] = [] self._pairs: list[str] = []
self._name = '' self._name = name
self._api_key = '' self._api_key = api_key
self._secret = '' self._secret = secret
@property @property
def pairs(self) -> Dict[str, Any]: def pairs(self) -> Dict[str, Any]:
@ -244,26 +256,26 @@ class Client:
) )
return resproc(resp, log) return resproc(resp, log)
async def kraken_endpoint( async def endpoint(
self, self,
method: str, method: str,
data: Dict[str, Any] data: Dict[str, Any]
) -> Dict[str, Any]: ) -> Dict[str, Any]:
uri_path = f'/0/private/{method}' uri_path = f'/0/private/{method}'
data['nonce'] = str(int(1000*time.time())) data['nonce'] = str(int(1000*time.time()))
resp = await self._private(method, data, uri_path) return await self._private(method, data, uri_path)
return resp
async def get_positions( async def get_positions(
self, self,
data: Dict[str, Any] = {} data: Dict[str, Any] = {}
) -> Dict[str, Any]: ) -> (Dict[str, Any], Dict[str, Any]):
data['ofs'] = 0 data['ofs'] = 0
positions = {} positions = {}
vols = {} vols = {}
# Grab all trade history # Grab all trade history
# https://docs.kraken.com/rest/#operation/getTradeHistory
while True: while True:
resp = await self.kraken_endpoint('TradesHistory', data) resp = await self.endpoint('TradesHistory', data)
# grab the first 50 trades # grab the first 50 trades
if data['ofs'] == 0: if data['ofs'] == 0:
trades = resp['result']['trades'] trades = resp['result']['trades']
@ -281,22 +293,28 @@ class Client:
# increment the offset counter # increment the offset counter
data['ofs'] += 50 data['ofs'] += 50
# To avoid exceeding API rate limit in case of a lot of trades # To avoid exceeding API rate limit in case of a lot of trades
time.sleep(1) await trio.sleep(1)
# make sure you grabbed all the trades # make sure you grabbed all the trades
assert count == len(trades.values()) assert count == len(trades.values())
# positions # positions
## TODO: Make sure to add option to include fees in positions calc # TODO: Make sure to add option to include fees in positions calc
for trade in trades.values(): for trade in trades.values():
sign = -1 if trade['type'] == 'sell' else 1 sign = -1 if trade['type'] == 'sell' else 1
# This catch is for populating the dict with new values
# as the plus assigment will fail if there no value
# tied to the key
try: try:
positions[trade['pair']] += sign * float(trade['cost']) positions[trade['pair']] += sign * float(trade['cost'])
vols[trade['pair']] += sign * float(trade['vol']) vols[trade['pair']] += sign * float(trade['vol'])
except KeyError: except KeyError:
positions[trade['pair']] = sign * float(trade['cost']) positions[trade['pair']] = sign * float(trade['cost'])
vols[trade['pair']] = sign * float(trade['vol']) vols[trade['pair']] = sign * float(trade['vol'])
# This cycles through the summed trades of an asset and then
# normalizes the price with the current volume of the asset
# you are holding. If you have no more of the asset, the balance
# is 0, then it sets the position to 0.
for pair in positions.keys(): for pair in positions.keys():
asset_balance = vols[pair] asset_balance = vols[pair]
if asset_balance == 0: if asset_balance == 0:
@ -316,9 +334,10 @@ class Client:
# account: str, # account: str,
reqid: int = None, reqid: int = None,
) -> int: ) -> int:
"""Place an order and return integer request id provided by client. '''
Place an order and return integer request id provided by client.
""" '''
# Build order data for kraken api # Build order data for kraken api
data = { data = {
"userref": reqid, "userref": reqid,
@ -330,20 +349,18 @@ class Client:
# set to True test AddOrder call without a real submission # set to True test AddOrder call without a real submission
"validate": False "validate": False
} }
resp = await self.kraken_endpoint('AddOrder', data) return await self.endpoint('AddOrder', data)
return resp
async def submit_cancel( async def submit_cancel(
self, self,
reqid: str, reqid: str,
) -> None: ) -> None:
"""Send cancel request for order id ``reqid``. '''
Send cancel request for order id ``reqid``.
""" '''
# txid is a transaction id given by kraken # txid is a transaction id given by kraken
data = {"txid": reqid} return await self.endpoint('CancelOrder', {"txid": reqid})
resp = await self.kraken_endpoint('CancelOrder', data)
return resp
async def symbol_info( async def symbol_info(
self, self,
@ -457,12 +474,13 @@ class Client:
@asynccontextmanager @asynccontextmanager
async def get_client() -> Client: async def get_client() -> Client:
client = Client()
section = get_config() section = get_config()
client._name = section['key_descr'] client = Client(
client._api_key = section['api_key'] name=section['key_descr'],
client._secret = section['secret'] api_key=section['api_key'],
secret=section['secret']
)
# at startup, load all symbols locally for fast search # at startup, load all symbols locally for fast search
await client.cache_symbols() await client.cache_symbols()
@ -490,6 +508,8 @@ def pack_position(
def normalize_symbol( def normalize_symbol(
ticker: str ticker: str
) -> str: ) -> str:
# This is to convert symbol names from what kraken
# uses to the traditional 3x3 pair symbol syntax
symlen = len(ticker) symlen = len(ticker)
if symlen == 6: if symlen == 6:
return ticker.lower() return ticker.lower()
@ -501,12 +521,13 @@ def normalize_symbol(
def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]: def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]:
"""Create a request subscription packet dict. '''
Create a request subscription packet dict.
## TODO: point to the auth urls ## TODO: point to the auth urls
https://docs.kraken.com/websockets/#message-subscribe https://docs.kraken.com/websockets/#message-subscribe
""" '''
# eg. specific logic for this in kraken's sync client: # eg. specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
return { return {
@ -519,9 +540,6 @@ async def handle_order_requests(
client: Client, client: Client,
ems_order_stream: tractor.MsgStream, ems_order_stream: tractor.MsgStream,
#ws: NoBsWs,
#token: str,
#userref_oid_map: dict,
) -> None: ) -> None:
@ -575,8 +593,8 @@ async def handle_order_requests(
).dict() ).dict()
) )
else: else:
## TODO: handle multiple cancels # TODO: handle multiple cancels
## txid is an array of strings # txid is an array of strings
reqid = resp['result']['txid'][0] reqid = resp['result']['txid'][0]
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( await ems_order_stream.send(
@ -608,7 +626,7 @@ async def handle_order_requests(
assert resp['error'] == [] assert resp['error'] == []
assert resp['result']['count'] == 1 assert resp['result']['count'] == 1
## TODO: Change this code using .get # TODO: Change this code using .get
try: try:
pending = resp['result']['pending'] pending = resp['result']['pending']
# Check to make sure the cancellation is NOT pending, # Check to make sure the cancellation is NOT pending,
@ -626,6 +644,15 @@ async def handle_order_requests(
) )
except AssertionError: except AssertionError:
log.error(f'Order cancel was not successful') log.error(f'Order cancel was not successful')
await ems_order_stream.send(
BrokerdError(
oid=order.oid,
reqid=temp_id,
symbol=order.symbol,
reason="Failed order cancel",
broker_details=resp
).dict()
)
else: else:
log.error(f'Unknown order command: {request_msg}') log.error(f'Unknown order command: {request_msg}')
@ -669,7 +696,6 @@ async def trades_dialogue(
# XXX: do we need to ack the unsub? # XXX: do we need to ack the unsub?
# await ws.recv_msg() # await ws.recv_msg()
# Authenticated block # Authenticated block
async with get_client() as client: async with get_client() as client:
acc_name = 'kraken.' + client._name acc_name = 'kraken.' + client._name
@ -687,7 +713,7 @@ async def trades_dialogue(
# Get websocket token for authenticated data stream # Get websocket token for authenticated data stream
# Assert that a token was actually received # Assert that a token was actually received
resp = await client.kraken_endpoint('GetWebSocketsToken', {}) resp = await client.endpoint('GetWebSocketsToken', {})
assert resp['error'] == [] assert resp['error'] == []
token = resp['result']['token'] token = resp['result']['token']
@ -717,7 +743,7 @@ async def trades_dialogue(
status='executed', status='executed',
filled=float(trade.size), filled=float(trade.size),
reason='Order filled by kraken', reason='Order filled by kraken',
# remaining='' ## TODO: not sure what to do here. # remaining='' # TODO: not sure what to do here.
broker_details={ broker_details={
'name': 'kraken', 'name': 'kraken',
'broker_time': trade.broker_time 'broker_time': trade.broker_time
@ -734,7 +760,7 @@ async def trades_dialogue(
action=trade.action, action=trade.action,
size=float(trade.size), size=float(trade.size),
price=float(trade.price), price=float(trade.price),
## TODO: maybe capture more msg data i.e fees? # TODO: maybe capture more msg data i.e fees?
broker_details={'name': 'kraken'}, broker_details={'name': 'kraken'},
broker_time=float(trade.broker_time) broker_time=float(trade.broker_time)
) )
@ -846,6 +872,8 @@ async def process_trade_msgs(
try: try:
# check that we are on the ownTrades stream and that msgs are # check that we are on the ownTrades stream and that msgs are
# arriving in sequence with kraken # arriving in sequence with kraken
# For clarification the kraken ws api docs for this stream:
# https://docs.kraken.com/websockets/#message-ownTrades
assert msg[1] == 'ownTrades' assert msg[1] == 'ownTrades'
assert msg[2]['sequence'] > sequence_counter assert msg[2]['sequence'] > sequence_counter
sequence_counter += 1 sequence_counter += 1
@ -894,11 +922,12 @@ def normalize(
def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
"""Create a request subscription packet dict. '''
Create a request subscription packet dict.
https://docs.kraken.com/websockets/#message-subscribe https://docs.kraken.com/websockets/#message-subscribe
""" '''
# eg. specific logic for this in kraken's sync client: # eg. specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
return { return {
@ -916,8 +945,9 @@ async def backfill_bars(
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Fill historical bars into shared mem / storage afap. '''
""" Fill historical bars into shared mem / storage afap.
'''
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
async with open_cached_client('kraken') as client: async with open_cached_client('kraken') as client:
bars = await client.bars(symbol=sym) bars = await client.bars(symbol=sym)
@ -939,10 +969,12 @@ async def stream_quotes(
task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Subscribe for ohlc stream of quotes for ``pairs``. '''
Subscribe for ohlc stream of quotes for ``pairs``.
``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>. ``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>.
"""
'''
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)