Compare commits

...

17 Commits

Author SHA1 Message Date
Tyler Goodlet 30fdf550d7 Factor out ws msg hearbeat and error handling
Move the core ws message handling into `stream_messages()` and call that
from 2 new stream processors: `process_data_feed_msgs()` and
`process_order_msgs()`. Add comments for hints on how to implement the
order msg parsing as well as `pprint` received msgs to console for now.
2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 37df05c260 connect to krakens openOrders websocket 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos d141981cca order submission and cancellation working 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos ead9fbd2f1 basic order submission and cancelling with kraken 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos bcdcfc7d59 valdiate and ack order requests from ems 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 3af3b693da mock orders validated from kraken 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 7627fb5141 get basic order request loop receiving msgs 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos a3a6718ba5 added the bones for the handle_order_requests func 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 90947438d2 save progress on kraken to test out unit_select_fixes 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 5b01c84afd get positions working for kraken 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 20a5ffdc2b get positions from trades 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 0bab95eaa6 Store changes for rebase, positions prototype 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 549c0d2c85 Add balance to the ledger 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 178d146deb Add get_ledger function; parses raw ledger from kraken api 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 3507e492e8 wrap api method calls with uri and nonce value 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 89fe9c0713 get kraken authentication and retrieve balances 2022-02-17 08:21:00 -05:00
Tyler Goodlet 8f15d1fd1f Misc curve doc strings 2022-02-17 08:21:00 -05:00
6 changed files with 526 additions and 38 deletions

View File

@ -8,8 +8,8 @@ expires_at = 1616095326.355846
[kraken]
key_descr = "api_0"
public_key = ""
private_key = ""
api_key = ""
secret = ""
[ib]
host = "127.0.0.1"

View File

@ -20,7 +20,7 @@ Kraken backend.
"""
from contextlib import asynccontextmanager
from dataclasses import asdict, field
from typing import List, Dict, Any, Tuple, Optional
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
import time
from trio_typing import TaskStatus
@ -33,12 +33,25 @@ import tractor
from pydantic.dataclasses import dataclass
from pydantic import BaseModel
import wsproto
from itertools import count
from .. import config
from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws
from ..data._web_bs import open_autorecon_ws, NoBsWs
from ..clearing._messages import (
BrokerdPosition, BrokerdOrder, BrokerdStatus,
BrokerdOrderAck, BrokerdError, BrokerdCancel,
BrokerdFill,
)
import urllib.parse
import hashlib
import hmac
import base64
log = get_logger(__name__)
@ -129,6 +142,41 @@ class OHLC:
ticks: List[Any] = field(default_factory=list)
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('kraken')
if section is None:
log.warning(f'No config section found for kraken in {path}')
return {}
return section
def get_kraken_signature(
urlpath: str,
data: Dict[str, Any],
secret: str
) -> str:
postdata = urllib.parse.urlencode(data)
encoded = (str(data['nonce']) + postdata).encode()
message = urlpath.encode() + hashlib.sha256(encoded).digest()
mac = hmac.new(base64.b64decode(secret), message, hashlib.sha512)
sigdigest = base64.b64encode(mac.digest())
return sigdigest.decode()
class InvalidKey(ValueError):
"""EAPI:Invalid key
This error is returned when the API key used for the call is
either expired or disabled, please review the API key in your
Settings -> API tab of account management or generate a new one
and update your application."""
class Client:
def __init__(self) -> None:
@ -139,6 +187,9 @@ class Client:
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
})
self._pairs: list[str] = []
self._name = ''
self._api_key = ''
self._secret = ''
@property
def pairs(self) -> Dict[str, Any]:
@ -162,6 +213,109 @@ class Client:
)
return resproc(resp, log)
async def _private(
self,
method: str,
data: dict,
uri_path: str
) -> Dict[str, Any]:
headers = {
'Content-Type':
'application/x-www-form-urlencoded',
'API-Key':
self._api_key,
'API-Sign':
get_kraken_signature(uri_path, data, self._secret)
}
resp = await self._sesh.post(
path=f'/private/{method}',
data=data,
headers=headers,
timeout=float('inf')
)
return resproc(resp, log)
async def kraken_endpoint(
self,
method: str,
data: Dict[str, Any]
) -> Dict[str, Any]:
uri_path = f'/0/private/{method}'
data['nonce'] = str(int(1000*time.time()))
resp = await self._private(method, data, uri_path)
return resp
async def get_positions(
self,
data: Dict[str, Any] = {}
) -> Dict[str, Any]:
resp = await self.kraken_endpoint('Balance', data)
balances = resp['result']
## TODO: grab all entries, not just first 50
resp = await self.kraken_endpoint('TradesHistory', data)
traders = resp['result']
positions = {}
vols = {}
# positions
## TODO: Make sure to add option to include fees in positions calc
for trade in traders['trades'].values():
sign = -1 if trade['type'] == 'sell' else 1
try:
positions[trade['pair']] += sign * float(trade['cost'])
vols[trade['pair']] += sign * float(trade['vol'])
except KeyError:
positions[trade['pair']] = sign * float(trade['cost'])
vols[trade['pair']] = sign * float(trade['vol'])
for pair in positions.keys():
asset_balance = vols[pair]
if asset_balance == 0:
positions[pair] = 0
else:
positions[pair] /= asset_balance
return positions, vols
async def submit_limit(
self,
oid: str,
symbol: str,
price: float,
action: str,
size: float,
# account: str,
reqid: int = None,
) -> int:
"""Place an order and return integer request id provided by client.
"""
# Build order data for kraken api
data = {
"userref": reqid,
"ordertype": "limit",
"type": action,
"volume": str(size),
"pair": symbol,
"price": str(price),
# set to True test AddOrder call without a real submission
"validate": False
}
resp = await self.kraken_endpoint('AddOrder', data)
return resp
async def submit_cancel(
self,
reqid: str,
) -> None:
"""Send cancel request for order id ``reqid``.
"""
# txid is a transaction id given by kraken
data = {"txid": reqid}
resp = await self.kraken_endpoint('CancelOrder', data)
return resp
async def symbol_info(
self,
pair: Optional[str] = None,
@ -275,14 +429,284 @@ class Client:
async def get_client() -> Client:
client = Client()
## TODO: maybe add conditional based on section
section = get_config()
client._name = section['key_descr']
client._api_key = section['api_key']
client._secret = section['secret']
## TODO: Add a client attribute to hold this info
#data = {
# # add non-nonce and non-ofs vars
#}
# at startup, load all symbols locally for fast search
await client.cache_symbols()
yield client
async def stream_messages(ws):
def pack_position(
acc: str,
symkey: str,
pos: float,
vol: float
) -> dict[str, Any]:
return BrokerdPosition(
broker='kraken',
account=acc,
symbol=symkey,
currency=symkey[-3:],
size=float(vol),
avg_price=float(pos),
)
def normalize_symbol(
ticker: str
) -> str:
symlen = len(ticker)
if symlen == 6:
return ticker.lower()
else:
for sym in ['XXBT', 'XXMR', 'ZEUR']:
if sym in ticker:
ticker = ticker.replace(sym, sym[1:])
return ticker.lower()
def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]:
"""Create a request subscription packet dict.
## TODO: point to the auth urls
https://docs.kraken.com/websockets/#message-subscribe
"""
# eg. specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
return {
'event': 'subscribe',
'subscription': data,
}
async def handle_order_requests(
client: Client,
ems_order_stream: tractor.MsgStream,
) -> None:
request_msg: dict
order: BrokerdOrder
userref_counter = count()
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
action = request_msg['action']
if action in {'buy', 'sell'}:
account = request_msg['account']
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'Kraken only, No account found: `{account}` ?',
).dict())
continue
# validate
temp_id = next(userref_counter)
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
resp = await client.submit_limit(
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
## XXX: how do I handle new orders
reqid=temp_id,
)
err = resp['error']
if err:
log.error(f'Failed to submit order')
await ems_order_stream.send(
BrokerdError(
oid=order.oid,
reqid=temp_id,
symbol=order.symbol,
reason="Failed order submission",
broker_details=resp
).dict()
)
else:
## TODO: handle multiple cancels
## txid is an array of strings
reqid = resp['result']['txid'][0]
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
# account the made the order
account=order.account
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
# Send order cancellation to kraken
resp = await client.submit_cancel(
reqid=msg.reqid
)
try:
# Check to make sure there was no error returned by
# the kraken endpoint. Assert one order was cancelled
assert resp['error'] == []
assert resp['result']['count'] == 1
## TODO: Change this code using .get
try:
pending = resp['result']['pending']
# Check to make sure the cancellation is NOT pending,
# then send the confirmation to the ems order stream
except KeyError:
await ems_order_stream.send(
BrokerdStatus(
reqid=msg.reqid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
except AssertionError:
log.error(f'Order cancel was not successful')
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
) -> AsyncIterator[Dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
# Generate
@asynccontextmanager
async def subscribe(ws: wsproto.WSConnection, token: str):
## TODO: Fix docs and points to right urls
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
trades_sub = make_auth_sub(
{'name': 'openOrders', 'token': token}
)
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_msg(trades_sub)
## trade data (aka L1)
#l1_sub = make_sub(
# list(ws_pairs.values()),
# {'name': 'spread'} # 'depth': 10}
#)
## pull a first quote and deliver
#await ws.send_msg(l1_sub)
yield
# unsub from all pairs on teardown
await ws.send_msg({
'event': 'unsubscribe',
'subscription': ['openOrders'],
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# Authenticated block
async with get_client() as client:
acc_name = 'kraken.' + client._name
positions, vols = await client.get_positions()
all_positions = []
for ticker, pos in positions.items():
norm_sym = normalize_symbol(ticker)
if float(vols[ticker]) != 0:
msg = pack_position(acc_name, norm_sym, pos, vols[ticker])
all_positions.append(msg.dict())
open_orders = await client.kraken_endpoint('OpenOrders', {})
#await tractor.breakpoint()
await ctx.started((all_positions, (acc_name,)))
#await trio.sleep_forever()
# Get websocket token for authenticated data stream
# Assert that a token was actually received
resp = await client.kraken_endpoint('GetWebSocketsToken', {})
assert resp['error'] == []
token = resp['result']['token']
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
## TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream)
async with open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=subscribe,
token=token,
) as ws:
from pprint import pprint
async for msg in process_order_msgs(ws):
pprint(msg)
async def stream_messages(
ws: NoBsWs,
):
'''
Message stream parser and heartbeat handler.
Deliver ws subscription messages as well as handle heartbeat logic
though a single async generator.
'''
too_slow_count = last_hb = 0
while True:
@ -320,39 +744,76 @@ async def stream_messages(ws):
if err:
raise BrokerError(err)
else:
chan_id, *payload_array, chan_name, pair = msg
yield msg
if 'ohlc' in chan_name:
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
async def process_data_feed_msgs(
ws: NoBsWs,
):
'''
Parse and pack data feed messages.
elif 'spread' in chan_name:
'''
async for msg in stream_messages(ws):
bid, ask, ts, bsize, asize = map(float, payload_array[0])
chan_id, *payload_array, chan_name, pair = msg
# TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
if 'ohlc' in chan_name:
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
# elif 'book' in msg[-2]:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
elif 'spread' in chan_name:
else:
print(f'UNHANDLED MSG: {msg}')
bid, ask, ts, bsize, asize = map(float, payload_array[0])
# TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
# elif 'book' in msg[-2]:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
else:
print(f'UNHANDLED MSG: {msg}')
yield msg
async def process_order_msgs(
ws: NoBsWs,
):
'''
Parse and pack data feed messages.
'''
async for msg in stream_messages(ws):
# TODO: write your order event parser here!
# HINT: create a ``pydantic.BaseModel`` to parse and validate
# and then in the caller recast to our native ``BrokerdX`` msg types.
# form of order msgs:
# [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544',
# 'status': 'canceled', 'vol_exec': '0.00000000', 'cost':
# '0.00000000', 'fee': '0.00000000', 'avg_price':
# '0.00000000', 'userref': 1, 'cancel_reason': 'User
# requested'}}], 'openOrders', {'sequence': 4}]
yield msg
def normalize(
ohlc: OHLC,
) -> dict:
quote = asdict(ohlc)
quote['broker_ts'] = quote['time']
@ -492,15 +953,16 @@ async def stream_quotes(
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# see the tips on reonnection logic:
# see the tips on reconnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
ws: NoBsWs
async with open_autorecon_ws(
'wss://ws.kraken.com/',
fixture=subscribe,
) as ws:
# pull a first quote and deliver
msg_gen = stream_messages(ws)
msg_gen = process_data_feed_msgs(ws)
# TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__()
@ -559,6 +1021,7 @@ async def stream_quotes(
@tractor.context
async def open_symbol_search(
ctx: tractor.Context,
) -> Client:
async with open_cached_client('kraken') as client:

View File

@ -493,7 +493,8 @@ async def open_brokerd_trades_dialogue(
finally:
# parent context must have been closed
# remove from cache so next client will respawn if needed
_router.relays.pop(broker)
## TODO: Maybe add a warning
_router.relays.pop(broker, None)
@tractor.context

View File

@ -389,7 +389,7 @@ async def handle_order_requests(
account = request_msg['account']
if account != 'paper':
log.error(
'On a paper account, only a `paper` selection is valid'
'This is a paper account, only a `paper` selection is valid'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
@ -463,7 +463,8 @@ async def trades_dialogue(
):
# TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]``
await ctx.started(({}, ['paper']))
# await ctx.started(all_positions)
await ctx.started(({}, {'paper',}))
async with (
ctx.open_stream() as ems_stream,

View File

@ -53,11 +53,13 @@ class NoBsWs:
def __init__(
self,
url: str,
token: str,
stack: AsyncExitStack,
fixture: Callable,
serializer: ModuleType = json,
):
self.url = url
self.token = token
self.fixture = fixture
self._stack = stack
self._ws: 'WebSocketConnection' = None # noqa
@ -81,9 +83,15 @@ class NoBsWs:
trio_websocket.open_websocket_url(self.url)
)
# rerun user code fixture
ret = await self._stack.enter_async_context(
self.fixture(self)
)
if self.token == '':
ret = await self._stack.enter_async_context(
self.fixture(self)
)
else:
ret = await self._stack.enter_async_context(
self.fixture(self, self.token)
)
assert ret is None
log.info(f'Connection success: {self.url}')
@ -127,12 +135,14 @@ async def open_autorecon_ws(
# TODO: proper type annot smh
fixture: Callable,
# used for authenticated websockets
token: str = '',
) -> AsyncGenerator[tuple[...], NoBsWs]:
"""Apparently we can QoS for all sorts of reasons..so catch em.
"""
async with AsyncExitStack() as stack:
ws = NoBsWs(url, stack, fixture=fixture)
ws = NoBsWs(url, token, stack, fixture=fixture)
await ws._connect()
try:

View File

@ -108,7 +108,6 @@ class FastAppendCurve(pg.PlotCurveItem):
path redraw.
'''
def __init__(
self,
*args,
@ -167,7 +166,13 @@ class FastAppendCurve(pg.PlotCurveItem):
y: np.ndarray,
) -> QtGui.QPainterPath:
'''
Update curve from input 2-d data.
Compare with a cached "x-range" state and (pre/a)ppend based on
a length diff.
'''
profiler = pg.debug.Profiler(disabled=not pg_profile_enabled())
flip_cache = False
@ -316,12 +321,19 @@ class FastAppendCurve(pg.PlotCurveItem):
self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
def disable_cache(self) -> None:
'''
Disable the use of the pixel coordinate cache and trigger a geo event.
'''
# XXX: pretty annoying but, without this there's little
# artefacts on the append updates to the curve...
self.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
self.prepareGeometryChange()
def boundingRect(self):
'''
Compute and then cache our rect.
'''
if self.path is None:
return QtGui.QPainterPath().boundingRect()
else:
@ -331,9 +343,10 @@ class FastAppendCurve(pg.PlotCurveItem):
return self._br()
def _br(self):
"""Post init ``.boundingRect()```.
'''
Post init ``.boundingRect()```.
"""
'''
hb = self.path.controlPointRect()
hb_size = hb.size()
# print(f'hb_size: {hb_size}')