From b118becc84d1096d9ce1932c75b99cdcbbe80c9c Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 29 Jun 2022 10:11:11 -0400
Subject: [PATCH 01/18] Start `kraken` sub-pkg

---
 piker/brokers/{kraken.py => kraken/__init__.py} | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename piker/brokers/{kraken.py => kraken/__init__.py} (100%)

diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken/__init__.py
similarity index 100%
rename from piker/brokers/kraken.py
rename to piker/brokers/kraken/__init__.py

From 90cc6eb31730f4abba44500a4f4ee9395dd47669 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 29 Jun 2022 11:08:48 -0400
Subject: [PATCH 02/18] Factor clearing related endpoints into new `.kraken.broker` submod

---
 piker/brokers/kraken/__init__.py | 430 +----------------------------
 piker/brokers/kraken/broker.py   | 457 +++++++++++++++++++++++++++++++
 2 files changed, 465 insertions(+), 422 deletions(-)
 create mode 100644 piker/brokers/kraken/broker.py

diff --git a/piker/brokers/kraken/__init__.py b/piker/brokers/kraken/__init__.py
index 670eed6f..56c6ec85 100644
--- a/piker/brokers/kraken/__init__.py
+++ b/piker/brokers/kraken/__init__.py
@@ -1,5 +1,5 @@
 # piker: trading gear for hackers
-# Copyright (C) Tyler Goodlet (in stewardship for piker0)
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)

 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by
@@ -21,8 +21,7 @@ Kraken backend.
 from contextlib import asynccontextmanager as acm
 from dataclasses import asdict, field
 from datetime import datetime
-from pprint import pformat
-from typing import Any, Optional, AsyncIterator, Callable, Union
+from typing import Any, Optional, Callable, Union
 import time

 from trio_typing import TaskStatus
@@ -40,29 +39,21 @@ import hashlib
 import hmac
 import base64

-from ..
import config -from .._cacheables import open_cached_client -from ._util import ( +from piker import config +from piker._cacheables import open_cached_client +from piker.brokers._util import ( resproc, SymbolNotFound, BrokerError, DataThrottle, DataUnavailable, ) -from ..log import get_logger, get_console_log -from ..data import ShmArray -from ..data._web_bs import open_autorecon_ws, NoBsWs -from ..clearing._paper_engine import PaperBoi -from ..clearing._messages import ( - BrokerdPosition, BrokerdOrder, BrokerdStatus, - BrokerdOrderAck, BrokerdError, BrokerdCancel, - BrokerdFill, -) - +from piker.log import get_logger, get_console_log +from piker.data import ShmArray +from piker.data._web_bs import open_autorecon_ws, NoBsWs log = get_logger(__name__) - # // _url = 'https://api.kraken.com/0' @@ -126,18 +117,6 @@ class Pair(BaseModel): ordermin: float # minimum order volume for pair -class Trade(BaseModel): - ''' - Trade class that helps parse and validate ownTrades stream - - ''' - reqid: str # kraken order transaction id - action: str # buy or sell - price: str # price of asset - size: str # vol of asset - broker_time: str # e.g GTC, GTD - - @dataclass class OHLC: ''' @@ -493,41 +472,6 @@ async def get_client() -> Client: yield client -def pack_positions( - acc: str, - trades: dict -) -> list[Any]: - positions: dict[str, float] = {} - vols: dict[str, float] = {} - costs: dict[str, float] = {} - position_msgs: list[Any] = [] - - for trade in trades.values(): - sign = -1 if trade['type'] == 'sell' else 1 - pair = trade['pair'] - vol = float(trade['vol']) - vols[pair] = vols.get(pair, 0) + sign * vol - costs[pair] = costs.get(pair, 0) + sign * float(trade['cost']) - positions[pair] = costs[pair] / vols[pair] if vols[pair] else 0 - - for ticker, pos in positions.items(): - vol = float(vols[ticker]) - if not vol: - continue - norm_sym = normalize_symbol(ticker) - msg = BrokerdPosition( - broker='kraken', - account=acc, - symbol=norm_sym, - currency=norm_sym[-3:], - size=vol, - avg_price=float(pos), - ) - position_msgs.append(msg.dict()) - - return position_msgs - - def normalize_symbol( ticker: str ) -> str: @@ -543,322 +487,6 @@ def normalize_symbol( return ticker.lower() -def make_auth_sub(data: dict[str, Any]) -> dict[str, str]: - ''' - Create a request subscription packet dict. - - ## TODO: point to the auth urls - https://docs.kraken.com/websockets/#message-subscribe - - ''' - # eg. 
specific logic for this in kraken's sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - return { - 'event': 'subscribe', - 'subscription': data, - } - - -async def handle_order_requests( - - client: Client, - ems_order_stream: tractor.MsgStream, - -) -> None: - - request_msg: dict - order: BrokerdOrder - - async for request_msg in ems_order_stream: - log.info( - 'Received order request:\n' - f'{pformat(request_msg)}' - ) - - action = request_msg['action'] - - if action in {'buy', 'sell'}: - - account = request_msg['account'] - if account != 'kraken.spot': - log.error( - 'This is a kraken account, \ - only a `kraken.spot` selection is valid' - ) - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - - # reason=f'Kraken only, No account found: `{account}` ?', - reason=( - 'Kraken only, order mode disabled due to ' - 'https://github.com/pikers/piker/issues/299' - ), - - ).dict()) - continue - - # validate - order = BrokerdOrder(**request_msg) - # call our client api to submit the order - resp = await client.submit_limit( - symbol=order.symbol, - price=order.price, - action=order.action, - size=order.size, - reqid=order.reqid, - ) - - err = resp['error'] - if err: - oid = order.oid - log.error(f'Failed to submit order: {oid}') - - await ems_order_stream.send( - BrokerdError( - oid=order.oid, - reqid=order.reqid, - symbol=order.symbol, - reason="Failed order submission", - broker_details=resp - ).dict() - ) - else: - # TODO: handle multiple orders (cancels?) - # txid is an array of strings - if order.reqid is None: - reqid = resp['result']['txid'][0] - else: - # update the internal pairing of oid to krakens - # txid with the new txid that is returned on edit - reqid = resp['result']['txid'] - - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( - - # ems order request id - oid=order.oid, - - # broker specific request id - reqid=reqid, - - # account the made the order - account=order.account - - ).dict() - ) - - elif action == 'cancel': - msg = BrokerdCancel(**request_msg) - - # Send order cancellation to kraken - resp = await client.submit_cancel( - reqid=msg.reqid - ) - - # Check to make sure there was no error returned by - # the kraken endpoint. Assert one order was cancelled. - try: - result = resp['result'] - count = result['count'] - - # check for 'error' key if we received no 'result' - except KeyError: - error = resp.get('error') - - await ems_order_stream.send( - BrokerdError( - oid=msg.oid, - reqid=msg.reqid, - symbol=msg.symbol, - reason="Failed order cancel", - broker_details=resp - ).dict() - ) - - if not error: - raise BrokerError(f'Unknown order cancel response: {resp}') - - else: - if not count: # no orders were cancelled? - - # XXX: what exactly is this from and why would we care? - # there doesn't seem to be any docs here? - # https://docs.kraken.com/rest/#operation/cancelOrder - - # Check to make sure the cancellation is NOT pending, - # then send the confirmation to the ems order stream - pending = result.get('pending') - if pending: - log.error(f'Order {oid} cancel was not yet successful') - - await ems_order_stream.send( - BrokerdError( - oid=msg.oid, - reqid=msg.reqid, - symbol=msg.symbol, - # TODO: maybe figure out if pending - # cancels will eventually get cancelled - reason="Order cancel is still pending?", - broker_details=resp - ).dict() - ) - - else: # order cancel success case. 
- - await ems_order_stream.send( - BrokerdStatus( - reqid=msg.reqid, - account=msg.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Order cancelled', - broker_details={'name': 'kraken'} - ).dict() - ) - else: - log.error(f'Unknown order command: {request_msg}') - - -@tractor.context -async def trades_dialogue( - ctx: tractor.Context, - loglevel: str = None, -) -> AsyncIterator[dict[str, Any]]: - - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) - - @acm - async def subscribe(ws: wsproto.WSConnection, token: str): - # XXX: setup subs - # https://docs.kraken.com/websockets/#message-subscribe - # specific logic for this in kraken's shitty sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - trades_sub = make_auth_sub( - {'name': 'ownTrades', 'token': token} - ) - - # TODO: we want to eventually allow unsubs which should - # be completely fine to request from a separate task - # since internally the ws methods appear to be FIFO - # locked. - await ws.send_msg(trades_sub) - - yield - - # unsub from all pairs on teardown - await ws.send_msg({ - 'event': 'unsubscribe', - 'subscription': ['ownTrades'], - }) - - # XXX: do we need to ack the unsub? - # await ws.recv_msg() - - # Authenticated block - async with get_client() as client: - if not client._api_key: - log.error('Missing Kraken API key: Trades WS connection failed') - await ctx.started(({}, ['paper'])) - - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - - client = PaperBoi( - 'kraken', - ems_stream, - _buys={}, - _sells={}, - - _reqids={}, - - # TODO: load paper positions from ``positions.toml`` - _positions={}, - ) - - # TODO: maybe add multiple accounts - n.start_soon(handle_order_requests, client, ems_stream) - - acc_name = 'kraken.' + client._name - trades = await client.get_trades() - - position_msgs = pack_positions(acc_name, trades) - - await ctx.started((position_msgs, (acc_name,))) - - # Get websocket token for authenticated data stream - # Assert that a token was actually received. - resp = await client.endpoint('GetWebSocketsToken', {}) - - # lol wtf is this.. - assert resp['error'] == [] - - token = resp['result']['token'] - - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - # TODO: maybe add multiple accounts - n.start_soon(handle_order_requests, client, ems_stream) - - # Process trades msg stream of ws - async with open_autorecon_ws( - 'wss://ws-auth.kraken.com/', - fixture=subscribe, - token=token, - ) as ws: - async for msg in process_trade_msgs(ws): - for trade in msg: - # check the type of packaged message - assert type(trade) == Trade - - # prepare and send a filled status update - filled_msg = BrokerdStatus( - reqid=trade.reqid, - time_ns=time.time_ns(), - - account='kraken.spot', - status='filled', - filled=float(trade.size), - reason='Order filled by kraken', - broker_details={ - 'name': 'kraken', - 'broker_time': trade.broker_time - }, - - # TODO: figure out if kraken gives a count - # of how many units of underlying were - # filled. 
Alternatively we can decrement - # this value ourselves by associating and - # calcing from the diff with the original - # client-side request, see: - # https://github.com/pikers/piker/issues/296 - remaining=0, - ) - - await ems_stream.send(filled_msg.dict()) - - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=trade.reqid, - time_ns=time.time_ns(), - - action=trade.action, - size=float(trade.size), - price=float(trade.price), - # TODO: maybe capture more msg data i.e fees? - broker_details={'name': 'kraken'}, - broker_time=float(trade.broker_time) - ) - - await ems_stream.send(fill_msg.dict()) - - async def stream_messages( ws: NoBsWs, ): @@ -950,48 +578,6 @@ async def process_data_feed_msgs( yield msg -async def process_trade_msgs( - ws: NoBsWs, -): - ''' - Parse and pack data feed messages. - - ''' - sequence_counter = 0 - async for msg in stream_messages(ws): - - try: - # check that we are on the ownTrades stream and that msgs - # are arriving in sequence with kraken For clarification the - # kraken ws api docs for this stream: - # https://docs.kraken.com/websockets/#message-ownTrades - assert msg[1] == 'ownTrades' - assert msg[2]['sequence'] > sequence_counter - sequence_counter += 1 - raw_msgs = msg[0] - trade_msgs = [] - - # Check that we are only processing new trades - if msg[2]['sequence'] != 1: - # check if its a new order or an update msg - for trade_msg in raw_msgs: - trade = list(trade_msg.values())[0] - order_msg = Trade( - reqid=trade['ordertxid'], - action=trade['type'], - price=trade['price'], - size=trade['vol'], - broker_time=trade['time'] - ) - trade_msgs.append(order_msg) - - yield trade_msgs - - except AssertionError: - print(f'UNHANDLED MSG: {msg}') - yield msg - - def normalize( ohlc: OHLC, diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py new file mode 100644 index 00000000..4d1f0338 --- /dev/null +++ b/piker/brokers/kraken/broker.py @@ -0,0 +1,457 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +''' +Order api and machinery + +''' +from contextlib import asynccontextmanager as acm +from pprint import pformat +import time +from typing import ( + Any, + AsyncIterator, + # Callable, + # Optional, + # Union, +) + +from pydantic import BaseModel +import trio +import tractor +import wsproto + +from . 
import ( + Client, + BrokerError, + get_client, + get_console_log, + log, + normalize_symbol, + open_autorecon_ws, + NoBsWs, + stream_messages, +) +from ..clearing._paper_engine import PaperBoi +from ..clearing._messages import ( + BrokerdPosition, BrokerdOrder, BrokerdStatus, + BrokerdOrderAck, BrokerdError, BrokerdCancel, + BrokerdFill, +) + + +class Trade(BaseModel): + ''' + Trade class that helps parse and validate ownTrades stream + + ''' + reqid: str # kraken order transaction id + action: str # buy or sell + price: str # price of asset + size: str # vol of asset + broker_time: str # e.g GTC, GTD + + +def pack_positions( + acc: str, + trades: dict +) -> list[Any]: + positions: dict[str, float] = {} + vols: dict[str, float] = {} + costs: dict[str, float] = {} + position_msgs: list[Any] = [] + + for trade in trades.values(): + sign = -1 if trade['type'] == 'sell' else 1 + pair = trade['pair'] + vol = float(trade['vol']) + vols[pair] = vols.get(pair, 0) + sign * vol + costs[pair] = costs.get(pair, 0) + sign * float(trade['cost']) + positions[pair] = costs[pair] / vols[pair] if vols[pair] else 0 + + for ticker, pos in positions.items(): + vol = float(vols[ticker]) + if not vol: + continue + norm_sym = normalize_symbol(ticker) + msg = BrokerdPosition( + broker='kraken', + account=acc, + symbol=norm_sym, + currency=norm_sym[-3:], + size=vol, + avg_price=float(pos), + ) + position_msgs.append(msg.dict()) + + return position_msgs + + +async def handle_order_requests( + + client: Client, + ems_order_stream: tractor.MsgStream, + +) -> None: + + request_msg: dict + order: BrokerdOrder + + async for request_msg in ems_order_stream: + log.info( + 'Received order request:\n' + f'{pformat(request_msg)}' + ) + + action = request_msg['action'] + + if action in {'buy', 'sell'}: + + account = request_msg['account'] + if account != 'kraken.spot': + log.error( + 'This is a kraken account, \ + only a `kraken.spot` selection is valid' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + + # reason=f'Kraken only, No account found: `{account}` ?', + reason=( + 'Kraken only, order mode disabled due to ' + 'https://github.com/pikers/piker/issues/299' + ), + + ).dict()) + continue + + # validate + order = BrokerdOrder(**request_msg) + # call our client api to submit the order + resp = await client.submit_limit( + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + reqid=order.reqid, + ) + + err = resp['error'] + if err: + oid = order.oid + log.error(f'Failed to submit order: {oid}') + + await ems_order_stream.send( + BrokerdError( + oid=order.oid, + reqid=order.reqid, + symbol=order.symbol, + reason="Failed order submission", + broker_details=resp + ).dict() + ) + else: + # TODO: handle multiple orders (cancels?) 
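# NOTE (illustrative sketch, not part of this patch): the branch below
# assumes roughly these kraken REST response shapes; the txid values
# here are made up:
#   AddOrder  -> {'error': [], 'result': {'txid': ['OABC12-FAKE00-TX1'], 'descr': {...}}}
#   EditOrder -> {'error': [], 'result': {'txid': 'ONEW34-FAKE00-TX2', ...}}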
+ # txid is an array of strings + if order.reqid is None: + reqid = resp['result']['txid'][0] + else: + # update the internal pairing of oid to krakens + # txid with the new txid that is returned on edit + reqid = resp['result']['txid'] + + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + + # ems order request id + oid=order.oid, + + # broker specific request id + reqid=reqid, + + # account the made the order + account=order.account + + ).dict() + ) + + elif action == 'cancel': + msg = BrokerdCancel(**request_msg) + + # Send order cancellation to kraken + resp = await client.submit_cancel( + reqid=msg.reqid + ) + + # Check to make sure there was no error returned by + # the kraken endpoint. Assert one order was cancelled. + try: + result = resp['result'] + count = result['count'] + + # check for 'error' key if we received no 'result' + except KeyError: + error = resp.get('error') + + await ems_order_stream.send( + BrokerdError( + oid=msg.oid, + reqid=msg.reqid, + symbol=msg.symbol, + reason="Failed order cancel", + broker_details=resp + ).dict() + ) + + if not error: + raise BrokerError(f'Unknown order cancel response: {resp}') + + else: + if not count: # no orders were cancelled? + + # XXX: what exactly is this from and why would we care? + # there doesn't seem to be any docs here? + # https://docs.kraken.com/rest/#operation/cancelOrder + + # Check to make sure the cancellation is NOT pending, + # then send the confirmation to the ems order stream + pending = result.get('pending') + if pending: + log.error(f'Order {oid} cancel was not yet successful') + + await ems_order_stream.send( + BrokerdError( + oid=msg.oid, + reqid=msg.reqid, + symbol=msg.symbol, + # TODO: maybe figure out if pending + # cancels will eventually get cancelled + reason="Order cancel is still pending?", + broker_details=resp + ).dict() + ) + + else: # order cancel success case. + + await ems_order_stream.send( + BrokerdStatus( + reqid=msg.reqid, + account=msg.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Order cancelled', + broker_details={'name': 'kraken'} + ).dict() + ) + else: + log.error(f'Unknown order command: {request_msg}') + + +def make_auth_sub(data: dict[str, Any]) -> dict[str, str]: + ''' + Create a request subscription packet dict. + + ## TODO: point to the auth urls + https://docs.kraken.com/websockets/#message-subscribe + + ''' + # eg. specific logic for this in kraken's sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + return { + 'event': 'subscribe', + 'subscription': data, + } + + +@tractor.context +async def trades_dialogue( + ctx: tractor.Context, + loglevel: str = None, +) -> AsyncIterator[dict[str, Any]]: + + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + @acm + async def subscribe(ws: wsproto.WSConnection, token: str): + # XXX: setup subs + # https://docs.kraken.com/websockets/#message-subscribe + # specific logic for this in kraken's shitty sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + trades_sub = make_auth_sub( + {'name': 'ownTrades', 'token': token} + ) + + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. 
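# NOTE (illustrative, not part of this patch): given ``make_auth_sub()``
# above, the subscribe message sent on the next line has the form:
#   {
#       'event': 'subscribe',
#       'subscription': {'name': 'ownTrades', 'token': '<ws auth token>'},
#   }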
+ await ws.send_msg(trades_sub) + + yield + + # unsub from all pairs on teardown + await ws.send_msg({ + 'event': 'unsubscribe', + 'subscription': ['ownTrades'], + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + # Authenticated block + async with get_client() as client: + if not client._api_key: + log.error('Missing Kraken API key: Trades WS connection failed') + await ctx.started(({}, ['paper'])) + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + + client = PaperBoi( + 'kraken', + ems_stream, + _buys={}, + _sells={}, + + _reqids={}, + + # TODO: load paper positions from ``positions.toml`` + _positions={}, + ) + + # TODO: maybe add multiple accounts + n.start_soon(handle_order_requests, client, ems_stream) + + acc_name = 'kraken.' + client._name + trades = await client.get_trades() + + position_msgs = pack_positions(acc_name, trades) + + await ctx.started((position_msgs, (acc_name,))) + + # Get websocket token for authenticated data stream + # Assert that a token was actually received. + resp = await client.endpoint('GetWebSocketsToken', {}) + + # lol wtf is this.. + assert resp['error'] == [] + + token = resp['result']['token'] + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + # TODO: maybe add multiple accounts + n.start_soon(handle_order_requests, client, ems_stream) + + # Process trades msg stream of ws + async with open_autorecon_ws( + 'wss://ws-auth.kraken.com/', + fixture=subscribe, + token=token, + ) as ws: + async for msg in process_trade_msgs(ws): + for trade in msg: + # check the type of packaged message + assert type(trade) == Trade + + # prepare and send a filled status update + filled_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), + + account='kraken.spot', + status='filled', + filled=float(trade.size), + reason='Order filled by kraken', + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + }, + + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, + ) + + await ems_stream.send(filled_msg.dict()) + + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=trade.reqid, + time_ns=time.time_ns(), + + action=trade.action, + size=float(trade.size), + price=float(trade.price), + # TODO: maybe capture more msg data i.e fees? + broker_details={'name': 'kraken'}, + broker_time=float(trade.broker_time) + ) + + await ems_stream.send(fill_msg.dict()) + + +async def process_trade_msgs( + ws: NoBsWs, +): + ''' + Parse and pack data feed messages. 
+ + ''' + sequence_counter = 0 + async for msg in stream_messages(ws): + + try: + # check that we are on the ownTrades stream and that msgs + # are arriving in sequence with kraken For clarification the + # kraken ws api docs for this stream: + # https://docs.kraken.com/websockets/#message-ownTrades + assert msg[1] == 'ownTrades' + assert msg[2]['sequence'] > sequence_counter + sequence_counter += 1 + raw_msgs = msg[0] + trade_msgs = [] + + # Check that we are only processing new trades + if msg[2]['sequence'] != 1: + # check if its a new order or an update msg + for trade_msg in raw_msgs: + trade = list(trade_msg.values())[0] + order_msg = Trade( + reqid=trade['ordertxid'], + action=trade['type'], + price=trade['price'], + size=trade['vol'], + broker_time=trade['time'] + ) + trade_msgs.append(order_msg) + + yield trade_msgs + + except AssertionError: + print(f'UNHANDLED MSG: {msg}') + yield msg From 208e2e9e97860c943615e58d4eced9f8c3c4127b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 29 Jun 2022 13:24:47 -0400 Subject: [PATCH 03/18] Move core api code into sub-module --- piker/brokers/kraken/__init__.py | 872 ++----------------------------- piker/brokers/kraken/api.py | 864 ++++++++++++++++++++++++++++++ 2 files changed, 897 insertions(+), 839 deletions(-) create mode 100644 piker/brokers/kraken/api.py diff --git a/piker/brokers/kraken/__init__.py b/piker/brokers/kraken/__init__.py index 56c6ec85..5dbe709b 100644 --- a/piker/brokers/kraken/__init__.py +++ b/piker/brokers/kraken/__init__.py @@ -17,848 +17,42 @@ ''' Kraken backend. +Sub-modules within break into the core functionalities: + +- ``broker.py`` part for orders / trading endpoints +- ``feed.py`` for real-time data feed endpoints +- ``api.py`` for the core API machinery which is ``trio``-ized + wrapping around ``ib_insync``. + ''' -from contextlib import asynccontextmanager as acm -from dataclasses import asdict, field -from datetime import datetime -from typing import Any, Optional, Callable, Union -import time - -from trio_typing import TaskStatus -import trio -import pendulum -import asks -from fuzzywuzzy import process as fuzzy -import numpy as np -import tractor -from pydantic.dataclasses import dataclass -from pydantic import BaseModel -import wsproto -import urllib.parse -import hashlib -import hmac -import base64 - -from piker import config -from piker._cacheables import open_cached_client -from piker.brokers._util import ( - resproc, - SymbolNotFound, - BrokerError, - DataThrottle, - DataUnavailable, +from .api import ( + get_client, + open_history_client, + open_symbol_search, + stream_quotes, +) +# TODO: +# from .feed import ( +# ) +from .broker import ( + trades_dialogue, + # TODO: + # norm_trade_records, ) -from piker.log import get_logger, get_console_log -from piker.data import ShmArray -from piker.data._web_bs import open_autorecon_ws, NoBsWs -log = get_logger(__name__) - -# // -_url = 'https://api.kraken.com/0' - - -# Broker specific ohlc schema which includes a vwap field -_ohlc_dtype = [ - ('index', int), - ('time', int), - ('open', float), - ('high', float), - ('low', float), - ('close', float), - ('volume', float), - ('count', int), - ('bar_wap', float), +__all__ = [ + 'get_client', + 'trades_dialogue', + 'open_history_client', + 'open_symbol_search', + 'stream_quotes', ] -# UI components allow this to be declared such that additional -# (historical) fields can be exposed. 
-ohlc_dtype = np.dtype(_ohlc_dtype) -_show_wap_in_history = True - - -_symbol_info_translation: dict[str, str] = { - 'tick_decimals': 'pair_decimals', -} - - -# https://www.kraken.com/features/api#get-tradable-pairs -class Pair(BaseModel): - altname: str # alternate pair name - wsname: str # WebSocket pair name (if available) - aclass_base: str # asset class of base component - base: str # asset id of base component - aclass_quote: str # asset class of quote component - quote: str # asset id of quote component - lot: str # volume lot size - - pair_decimals: int # scaling decimal places for pair - lot_decimals: int # scaling decimal places for volume - - # amount to multiply lot volume by to get currency volume - lot_multiplier: float - - # array of leverage amounts available when buying - leverage_buy: list[int] - # array of leverage amounts available when selling - leverage_sell: list[int] - - # fee schedule array in [volume, percent fee] tuples - fees: list[tuple[int, float]] - - # maker fee schedule array in [volume, percent fee] tuples (if on - # maker/taker) - fees_maker: list[tuple[int, float]] - - fee_volume_currency: str # volume discount currency - margin_call: str # margin call level - margin_stop: str # stop-out/liquidation margin level - ordermin: float # minimum order volume for pair - - -@dataclass -class OHLC: - ''' - Description of the flattened OHLC quote format. - - For schema details see: - https://docs.kraken.com/websockets/#message-ohlc - - ''' - chan_id: int # internal kraken id - chan_name: str # eg. ohlc-1 (name-interval) - pair: str # fx pair - time: float # Begin time of interval, in seconds since epoch - etime: float # End time of interval, in seconds since epoch - open: float # Open price of interval - high: float # High price within interval - low: float # Low price within interval - close: float # Close price of interval - vwap: float # Volume weighted average price within interval - volume: float # Accumulated volume **within interval** - count: int # Number of trades within interval - # (sampled) generated tick data - ticks: list[Any] = field(default_factory=list) - - -def get_config() -> dict[str, Any]: - - conf, path = config.load() - section = conf.get('kraken') - - if section is None: - log.warning(f'No config section found for kraken in {path}') - return {} - - return section - - -def get_kraken_signature( - urlpath: str, - data: dict[str, Any], - secret: str -) -> str: - postdata = urllib.parse.urlencode(data) - encoded = (str(data['nonce']) + postdata).encode() - message = urlpath.encode() + hashlib.sha256(encoded).digest() - - mac = hmac.new(base64.b64decode(secret), message, hashlib.sha512) - sigdigest = base64.b64encode(mac.digest()) - return sigdigest.decode() - - -class InvalidKey(ValueError): - ''' - EAPI:Invalid key - This error is returned when the API key used for the call is - either expired or disabled, please review the API key in your - Settings -> API tab of account management or generate a new one - and update your application. 
- - ''' - - -class Client: - - def __init__( - self, - name: str = '', - api_key: str = '', - secret: str = '' - ) -> None: - self._sesh = asks.Session(connections=4) - self._sesh.base_location = _url - self._sesh.headers.update({ - 'User-Agent': - 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' - }) - self._pairs: list[str] = [] - self._name = name - self._api_key = api_key - self._secret = secret - - @property - def pairs(self) -> dict[str, Any]: - if self._pairs is None: - raise RuntimeError( - "Make sure to run `cache_symbols()` on startup!" - ) - # retreive and cache all symbols - - return self._pairs - - async def _public( - self, - method: str, - data: dict, - ) -> dict[str, Any]: - resp = await self._sesh.post( - path=f'/public/{method}', - json=data, - timeout=float('inf') - ) - return resproc(resp, log) - - async def _private( - self, - method: str, - data: dict, - uri_path: str - ) -> dict[str, Any]: - headers = { - 'Content-Type': - 'application/x-www-form-urlencoded', - 'API-Key': - self._api_key, - 'API-Sign': - get_kraken_signature(uri_path, data, self._secret) - } - resp = await self._sesh.post( - path=f'/private/{method}', - data=data, - headers=headers, - timeout=float('inf') - ) - return resproc(resp, log) - - async def endpoint( - self, - method: str, - data: dict[str, Any] - ) -> dict[str, Any]: - uri_path = f'/0/private/{method}' - data['nonce'] = str(int(1000*time.time())) - return await self._private(method, data, uri_path) - - async def get_trades( - self, - data: dict[str, Any] = {} - ) -> dict[str, Any]: - data['ofs'] = 0 - # Grab all trade history - # https://docs.kraken.com/rest/#operation/getTradeHistory - # Kraken uses 'ofs' to refer to the offset - while True: - resp = await self.endpoint('TradesHistory', data) - # grab the first 50 trades - if data['ofs'] == 0: - trades = resp['result']['trades'] - # load the next 50 trades using dict constructor - # for speed - elif data['ofs'] == 50: - trades = dict(trades, **resp['result']['trades']) - # catch the end of the trades - elif resp['result']['trades'] == {}: - count = resp['result']['count'] - break - # update existing dict if num trades exceeds 100 - else: - trades.update(resp['result']['trades']) - # increment the offset counter - data['ofs'] += 50 - # To avoid exceeding API rate limit in case of a lot of trades - await trio.sleep(1) - - # make sure you grabbed all the trades - assert count == len(trades.values()) - - return trades - - async def submit_limit( - self, - symbol: str, - price: float, - action: str, - size: float, - reqid: str = None, - validate: bool = False # set True test call without a real submission - ) -> dict: - ''' - Place an order and return integer request id provided by client. - - ''' - # Build common data dict for common keys from both endpoints - data = { - "pair": symbol, - "price": str(price), - "validate": validate - } - if reqid is None: - # Build order data for kraken api - data |= { - "ordertype": "limit", "type": action, "volume": str(size) - } - return await self.endpoint('AddOrder', data) - else: - # Edit order data for kraken api - data["txid"] = reqid - return await self.endpoint('EditOrder', data) - - async def submit_cancel( - self, - reqid: str, - ) -> dict: - ''' - Send cancel request for order id ``reqid``. 
- - ''' - # txid is a transaction id given by kraken - return await self.endpoint('CancelOrder', {"txid": reqid}) - - async def symbol_info( - self, - pair: Optional[str] = None, - ): - if pair is not None: - pairs = {'pair': pair} - else: - pairs = None # get all pairs - - resp = await self._public('AssetPairs', pairs) - err = resp['error'] - if err: - symbolname = pairs['pair'] if pair else None - raise SymbolNotFound(f'{symbolname}.kraken') - - pairs = resp['result'] - - if pair is not None: - _, data = next(iter(pairs.items())) - return data - else: - return pairs - - async def cache_symbols( - self, - ) -> dict: - if not self._pairs: - self._pairs = await self.symbol_info() - - return self._pairs - - async def search_symbols( - self, - pattern: str, - limit: int = None, - ) -> dict[str, Any]: - if self._pairs is not None: - data = self._pairs - else: - data = await self.symbol_info() - - matches = fuzzy.extractBests( - pattern, - data, - score_cutoff=50, - ) - # repack in dict form - return {item[0]['altname']: item[0] for item in matches} - - async def bars( - self, - symbol: str = 'XBTUSD', - - # UTC 2017-07-02 12:53:20 - since: Optional[Union[int, datetime]] = None, - count: int = 720, # <- max allowed per query - as_np: bool = True, - - ) -> dict: - - if since is None: - since = pendulum.now('UTC').start_of('minute').subtract( - minutes=count).timestamp() - - elif isinstance(since, int): - since = pendulum.from_timestamp(since).timestamp() - - else: # presumably a pendulum datetime - since = since.timestamp() - - # UTC 2017-07-02 12:53:20 is oldest seconds value - since = str(max(1499000000, int(since))) - json = await self._public( - 'OHLC', - data={ - 'pair': symbol, - 'since': since, - }, - ) - try: - res = json['result'] - res.pop('last') - bars = next(iter(res.values())) - - new_bars = [] - - first = bars[0] - last_nz_vwap = first[-3] - if last_nz_vwap == 0: - # use close if vwap is zero - last_nz_vwap = first[-4] - - # convert all fields to native types - for i, bar in enumerate(bars): - # normalize weird zero-ed vwap values..cmon kraken.. - # indicates vwap didn't change since last bar - vwap = float(bar.pop(-3)) - if vwap != 0: - last_nz_vwap = vwap - if vwap == 0: - vwap = last_nz_vwap - - # re-insert vwap as the last of the fields - bar.append(vwap) - - new_bars.append( - (i,) + tuple( - ftype(bar[j]) for j, (name, ftype) in enumerate( - _ohlc_dtype[1:] - ) - ) - ) - array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars - return array - except KeyError: - errmsg = json['error'][0] - - if 'not found' in errmsg: - raise SymbolNotFound(errmsg + f': {symbol}') - - elif 'Too many requests' in errmsg: - raise DataThrottle(f'{symbol}') - - else: - raise BrokerError(errmsg) - - -@acm -async def get_client() -> Client: - - section = get_config() - if section: - client = Client( - name=section['key_descr'], - api_key=section['api_key'], - secret=section['secret'] - ) - else: - client = Client() - - # at startup, load all symbols locally for fast search - await client.cache_symbols() - - yield client - - -def normalize_symbol( - ticker: str -) -> str: - # This is to convert symbol names from what kraken - # uses to the traditional 3x3 pair symbol syntax - symlen = len(ticker) - if symlen == 6: - return ticker.lower() - else: - for sym in ['XXBT', 'XXMR', 'ZEUR']: - if sym in ticker: - ticker = ticker.replace(sym, sym[1:]) - return ticker.lower() - - -async def stream_messages( - ws: NoBsWs, -): - ''' - Message stream parser and heartbeat handler. 
- - Deliver ws subscription messages as well as handle heartbeat logic - though a single async generator. - - ''' - too_slow_count = last_hb = 0 - - while True: - - with trio.move_on_after(5) as cs: - msg = await ws.recv_msg() - - # trigger reconnection if heartbeat is laggy - if cs.cancelled_caught: - - too_slow_count += 1 - - if too_slow_count > 20: - log.warning( - "Heartbeat is too slow, resetting ws connection") - - await ws._connect() - too_slow_count = 0 - continue - - if isinstance(msg, dict): - if msg.get('event') == 'heartbeat': - - now = time.time() - delay = now - last_hb - last_hb = now - - # XXX: why tf is this not printing without --tl flag? - log.debug(f"Heartbeat after {delay}") - # print(f"Heartbeat after {delay}") - - continue - - err = msg.get('errorMessage') - if err: - raise BrokerError(err) - else: - yield msg - - -async def process_data_feed_msgs( - ws: NoBsWs, -): - ''' - Parse and pack data feed messages. - - ''' - async for msg in stream_messages(ws): - - chan_id, *payload_array, chan_name, pair = msg - - if 'ohlc' in chan_name: - - yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) - - elif 'spread' in chan_name: - - bid, ask, ts, bsize, asize = map(float, payload_array[0]) - - # TODO: really makes you think IB has a horrible API... - quote = { - 'symbol': pair.replace('/', ''), - 'ticks': [ - {'type': 'bid', 'price': bid, 'size': bsize}, - {'type': 'bsize', 'price': bid, 'size': bsize}, - - {'type': 'ask', 'price': ask, 'size': asize}, - {'type': 'asize', 'price': ask, 'size': asize}, - ], - } - yield 'l1', quote - - # elif 'book' in msg[-2]: - # chan_id, *payload_array, chan_name, pair = msg - # print(msg) - - else: - print(f'UNHANDLED MSG: {msg}') - yield msg - - -def normalize( - ohlc: OHLC, - -) -> dict: - quote = asdict(ohlc) - quote['broker_ts'] = quote['time'] - quote['brokerd_ts'] = time.time() - quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') - quote['last'] = quote['close'] - quote['bar_wap'] = ohlc.vwap - - # seriously eh? what's with this non-symmetry everywhere - # in subscription systems... - # XXX: piker style is always lowercases symbols. - topic = quote['pair'].replace('/', '').lower() - - # print(quote) - return topic, quote - - -def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]: - ''' - Create a request subscription packet dict. - - https://docs.kraken.com/websockets/#message-subscribe - - ''' - # eg. specific logic for this in kraken's sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - return { - 'pair': pairs, - 'event': 'subscribe', - 'subscription': data, - } - - -@acm -async def open_history_client( - symbol: str, - -) -> tuple[Callable, int]: - - # TODO implement history getter for the new storage layer. - async with open_cached_client('kraken') as client: - - # lol, kraken won't send any more then the "last" - # 720 1m bars.. so we have to just ignore further - # requests of this type.. 
- queries: int = 0 - - async def get_ohlc( - end_dt: Optional[datetime] = None, - start_dt: Optional[datetime] = None, - - ) -> tuple[ - np.ndarray, - datetime, # start - datetime, # end - ]: - - nonlocal queries - if queries > 0: - raise DataUnavailable - - count = 0 - while count <= 3: - try: - array = await client.bars( - symbol, - since=end_dt, - ) - count += 1 - queries += 1 - break - except DataThrottle: - log.warning(f'kraken OHLC throttle for {symbol}') - await trio.sleep(1) - - start_dt = pendulum.from_timestamp(array[0]['time']) - end_dt = pendulum.from_timestamp(array[-1]['time']) - return array, start_dt, end_dt - - yield get_ohlc, {'erlangs': 1, 'rate': 1} - - -async def backfill_bars( - - sym: str, - shm: ShmArray, # type: ignore # noqa - count: int = 10, # NOTE: any more and we'll overrun the underlying buffer - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Fill historical bars into shared mem / storage afap. - ''' - with trio.CancelScope() as cs: - async with open_cached_client('kraken') as client: - bars = await client.bars(symbol=sym) - shm.push(bars) - task_status.started(cs) - - -async def stream_quotes( - - send_chan: trio.abc.SendChannel, - symbols: list[str], - feed_is_live: trio.Event, - loglevel: str = None, - - # backend specific - sub_type: str = 'ohlc', - - # startup sync - task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Subscribe for ohlc stream of quotes for ``pairs``. - - ``pairs`` must be formatted /. - - ''' - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) - - ws_pairs = {} - sym_infos = {} - - async with open_cached_client('kraken') as client, send_chan as send_chan: - - # keep client cached for real-time section - for sym in symbols: - - # transform to upper since piker style is always lower - sym = sym.upper() - - si = Pair(**await client.symbol_info(sym)) # validation - syminfo = si.dict() - syminfo['price_tick_size'] = 1 / 10**si.pair_decimals - syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals - syminfo['asset_type'] = 'crypto' - sym_infos[sym] = syminfo - ws_pairs[sym] = si.wsname - - symbol = symbols[0].lower() - - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - symbol: { - 'symbol_info': sym_infos[sym], - 'shm_write_opts': {'sum_tick_vml': False}, - 'fqsn': sym, - }, - } - - @acm - async def subscribe(ws: wsproto.WSConnection): - # XXX: setup subs - # https://docs.kraken.com/websockets/#message-subscribe - # specific logic for this in kraken's shitty sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - ohlc_sub = make_sub( - list(ws_pairs.values()), - {'name': 'ohlc', 'interval': 1} - ) - - # TODO: we want to eventually allow unsubs which should - # be completely fine to request from a separate task - # since internally the ws methods appear to be FIFO - # locked. - await ws.send_msg(ohlc_sub) - - # trade data (aka L1) - l1_sub = make_sub( - list(ws_pairs.values()), - {'name': 'spread'} # 'depth': 10} - ) - - # pull a first quote and deliver - await ws.send_msg(l1_sub) - - yield - - # unsub from all pairs on teardown - await ws.send_msg({ - 'pair': list(ws_pairs.values()), - 'event': 'unsubscribe', - 'subscription': ['ohlc', 'spread'], - }) - - # XXX: do we need to ack the unsub? 
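# NOTE (an assumption, not from this patch): kraken's ws api generally
# answers (un)subscribe requests with a 'subscriptionStatus' event, so
# an explicit ack could be read here, e.g.
#   {'event': 'subscriptionStatus', 'status': 'unsubscribed', ...}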
- # await ws.recv_msg() - - # see the tips on reconnection logic: - # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds - ws: NoBsWs - async with open_autorecon_ws( - 'wss://ws.kraken.com/', - fixture=subscribe, - ) as ws: - - # pull a first quote and deliver - msg_gen = process_data_feed_msgs(ws) - - # TODO: use ``anext()`` when it lands in 3.10! - typ, ohlc_last = await msg_gen.__anext__() - - topic, quote = normalize(ohlc_last) - - task_status.started((init_msgs, quote)) - - # lol, only "closes" when they're margin squeezing clients ;P - feed_is_live.set() - - # keep start of last interval for volume tracking - last_interval_start = ohlc_last.etime - - # start streaming - async for typ, ohlc in msg_gen: - - if typ == 'ohlc': - - # TODO: can get rid of all this by using - # ``trades`` subscription... - - # generate tick values to match time & sales pane: - # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - volume = ohlc.volume - - # new OHLC sample interval - if ohlc.etime > last_interval_start: - last_interval_start = ohlc.etime - tick_volume = volume - - else: - # this is the tick volume *within the interval* - tick_volume = volume - ohlc_last.volume - - ohlc_last = ohlc - last = ohlc.close - - if tick_volume: - ohlc.ticks.append({ - 'type': 'trade', - 'price': last, - 'size': tick_volume, - }) - - topic, quote = normalize(ohlc) - - elif typ == 'l1': - quote = ohlc - topic = quote['symbol'].lower() - - await send_chan.send({topic: quote}) - - -@tractor.context -async def open_symbol_search( - ctx: tractor.Context, - -) -> Client: - async with open_cached_client('kraken') as client: - - # load all symbols locally for fast search - cache = await client.cache_symbols() - await ctx.started(cache) - - async with ctx.open_stream() as stream: - - async for pattern in stream: - - matches = fuzzy.extractBests( - pattern, - cache, - score_cutoff=50, - ) - # repack in dict form - await stream.send( - {item[0]['altname']: item[0] - for item in matches} - ) +# tractor RPC enable arg +__enable_modules__: list[str] = [ + 'api', + # TODO: + # 'feed', + 'broker', +] diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py new file mode 100644 index 00000000..db803cf1 --- /dev/null +++ b/piker/brokers/kraken/api.py @@ -0,0 +1,864 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Kraken web API wrapping. 
+ +''' +from contextlib import asynccontextmanager as acm +from dataclasses import asdict, field +from datetime import datetime +from typing import Any, Optional, Callable, Union +import time + +from trio_typing import TaskStatus +import trio +import pendulum +import asks +from fuzzywuzzy import process as fuzzy +import numpy as np +import tractor +from pydantic.dataclasses import dataclass +from pydantic import BaseModel +import wsproto +import urllib.parse +import hashlib +import hmac +import base64 + +from piker import config +from piker._cacheables import open_cached_client +from piker.brokers._util import ( + resproc, + SymbolNotFound, + BrokerError, + DataThrottle, + DataUnavailable, +) +from piker.log import get_logger, get_console_log +from piker.data import ShmArray +from piker.data._web_bs import open_autorecon_ws, NoBsWs + +log = get_logger(__name__) + +# // +_url = 'https://api.kraken.com/0' + + +# Broker specific ohlc schema which includes a vwap field +_ohlc_dtype = [ + ('index', int), + ('time', int), + ('open', float), + ('high', float), + ('low', float), + ('close', float), + ('volume', float), + ('count', int), + ('bar_wap', float), +] + +# UI components allow this to be declared such that additional +# (historical) fields can be exposed. +ohlc_dtype = np.dtype(_ohlc_dtype) + +_show_wap_in_history = True + + +_symbol_info_translation: dict[str, str] = { + 'tick_decimals': 'pair_decimals', +} + + +# https://www.kraken.com/features/api#get-tradable-pairs +class Pair(BaseModel): + altname: str # alternate pair name + wsname: str # WebSocket pair name (if available) + aclass_base: str # asset class of base component + base: str # asset id of base component + aclass_quote: str # asset class of quote component + quote: str # asset id of quote component + lot: str # volume lot size + + pair_decimals: int # scaling decimal places for pair + lot_decimals: int # scaling decimal places for volume + + # amount to multiply lot volume by to get currency volume + lot_multiplier: float + + # array of leverage amounts available when buying + leverage_buy: list[int] + # array of leverage amounts available when selling + leverage_sell: list[int] + + # fee schedule array in [volume, percent fee] tuples + fees: list[tuple[int, float]] + + # maker fee schedule array in [volume, percent fee] tuples (if on + # maker/taker) + fees_maker: list[tuple[int, float]] + + fee_volume_currency: str # volume discount currency + margin_call: str # margin call level + margin_stop: str # stop-out/liquidation margin level + ordermin: float # minimum order volume for pair + + +@dataclass +class OHLC: + ''' + Description of the flattened OHLC quote format. + + For schema details see: + https://docs.kraken.com/websockets/#message-ohlc + + ''' + chan_id: int # internal kraken id + chan_name: str # eg. 
ohlc-1 (name-interval) + pair: str # fx pair + time: float # Begin time of interval, in seconds since epoch + etime: float # End time of interval, in seconds since epoch + open: float # Open price of interval + high: float # High price within interval + low: float # Low price within interval + close: float # Close price of interval + vwap: float # Volume weighted average price within interval + volume: float # Accumulated volume **within interval** + count: int # Number of trades within interval + # (sampled) generated tick data + ticks: list[Any] = field(default_factory=list) + + +def get_config() -> dict[str, Any]: + + conf, path = config.load() + section = conf.get('kraken') + + if section is None: + log.warning(f'No config section found for kraken in {path}') + return {} + + return section + + +def get_kraken_signature( + urlpath: str, + data: dict[str, Any], + secret: str +) -> str: + postdata = urllib.parse.urlencode(data) + encoded = (str(data['nonce']) + postdata).encode() + message = urlpath.encode() + hashlib.sha256(encoded).digest() + + mac = hmac.new(base64.b64decode(secret), message, hashlib.sha512) + sigdigest = base64.b64encode(mac.digest()) + return sigdigest.decode() + + +class InvalidKey(ValueError): + ''' + EAPI:Invalid key + This error is returned when the API key used for the call is + either expired or disabled, please review the API key in your + Settings -> API tab of account management or generate a new one + and update your application. + + ''' + + +class Client: + + def __init__( + self, + name: str = '', + api_key: str = '', + secret: str = '' + ) -> None: + self._sesh = asks.Session(connections=4) + self._sesh.base_location = _url + self._sesh.headers.update({ + 'User-Agent': + 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' + }) + self._pairs: list[str] = [] + self._name = name + self._api_key = api_key + self._secret = secret + + @property + def pairs(self) -> dict[str, Any]: + if self._pairs is None: + raise RuntimeError( + "Make sure to run `cache_symbols()` on startup!" 
+ ) + # retreive and cache all symbols + + return self._pairs + + async def _public( + self, + method: str, + data: dict, + ) -> dict[str, Any]: + resp = await self._sesh.post( + path=f'/public/{method}', + json=data, + timeout=float('inf') + ) + return resproc(resp, log) + + async def _private( + self, + method: str, + data: dict, + uri_path: str + ) -> dict[str, Any]: + headers = { + 'Content-Type': + 'application/x-www-form-urlencoded', + 'API-Key': + self._api_key, + 'API-Sign': + get_kraken_signature(uri_path, data, self._secret) + } + resp = await self._sesh.post( + path=f'/private/{method}', + data=data, + headers=headers, + timeout=float('inf') + ) + return resproc(resp, log) + + async def endpoint( + self, + method: str, + data: dict[str, Any] + ) -> dict[str, Any]: + uri_path = f'/0/private/{method}' + data['nonce'] = str(int(1000*time.time())) + return await self._private(method, data, uri_path) + + async def get_trades( + self, + data: dict[str, Any] = {} + ) -> dict[str, Any]: + data['ofs'] = 0 + # Grab all trade history + # https://docs.kraken.com/rest/#operation/getTradeHistory + # Kraken uses 'ofs' to refer to the offset + while True: + resp = await self.endpoint('TradesHistory', data) + # grab the first 50 trades + if data['ofs'] == 0: + trades = resp['result']['trades'] + # load the next 50 trades using dict constructor + # for speed + elif data['ofs'] == 50: + trades = dict(trades, **resp['result']['trades']) + # catch the end of the trades + elif resp['result']['trades'] == {}: + count = resp['result']['count'] + break + # update existing dict if num trades exceeds 100 + else: + trades.update(resp['result']['trades']) + # increment the offset counter + data['ofs'] += 50 + # To avoid exceeding API rate limit in case of a lot of trades + await trio.sleep(1) + + # make sure you grabbed all the trades + assert count == len(trades.values()) + + return trades + + async def submit_limit( + self, + symbol: str, + price: float, + action: str, + size: float, + reqid: str = None, + validate: bool = False # set True test call without a real submission + ) -> dict: + ''' + Place an order and return integer request id provided by client. + + ''' + # Build common data dict for common keys from both endpoints + data = { + "pair": symbol, + "price": str(price), + "validate": validate + } + if reqid is None: + # Build order data for kraken api + data |= { + "ordertype": "limit", "type": action, "volume": str(size) + } + return await self.endpoint('AddOrder', data) + else: + # Edit order data for kraken api + data["txid"] = reqid + return await self.endpoint('EditOrder', data) + + async def submit_cancel( + self, + reqid: str, + ) -> dict: + ''' + Send cancel request for order id ``reqid``. 
+ + ''' + # txid is a transaction id given by kraken + return await self.endpoint('CancelOrder', {"txid": reqid}) + + async def symbol_info( + self, + pair: Optional[str] = None, + ): + if pair is not None: + pairs = {'pair': pair} + else: + pairs = None # get all pairs + + resp = await self._public('AssetPairs', pairs) + err = resp['error'] + if err: + symbolname = pairs['pair'] if pair else None + raise SymbolNotFound(f'{symbolname}.kraken') + + pairs = resp['result'] + + if pair is not None: + _, data = next(iter(pairs.items())) + return data + else: + return pairs + + async def cache_symbols( + self, + ) -> dict: + if not self._pairs: + self._pairs = await self.symbol_info() + + return self._pairs + + async def search_symbols( + self, + pattern: str, + limit: int = None, + ) -> dict[str, Any]: + if self._pairs is not None: + data = self._pairs + else: + data = await self.symbol_info() + + matches = fuzzy.extractBests( + pattern, + data, + score_cutoff=50, + ) + # repack in dict form + return {item[0]['altname']: item[0] for item in matches} + + async def bars( + self, + symbol: str = 'XBTUSD', + + # UTC 2017-07-02 12:53:20 + since: Optional[Union[int, datetime]] = None, + count: int = 720, # <- max allowed per query + as_np: bool = True, + + ) -> dict: + + if since is None: + since = pendulum.now('UTC').start_of('minute').subtract( + minutes=count).timestamp() + + elif isinstance(since, int): + since = pendulum.from_timestamp(since).timestamp() + + else: # presumably a pendulum datetime + since = since.timestamp() + + # UTC 2017-07-02 12:53:20 is oldest seconds value + since = str(max(1499000000, int(since))) + json = await self._public( + 'OHLC', + data={ + 'pair': symbol, + 'since': since, + }, + ) + try: + res = json['result'] + res.pop('last') + bars = next(iter(res.values())) + + new_bars = [] + + first = bars[0] + last_nz_vwap = first[-3] + if last_nz_vwap == 0: + # use close if vwap is zero + last_nz_vwap = first[-4] + + # convert all fields to native types + for i, bar in enumerate(bars): + # normalize weird zero-ed vwap values..cmon kraken.. + # indicates vwap didn't change since last bar + vwap = float(bar.pop(-3)) + if vwap != 0: + last_nz_vwap = vwap + if vwap == 0: + vwap = last_nz_vwap + + # re-insert vwap as the last of the fields + bar.append(vwap) + + new_bars.append( + (i,) + tuple( + ftype(bar[j]) for j, (name, ftype) in enumerate( + _ohlc_dtype[1:] + ) + ) + ) + array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars + return array + except KeyError: + errmsg = json['error'][0] + + if 'not found' in errmsg: + raise SymbolNotFound(errmsg + f': {symbol}') + + elif 'Too many requests' in errmsg: + raise DataThrottle(f'{symbol}') + + else: + raise BrokerError(errmsg) + + +@acm +async def get_client() -> Client: + + section = get_config() + if section: + client = Client( + name=section['key_descr'], + api_key=section['api_key'], + secret=section['secret'] + ) + else: + client = Client() + + # at startup, load all symbols locally for fast search + await client.cache_symbols() + + yield client + + +def normalize_symbol( + ticker: str +) -> str: + # This is to convert symbol names from what kraken + # uses to the traditional 3x3 pair symbol syntax + symlen = len(ticker) + if symlen == 6: + return ticker.lower() + else: + for sym in ['XXBT', 'XXMR', 'ZEUR']: + if sym in ticker: + ticker = ticker.replace(sym, sym[1:]) + return ticker.lower() + + +async def stream_messages( + ws: NoBsWs, +): + ''' + Message stream parser and heartbeat handler. 
+ + Deliver ws subscription messages as well as handle heartbeat logic + though a single async generator. + + ''' + too_slow_count = last_hb = 0 + + while True: + + with trio.move_on_after(5) as cs: + msg = await ws.recv_msg() + + # trigger reconnection if heartbeat is laggy + if cs.cancelled_caught: + + too_slow_count += 1 + + if too_slow_count > 20: + log.warning( + "Heartbeat is too slow, resetting ws connection") + + await ws._connect() + too_slow_count = 0 + continue + + if isinstance(msg, dict): + if msg.get('event') == 'heartbeat': + + now = time.time() + delay = now - last_hb + last_hb = now + + # XXX: why tf is this not printing without --tl flag? + log.debug(f"Heartbeat after {delay}") + # print(f"Heartbeat after {delay}") + + continue + + err = msg.get('errorMessage') + if err: + raise BrokerError(err) + else: + yield msg + + +async def process_data_feed_msgs( + ws: NoBsWs, +): + ''' + Parse and pack data feed messages. + + ''' + async for msg in stream_messages(ws): + + chan_id, *payload_array, chan_name, pair = msg + + if 'ohlc' in chan_name: + + yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) + + elif 'spread' in chan_name: + + bid, ask, ts, bsize, asize = map(float, payload_array[0]) + + # TODO: really makes you think IB has a horrible API... + quote = { + 'symbol': pair.replace('/', ''), + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, + + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize}, + ], + } + yield 'l1', quote + + # elif 'book' in msg[-2]: + # chan_id, *payload_array, chan_name, pair = msg + # print(msg) + + else: + print(f'UNHANDLED MSG: {msg}') + yield msg + + +def normalize( + ohlc: OHLC, + +) -> dict: + quote = asdict(ohlc) + quote['broker_ts'] = quote['time'] + quote['brokerd_ts'] = time.time() + quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') + quote['last'] = quote['close'] + quote['bar_wap'] = ohlc.vwap + + # seriously eh? what's with this non-symmetry everywhere + # in subscription systems... + # XXX: piker style is always lowercases symbols. + topic = quote['pair'].replace('/', '').lower() + + # print(quote) + return topic, quote + + +def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]: + ''' + Create a request subscription packet dict. + + https://docs.kraken.com/websockets/#message-subscribe + + ''' + # eg. specific logic for this in kraken's sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + return { + 'pair': pairs, + 'event': 'subscribe', + 'subscription': data, + } + + +@acm +async def open_history_client( + symbol: str, + +) -> tuple[Callable, int]: + + # TODO implement history getter for the new storage layer. + async with open_cached_client('kraken') as client: + + # lol, kraken won't send any more then the "last" + # 720 1m bars.. so we have to just ignore further + # requests of this type.. 
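# (editor's sketch) rough arithmetic behind the single-query limit used just
# below: kraken's public OHLC endpoint caps each response at 720 bars, so at
# the 1m timeframe a single request already spans the whole window it will
# ever serve (plain stdlib math only, no extra API call implied):
MAX_BARS: int = 720
BAR_PERIOD_S: int = 60  # 1m bars
window_s: int = MAX_BARS * BAR_PERIOD_S
assert window_s == 43_200  # ie. only ~12 hours of 1m history per query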
+ queries: int = 0 + + async def get_ohlc( + end_dt: Optional[datetime] = None, + start_dt: Optional[datetime] = None, + + ) -> tuple[ + np.ndarray, + datetime, # start + datetime, # end + ]: + + nonlocal queries + if queries > 0: + raise DataUnavailable + + count = 0 + while count <= 3: + try: + array = await client.bars( + symbol, + since=end_dt, + ) + count += 1 + queries += 1 + break + except DataThrottle: + log.warning(f'kraken OHLC throttle for {symbol}') + await trio.sleep(1) + + start_dt = pendulum.from_timestamp(array[0]['time']) + end_dt = pendulum.from_timestamp(array[-1]['time']) + return array, start_dt, end_dt + + yield get_ohlc, {'erlangs': 1, 'rate': 1} + + +async def backfill_bars( + + sym: str, + shm: ShmArray, # type: ignore # noqa + count: int = 10, # NOTE: any more and we'll overrun the underlying buffer + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Fill historical bars into shared mem / storage afap. + ''' + with trio.CancelScope() as cs: + async with open_cached_client('kraken') as client: + bars = await client.bars(symbol=sym) + shm.push(bars) + task_status.started(cs) + + +async def stream_quotes( + + send_chan: trio.abc.SendChannel, + symbols: list[str], + feed_is_live: trio.Event, + loglevel: str = None, + + # backend specific + sub_type: str = 'ohlc', + + # startup sync + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Subscribe for ohlc stream of quotes for ``pairs``. + + ``pairs`` must be formatted /. + + ''' + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + ws_pairs = {} + sym_infos = {} + + async with open_cached_client('kraken') as client, send_chan as send_chan: + + # keep client cached for real-time section + for sym in symbols: + + # transform to upper since piker style is always lower + sym = sym.upper() + + si = Pair(**await client.symbol_info(sym)) # validation + syminfo = si.dict() + syminfo['price_tick_size'] = 1 / 10**si.pair_decimals + syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals + syminfo['asset_type'] = 'crypto' + sym_infos[sym] = syminfo + ws_pairs[sym] = si.wsname + + symbol = symbols[0].lower() + + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + symbol: { + 'symbol_info': sym_infos[sym], + 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, + }, + } + + @acm + async def subscribe(ws: wsproto.WSConnection): + # XXX: setup subs + # https://docs.kraken.com/websockets/#message-subscribe + # specific logic for this in kraken's shitty sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + ohlc_sub = make_sub( + list(ws_pairs.values()), + {'name': 'ohlc', 'interval': 1} + ) + + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. + await ws.send_msg(ohlc_sub) + + # trade data (aka L1) + l1_sub = make_sub( + list(ws_pairs.values()), + {'name': 'spread'} # 'depth': 10} + ) + + # pull a first quote and deliver + await ws.send_msg(l1_sub) + + yield + + # unsub from all pairs on teardown + await ws.send_msg({ + 'pair': list(ws_pairs.values()), + 'event': 'unsubscribe', + 'subscription': ['ohlc', 'spread'], + }) + + # XXX: do we need to ack the unsub? 
+ # await ws.recv_msg() + + # see the tips on reconnection logic: + # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds + ws: NoBsWs + async with open_autorecon_ws( + 'wss://ws.kraken.com/', + fixture=subscribe, + ) as ws: + + # pull a first quote and deliver + msg_gen = process_data_feed_msgs(ws) + + # TODO: use ``anext()`` when it lands in 3.10! + typ, ohlc_last = await msg_gen.__anext__() + + topic, quote = normalize(ohlc_last) + + task_status.started((init_msgs, quote)) + + # lol, only "closes" when they're margin squeezing clients ;P + feed_is_live.set() + + # keep start of last interval for volume tracking + last_interval_start = ohlc_last.etime + + # start streaming + async for typ, ohlc in msg_gen: + + if typ == 'ohlc': + + # TODO: can get rid of all this by using + # ``trades`` subscription... + + # generate tick values to match time & sales pane: + # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m + volume = ohlc.volume + + # new OHLC sample interval + if ohlc.etime > last_interval_start: + last_interval_start = ohlc.etime + tick_volume = volume + + else: + # this is the tick volume *within the interval* + tick_volume = volume - ohlc_last.volume + + ohlc_last = ohlc + last = ohlc.close + + if tick_volume: + ohlc.ticks.append({ + 'type': 'trade', + 'price': last, + 'size': tick_volume, + }) + + topic, quote = normalize(ohlc) + + elif typ == 'l1': + quote = ohlc + topic = quote['symbol'].lower() + + await send_chan.send({topic: quote}) + + +@tractor.context +async def open_symbol_search( + ctx: tractor.Context, + +) -> Client: + async with open_cached_client('kraken') as client: + + # load all symbols locally for fast search + cache = await client.cache_symbols() + await ctx.started(cache) + + async with ctx.open_stream() as stream: + + async for pattern in stream: + + matches = fuzzy.extractBests( + pattern, + cache, + score_cutoff=50, + ) + # repack in dict form + await stream.send( + {item[0]['altname']: item[0] + for item in matches} + ) From f87a2a810ad6582e7185eacc68693bb55b517383 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 29 Jun 2022 13:25:47 -0400 Subject: [PATCH 04/18] Make broker mod import from new api mod --- piker/brokers/kraken/broker.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 4d1f0338..e115da64 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -13,6 +13,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . + ''' Order api and machinery @@ -33,7 +34,13 @@ import trio import tractor import wsproto -from . import ( +from piker.clearing._paper_engine import PaperBoi +from piker.clearing._messages import ( + BrokerdPosition, BrokerdOrder, BrokerdStatus, + BrokerdOrderAck, BrokerdError, BrokerdCancel, + BrokerdFill, +) +from .api import ( Client, BrokerError, get_client, @@ -44,12 +51,6 @@ from . 
import ( NoBsWs, stream_messages, ) -from ..clearing._paper_engine import PaperBoi -from ..clearing._messages import ( - BrokerdPosition, BrokerdOrder, BrokerdStatus, - BrokerdOrderAck, BrokerdError, BrokerdCancel, - BrokerdFill, -) class Trade(BaseModel): From d3caad6e116e4066e62608901ca80995bbe00064 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 29 Jun 2022 13:48:01 -0400 Subject: [PATCH 05/18] Factor data feeds endpoints into new sub-mod --- piker/brokers/kraken/__init__.py | 11 +- piker/brokers/kraken/api.py | 431 +--------------------------- piker/brokers/kraken/broker.py | 4 +- piker/brokers/kraken/feed.py | 464 +++++++++++++++++++++++++++++++ 4 files changed, 479 insertions(+), 431 deletions(-) create mode 100644 piker/brokers/kraken/feed.py diff --git a/piker/brokers/kraken/__init__.py b/piker/brokers/kraken/__init__.py index 5dbe709b..47128f52 100644 --- a/piker/brokers/kraken/__init__.py +++ b/piker/brokers/kraken/__init__.py @@ -27,16 +27,16 @@ Sub-modules within break into the core functionalities: ''' from .api import ( get_client, +) +from .feed import ( open_history_client, open_symbol_search, stream_quotes, ) -# TODO: -# from .feed import ( -# ) from .broker import ( trades_dialogue, - # TODO: + + # TODO: part of pps/ledger work # norm_trade_records, ) @@ -52,7 +52,6 @@ __all__ = [ # tractor RPC enable arg __enable_modules__: list[str] = [ 'api', - # TODO: - # 'feed', + 'feed', 'broker', ] diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index db803cf1..2435b235 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -19,38 +19,34 @@ Kraken web API wrapping. ''' from contextlib import asynccontextmanager as acm -from dataclasses import asdict, field +from dataclasses import field from datetime import datetime -from typing import Any, Optional, Callable, Union +from typing import ( + Any, + Optional, + Union, +) import time -from trio_typing import TaskStatus import trio import pendulum import asks from fuzzywuzzy import process as fuzzy import numpy as np -import tractor from pydantic.dataclasses import dataclass -from pydantic import BaseModel -import wsproto import urllib.parse import hashlib import hmac import base64 from piker import config -from piker._cacheables import open_cached_client from piker.brokers._util import ( resproc, SymbolNotFound, BrokerError, DataThrottle, - DataUnavailable, ) -from piker.log import get_logger, get_console_log -from piker.data import ShmArray -from piker.data._web_bs import open_autorecon_ws, NoBsWs +from piker.log import get_logger log = get_logger(__name__) @@ -76,47 +72,11 @@ _ohlc_dtype = [ ohlc_dtype = np.dtype(_ohlc_dtype) _show_wap_in_history = True - - _symbol_info_translation: dict[str, str] = { 'tick_decimals': 'pair_decimals', } -# https://www.kraken.com/features/api#get-tradable-pairs -class Pair(BaseModel): - altname: str # alternate pair name - wsname: str # WebSocket pair name (if available) - aclass_base: str # asset class of base component - base: str # asset id of base component - aclass_quote: str # asset class of quote component - quote: str # asset id of quote component - lot: str # volume lot size - - pair_decimals: int # scaling decimal places for pair - lot_decimals: int # scaling decimal places for volume - - # amount to multiply lot volume by to get currency volume - lot_multiplier: float - - # array of leverage amounts available when buying - leverage_buy: list[int] - # array of leverage amounts available when selling - leverage_sell: list[int] - - # fee 
schedule array in [volume, percent fee] tuples - fees: list[tuple[int, float]] - - # maker fee schedule array in [volume, percent fee] tuples (if on - # maker/taker) - fees_maker: list[tuple[int, float]] - - fee_volume_currency: str # volume discount currency - margin_call: str # margin call level - margin_stop: str # stop-out/liquidation margin level - ordermin: float # minimum order volume for pair - - @dataclass class OHLC: ''' @@ -485,380 +445,3 @@ def normalize_symbol( if sym in ticker: ticker = ticker.replace(sym, sym[1:]) return ticker.lower() - - -async def stream_messages( - ws: NoBsWs, -): - ''' - Message stream parser and heartbeat handler. - - Deliver ws subscription messages as well as handle heartbeat logic - though a single async generator. - - ''' - too_slow_count = last_hb = 0 - - while True: - - with trio.move_on_after(5) as cs: - msg = await ws.recv_msg() - - # trigger reconnection if heartbeat is laggy - if cs.cancelled_caught: - - too_slow_count += 1 - - if too_slow_count > 20: - log.warning( - "Heartbeat is too slow, resetting ws connection") - - await ws._connect() - too_slow_count = 0 - continue - - if isinstance(msg, dict): - if msg.get('event') == 'heartbeat': - - now = time.time() - delay = now - last_hb - last_hb = now - - # XXX: why tf is this not printing without --tl flag? - log.debug(f"Heartbeat after {delay}") - # print(f"Heartbeat after {delay}") - - continue - - err = msg.get('errorMessage') - if err: - raise BrokerError(err) - else: - yield msg - - -async def process_data_feed_msgs( - ws: NoBsWs, -): - ''' - Parse and pack data feed messages. - - ''' - async for msg in stream_messages(ws): - - chan_id, *payload_array, chan_name, pair = msg - - if 'ohlc' in chan_name: - - yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) - - elif 'spread' in chan_name: - - bid, ask, ts, bsize, asize = map(float, payload_array[0]) - - # TODO: really makes you think IB has a horrible API... - quote = { - 'symbol': pair.replace('/', ''), - 'ticks': [ - {'type': 'bid', 'price': bid, 'size': bsize}, - {'type': 'bsize', 'price': bid, 'size': bsize}, - - {'type': 'ask', 'price': ask, 'size': asize}, - {'type': 'asize', 'price': ask, 'size': asize}, - ], - } - yield 'l1', quote - - # elif 'book' in msg[-2]: - # chan_id, *payload_array, chan_name, pair = msg - # print(msg) - - else: - print(f'UNHANDLED MSG: {msg}') - yield msg - - -def normalize( - ohlc: OHLC, - -) -> dict: - quote = asdict(ohlc) - quote['broker_ts'] = quote['time'] - quote['brokerd_ts'] = time.time() - quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') - quote['last'] = quote['close'] - quote['bar_wap'] = ohlc.vwap - - # seriously eh? what's with this non-symmetry everywhere - # in subscription systems... - # XXX: piker style is always lowercases symbols. - topic = quote['pair'].replace('/', '').lower() - - # print(quote) - return topic, quote - - -def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]: - ''' - Create a request subscription packet dict. - - https://docs.kraken.com/websockets/#message-subscribe - - ''' - # eg. specific logic for this in kraken's sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - return { - 'pair': pairs, - 'event': 'subscribe', - 'subscription': data, - } - - -@acm -async def open_history_client( - symbol: str, - -) -> tuple[Callable, int]: - - # TODO implement history getter for the new storage layer. 
- async with open_cached_client('kraken') as client: - - # lol, kraken won't send any more then the "last" - # 720 1m bars.. so we have to just ignore further - # requests of this type.. - queries: int = 0 - - async def get_ohlc( - end_dt: Optional[datetime] = None, - start_dt: Optional[datetime] = None, - - ) -> tuple[ - np.ndarray, - datetime, # start - datetime, # end - ]: - - nonlocal queries - if queries > 0: - raise DataUnavailable - - count = 0 - while count <= 3: - try: - array = await client.bars( - symbol, - since=end_dt, - ) - count += 1 - queries += 1 - break - except DataThrottle: - log.warning(f'kraken OHLC throttle for {symbol}') - await trio.sleep(1) - - start_dt = pendulum.from_timestamp(array[0]['time']) - end_dt = pendulum.from_timestamp(array[-1]['time']) - return array, start_dt, end_dt - - yield get_ohlc, {'erlangs': 1, 'rate': 1} - - -async def backfill_bars( - - sym: str, - shm: ShmArray, # type: ignore # noqa - count: int = 10, # NOTE: any more and we'll overrun the underlying buffer - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Fill historical bars into shared mem / storage afap. - ''' - with trio.CancelScope() as cs: - async with open_cached_client('kraken') as client: - bars = await client.bars(symbol=sym) - shm.push(bars) - task_status.started(cs) - - -async def stream_quotes( - - send_chan: trio.abc.SendChannel, - symbols: list[str], - feed_is_live: trio.Event, - loglevel: str = None, - - # backend specific - sub_type: str = 'ohlc', - - # startup sync - task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Subscribe for ohlc stream of quotes for ``pairs``. - - ``pairs`` must be formatted /. - - ''' - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) - - ws_pairs = {} - sym_infos = {} - - async with open_cached_client('kraken') as client, send_chan as send_chan: - - # keep client cached for real-time section - for sym in symbols: - - # transform to upper since piker style is always lower - sym = sym.upper() - - si = Pair(**await client.symbol_info(sym)) # validation - syminfo = si.dict() - syminfo['price_tick_size'] = 1 / 10**si.pair_decimals - syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals - syminfo['asset_type'] = 'crypto' - sym_infos[sym] = syminfo - ws_pairs[sym] = si.wsname - - symbol = symbols[0].lower() - - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - symbol: { - 'symbol_info': sym_infos[sym], - 'shm_write_opts': {'sum_tick_vml': False}, - 'fqsn': sym, - }, - } - - @acm - async def subscribe(ws: wsproto.WSConnection): - # XXX: setup subs - # https://docs.kraken.com/websockets/#message-subscribe - # specific logic for this in kraken's shitty sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - ohlc_sub = make_sub( - list(ws_pairs.values()), - {'name': 'ohlc', 'interval': 1} - ) - - # TODO: we want to eventually allow unsubs which should - # be completely fine to request from a separate task - # since internally the ws methods appear to be FIFO - # locked. 
- await ws.send_msg(ohlc_sub) - - # trade data (aka L1) - l1_sub = make_sub( - list(ws_pairs.values()), - {'name': 'spread'} # 'depth': 10} - ) - - # pull a first quote and deliver - await ws.send_msg(l1_sub) - - yield - - # unsub from all pairs on teardown - await ws.send_msg({ - 'pair': list(ws_pairs.values()), - 'event': 'unsubscribe', - 'subscription': ['ohlc', 'spread'], - }) - - # XXX: do we need to ack the unsub? - # await ws.recv_msg() - - # see the tips on reconnection logic: - # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds - ws: NoBsWs - async with open_autorecon_ws( - 'wss://ws.kraken.com/', - fixture=subscribe, - ) as ws: - - # pull a first quote and deliver - msg_gen = process_data_feed_msgs(ws) - - # TODO: use ``anext()`` when it lands in 3.10! - typ, ohlc_last = await msg_gen.__anext__() - - topic, quote = normalize(ohlc_last) - - task_status.started((init_msgs, quote)) - - # lol, only "closes" when they're margin squeezing clients ;P - feed_is_live.set() - - # keep start of last interval for volume tracking - last_interval_start = ohlc_last.etime - - # start streaming - async for typ, ohlc in msg_gen: - - if typ == 'ohlc': - - # TODO: can get rid of all this by using - # ``trades`` subscription... - - # generate tick values to match time & sales pane: - # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - volume = ohlc.volume - - # new OHLC sample interval - if ohlc.etime > last_interval_start: - last_interval_start = ohlc.etime - tick_volume = volume - - else: - # this is the tick volume *within the interval* - tick_volume = volume - ohlc_last.volume - - ohlc_last = ohlc - last = ohlc.close - - if tick_volume: - ohlc.ticks.append({ - 'type': 'trade', - 'price': last, - 'size': tick_volume, - }) - - topic, quote = normalize(ohlc) - - elif typ == 'l1': - quote = ohlc - topic = quote['symbol'].lower() - - await send_chan.send({topic: quote}) - - -@tractor.context -async def open_symbol_search( - ctx: tractor.Context, - -) -> Client: - async with open_cached_client('kraken') as client: - - # load all symbols locally for fast search - cache = await client.cache_symbols() - await ctx.started(cache) - - async with ctx.open_stream() as stream: - - async for pattern in stream: - - matches = fuzzy.extractBests( - pattern, - cache, - score_cutoff=50, - ) - # repack in dict form - await stream.send( - {item[0]['altname']: item[0] - for item in matches} - ) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index e115da64..c17d9daa 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -44,9 +44,11 @@ from .api import ( Client, BrokerError, get_client, - get_console_log, log, normalize_symbol, +) +from .feed import ( + get_console_log, open_autorecon_ws, NoBsWs, stream_messages, diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py new file mode 100644 index 00000000..5742bcb1 --- /dev/null +++ b/piker/brokers/kraken/feed.py @@ -0,0 +1,464 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+ +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Real-time and historical data feed endpoints. + +''' +from contextlib import asynccontextmanager as acm +from dataclasses import asdict +from datetime import datetime +from typing import ( + Any, + Optional, + Callable, +) +import time + +from fuzzywuzzy import process as fuzzy +import numpy as np +import pendulum +from pydantic import BaseModel +from trio_typing import TaskStatus +import tractor +import trio +import wsproto + +from piker._cacheables import open_cached_client +from piker.brokers._util import ( + BrokerError, + DataThrottle, + DataUnavailable, +) +from piker.log import get_console_log +from piker.data import ShmArray +from piker.data._web_bs import open_autorecon_ws, NoBsWs +from .api import ( + Client, + log, + OHLC, +) + + +# https://www.kraken.com/features/api#get-tradable-pairs +class Pair(BaseModel): + altname: str # alternate pair name + wsname: str # WebSocket pair name (if available) + aclass_base: str # asset class of base component + base: str # asset id of base component + aclass_quote: str # asset class of quote component + quote: str # asset id of quote component + lot: str # volume lot size + + pair_decimals: int # scaling decimal places for pair + lot_decimals: int # scaling decimal places for volume + + # amount to multiply lot volume by to get currency volume + lot_multiplier: float + + # array of leverage amounts available when buying + leverage_buy: list[int] + # array of leverage amounts available when selling + leverage_sell: list[int] + + # fee schedule array in [volume, percent fee] tuples + fees: list[tuple[int, float]] + + # maker fee schedule array in [volume, percent fee] tuples (if on + # maker/taker) + fees_maker: list[tuple[int, float]] + + fee_volume_currency: str # volume discount currency + margin_call: str # margin call level + margin_stop: str # stop-out/liquidation margin level + ordermin: float # minimum order volume for pair + + +async def stream_messages( + ws: NoBsWs, +): + ''' + Message stream parser and heartbeat handler. + + Deliver ws subscription messages as well as handle heartbeat logic + though a single async generator. + + ''' + too_slow_count = last_hb = 0 + + while True: + + with trio.move_on_after(5) as cs: + msg = await ws.recv_msg() + + # trigger reconnection if heartbeat is laggy + if cs.cancelled_caught: + + too_slow_count += 1 + + if too_slow_count > 20: + log.warning( + "Heartbeat is too slow, resetting ws connection") + + await ws._connect() + too_slow_count = 0 + continue + + if isinstance(msg, dict): + if msg.get('event') == 'heartbeat': + + now = time.time() + delay = now - last_hb + last_hb = now + + # XXX: why tf is this not printing without --tl flag? + log.debug(f"Heartbeat after {delay}") + # print(f"Heartbeat after {delay}") + + continue + + err = msg.get('errorMessage') + if err: + raise BrokerError(err) + else: + yield msg + + +async def process_data_feed_msgs( + ws: NoBsWs, +): + ''' + Parse and pack data feed messages. 
+ + ''' + async for msg in stream_messages(ws): + + chan_id, *payload_array, chan_name, pair = msg + + if 'ohlc' in chan_name: + + yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) + + elif 'spread' in chan_name: + + bid, ask, ts, bsize, asize = map(float, payload_array[0]) + + # TODO: really makes you think IB has a horrible API... + quote = { + 'symbol': pair.replace('/', ''), + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, + + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize}, + ], + } + yield 'l1', quote + + # elif 'book' in msg[-2]: + # chan_id, *payload_array, chan_name, pair = msg + # print(msg) + + else: + print(f'UNHANDLED MSG: {msg}') + yield msg + + +def normalize( + ohlc: OHLC, + +) -> dict: + quote = asdict(ohlc) + quote['broker_ts'] = quote['time'] + quote['brokerd_ts'] = time.time() + quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') + quote['last'] = quote['close'] + quote['bar_wap'] = ohlc.vwap + + # seriously eh? what's with this non-symmetry everywhere + # in subscription systems... + # XXX: piker style is always lowercases symbols. + topic = quote['pair'].replace('/', '').lower() + + # print(quote) + return topic, quote + + +def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]: + ''' + Create a request subscription packet dict. + + https://docs.kraken.com/websockets/#message-subscribe + + ''' + # eg. specific logic for this in kraken's sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + return { + 'pair': pairs, + 'event': 'subscribe', + 'subscription': data, + } + + +@acm +async def open_history_client( + symbol: str, + +) -> tuple[Callable, int]: + + # TODO implement history getter for the new storage layer. + async with open_cached_client('kraken') as client: + + # lol, kraken won't send any more then the "last" + # 720 1m bars.. so we have to just ignore further + # requests of this type.. + queries: int = 0 + + async def get_ohlc( + end_dt: Optional[datetime] = None, + start_dt: Optional[datetime] = None, + + ) -> tuple[ + np.ndarray, + datetime, # start + datetime, # end + ]: + + nonlocal queries + if queries > 0: + raise DataUnavailable + + count = 0 + while count <= 3: + try: + array = await client.bars( + symbol, + since=end_dt, + ) + count += 1 + queries += 1 + break + except DataThrottle: + log.warning(f'kraken OHLC throttle for {symbol}') + await trio.sleep(1) + + start_dt = pendulum.from_timestamp(array[0]['time']) + end_dt = pendulum.from_timestamp(array[-1]['time']) + return array, start_dt, end_dt + + yield get_ohlc, {'erlangs': 1, 'rate': 1} + + +async def backfill_bars( + + sym: str, + shm: ShmArray, # type: ignore # noqa + count: int = 10, # NOTE: any more and we'll overrun the underlying buffer + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Fill historical bars into shared mem / storage afap. 
+ ''' + with trio.CancelScope() as cs: + async with open_cached_client('kraken') as client: + bars = await client.bars(symbol=sym) + shm.push(bars) + task_status.started(cs) + + +async def stream_quotes( + + send_chan: trio.abc.SendChannel, + symbols: list[str], + feed_is_live: trio.Event, + loglevel: str = None, + + # backend specific + sub_type: str = 'ohlc', + + # startup sync + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Subscribe for ohlc stream of quotes for ``pairs``. + + ``pairs`` must be formatted /. + + ''' + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + ws_pairs = {} + sym_infos = {} + + async with open_cached_client('kraken') as client, send_chan as send_chan: + + # keep client cached for real-time section + for sym in symbols: + + # transform to upper since piker style is always lower + sym = sym.upper() + + si = Pair(**await client.symbol_info(sym)) # validation + syminfo = si.dict() + syminfo['price_tick_size'] = 1 / 10**si.pair_decimals + syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals + syminfo['asset_type'] = 'crypto' + sym_infos[sym] = syminfo + ws_pairs[sym] = si.wsname + + symbol = symbols[0].lower() + + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + symbol: { + 'symbol_info': sym_infos[sym], + 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, + }, + } + + @acm + async def subscribe(ws: wsproto.WSConnection): + # XXX: setup subs + # https://docs.kraken.com/websockets/#message-subscribe + # specific logic for this in kraken's shitty sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + ohlc_sub = make_sub( + list(ws_pairs.values()), + {'name': 'ohlc', 'interval': 1} + ) + + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. + await ws.send_msg(ohlc_sub) + + # trade data (aka L1) + l1_sub = make_sub( + list(ws_pairs.values()), + {'name': 'spread'} # 'depth': 10} + ) + + # pull a first quote and deliver + await ws.send_msg(l1_sub) + + yield + + # unsub from all pairs on teardown + await ws.send_msg({ + 'pair': list(ws_pairs.values()), + 'event': 'unsubscribe', + 'subscription': ['ohlc', 'spread'], + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + # see the tips on reconnection logic: + # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds + ws: NoBsWs + async with open_autorecon_ws( + 'wss://ws.kraken.com/', + fixture=subscribe, + ) as ws: + + # pull a first quote and deliver + msg_gen = process_data_feed_msgs(ws) + + # TODO: use ``anext()`` when it lands in 3.10! + typ, ohlc_last = await msg_gen.__anext__() + + topic, quote = normalize(ohlc_last) + + task_status.started((init_msgs, quote)) + + # lol, only "closes" when they're margin squeezing clients ;P + feed_is_live.set() + + # keep start of last interval for volume tracking + last_interval_start = ohlc_last.etime + + # start streaming + async for typ, ohlc in msg_gen: + + if typ == 'ohlc': + + # TODO: can get rid of all this by using + # ``trades`` subscription... 
+ + # generate tick values to match time & sales pane: + # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m + volume = ohlc.volume + + # new OHLC sample interval + if ohlc.etime > last_interval_start: + last_interval_start = ohlc.etime + tick_volume = volume + + else: + # this is the tick volume *within the interval* + tick_volume = volume - ohlc_last.volume + + ohlc_last = ohlc + last = ohlc.close + + if tick_volume: + ohlc.ticks.append({ + 'type': 'trade', + 'price': last, + 'size': tick_volume, + }) + + topic, quote = normalize(ohlc) + + elif typ == 'l1': + quote = ohlc + topic = quote['symbol'].lower() + + await send_chan.send({topic: quote}) + + +@tractor.context +async def open_symbol_search( + ctx: tractor.Context, + +) -> Client: + async with open_cached_client('kraken') as client: + + # load all symbols locally for fast search + cache = await client.cache_symbols() + await ctx.started(cache) + + async with ctx.open_stream() as stream: + + async for pattern in stream: + + matches = fuzzy.extractBests( + pattern, + cache, + score_cutoff=50, + ) + # repack in dict form + await stream.send( + {item[0]['altname']: item[0] + for item in matches} + ) From 9106d13dfe44b28048fedd094b6fb1420654d347 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 29 Jun 2022 17:21:45 -0400 Subject: [PATCH 06/18] Drop wacky if block logic, while loop, handle errors and prep for async batching --- piker/brokers/kraken/api.py | 67 +++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 2435b235..7fb606dd 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -21,6 +21,7 @@ Kraken web API wrapping. from contextlib import asynccontextmanager as acm from dataclasses import field from datetime import datetime +import itertools from typing import ( Any, Optional, @@ -28,7 +29,8 @@ from typing import ( ) import time -import trio +# import trio +# import tractor import pendulum import asks from fuzzywuzzy import process as fuzzy @@ -213,37 +215,46 @@ class Client: async def get_trades( self, - data: dict[str, Any] = {} + ) -> dict[str, Any]: - data['ofs'] = 0 - # Grab all trade history - # https://docs.kraken.com/rest/#operation/getTradeHistory - # Kraken uses 'ofs' to refer to the offset - while True: - resp = await self.endpoint('TradesHistory', data) - # grab the first 50 trades - if data['ofs'] == 0: - trades = resp['result']['trades'] - # load the next 50 trades using dict constructor - # for speed - elif data['ofs'] == 50: - trades = dict(trades, **resp['result']['trades']) - # catch the end of the trades - elif resp['result']['trades'] == {}: + ''' + Get the trades (aka cleared orders) history from the rest endpoint: + https://docs.kraken.com/rest/#operation/getTradeHistory + + ''' + ofs = 0 + trades_by_id: dict[str, Any] = {} + + for i in itertools.count(): + + # increment 'ofs' pagination offset + ofs = i*50 + + resp = await self.endpoint( + 'TradesHistory', + {'ofs': ofs}, + ) + # get up to 50 results + try: + by_id = resp['result']['trades'] + except KeyError: + err = resp['error'] + raise BrokerError(err) + + trades_by_id.update(by_id) + + if ( + len(by_id) < 50 + ): + # we know we received the max amount of + # trade results so there may be more history. 
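# (editor's sketch) the offset-pagination pattern this `get_trades()` hunk
# adopts, distilled into a standalone synchronous helper; `fetch_page` is a
# hypothetical stand-in for the authenticated `TradesHistory` call and the
# 50-entry page size mirrors kraken's REST paging as used above:
import itertools
from typing import Any, Callable

def paginate_by_offset(
    fetch_page: Callable[[int], dict[str, Any]],
    page_size: int = 50,
) -> dict[str, Any]:
    entries: dict[str, Any] = {}
    for i in itertools.count():
        page = fetch_page(i * page_size)  # eg. {'TXID1': {...}, ...}
        entries.update(page)
        if len(page) < page_size:
            # a short (or empty) page means history is exhausted
            break
    return entries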
+ # catch the end of the trades count = resp['result']['count'] break - # update existing dict if num trades exceeds 100 - else: - trades.update(resp['result']['trades']) - # increment the offset counter - data['ofs'] += 50 - # To avoid exceeding API rate limit in case of a lot of trades - await trio.sleep(1) - # make sure you grabbed all the trades - assert count == len(trades.values()) - - return trades + # santity check on update + assert count == len(trades_by_id.values()) + return trades_by_id async def submit_limit( self, From fcd7e0f3f3a17d20d824de5cf91adc4ec4189a0d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 29 Jun 2022 17:24:38 -0400 Subject: [PATCH 07/18] Avoid crash on trades ledger msgs Just ignore them for now using new `match:` syntax B) but we'll do incremental update sooon! Resolves #311 --- piker/brokers/kraken/broker.py | 107 +++++++++++++++++---------------- 1 file changed, 56 insertions(+), 51 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index c17d9daa..9c98bebe 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -70,6 +70,7 @@ class Trade(BaseModel): def pack_positions( acc: str, trades: dict + ) -> list[Any]: positions: dict[str, float] = {} vols: dict[str, float] = {} @@ -104,8 +105,8 @@ def pack_positions( async def handle_order_requests( - client: Client, - ems_order_stream: tractor.MsgStream, + client: Client, + ems_order_stream: tractor.MsgStream, ) -> None: @@ -342,11 +343,13 @@ async def trades_dialogue( # TODO: maybe add multiple accounts n.start_soon(handle_order_requests, client, ems_stream) + # pull and deliver trades ledger acc_name = 'kraken.' + client._name trades = await client.get_trades() - + log.info( + f'Loaded {len(trades)} trades from account `{acc_name}`' + ) position_msgs = pack_positions(acc_name, trades) - await ctx.started((position_msgs, (acc_name,))) # Get websocket token for authenticated data stream @@ -355,74 +358,76 @@ async def trades_dialogue( # lol wtf is this.. assert resp['error'] == [] - token = resp['result']['token'] async with ( ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - # TODO: maybe add multiple accounts - n.start_soon(handle_order_requests, client, ems_stream) - - # Process trades msg stream of ws - async with open_autorecon_ws( + open_autorecon_ws( 'wss://ws-auth.kraken.com/', fixture=subscribe, token=token, - ) as ws: - async for msg in process_trade_msgs(ws): - for trade in msg: - # check the type of packaged message - assert type(trade) == Trade + ) as ws, + trio.open_nursery() as n, + ): + # task for processing inbound requests from ems + n.start_soon(handle_order_requests, client, ems_stream) + # begin trade event processing + async for msg in process_trade_msgs(ws): + for trade in msg: + match trade: # prepare and send a filled status update - filled_msg = BrokerdStatus( - reqid=trade.reqid, - time_ns=time.time_ns(), + case Trade(): + filled_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), - account='kraken.spot', - status='filled', - filled=float(trade.size), - reason='Order filled by kraken', - broker_details={ - 'name': 'kraken', - 'broker_time': trade.broker_time - }, + account=acc_name, + status='filled', + filled=float(trade.size), + reason='Order filled by kraken', + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + }, - # TODO: figure out if kraken gives a count - # of how many units of underlying were - # filled. 
Alternatively we can decrement - # this value ourselves by associating and - # calcing from the diff with the original - # client-side request, see: - # https://github.com/pikers/piker/issues/296 - remaining=0, - ) + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, + ) + await ems_stream.send(filled_msg.dict()) - await ems_stream.send(filled_msg.dict()) + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=trade.reqid, + time_ns=time.time_ns(), - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=trade.reqid, - time_ns=time.time_ns(), + action=trade.action, + size=float(trade.size), + price=float(trade.price), + # TODO: maybe capture more msg data i.e fees? + broker_details={'name': 'kraken'}, + broker_time=float(trade.broker_time) + ) - action=trade.action, - size=float(trade.size), - price=float(trade.price), - # TODO: maybe capture more msg data i.e fees? - broker_details={'name': 'kraken'}, - broker_time=float(trade.broker_time) - ) + await ems_stream.send(fill_msg.dict()) - await ems_stream.send(fill_msg.dict()) + case _: + log.warning(f'Unhandled trades msg: {trade}') + await tractor.breakpoint() async def process_trade_msgs( ws: NoBsWs, ): ''' - Parse and pack data feed messages. + Parse and pack trades subscription messages, deliver framed + sequences of messages? ''' sequence_counter = 0 From 735fbc625906391caccaa9ae8df75e9662cc15b3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 1 Jul 2022 11:02:02 -0400 Subject: [PATCH 08/18] Raise any error from response --- piker/brokers/kraken/api.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 7fb606dd..8d79873c 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -208,6 +208,7 @@ class Client: self, method: str, data: dict[str, Any] + ) -> dict[str, Any]: uri_path = f'/0/private/{method}' data['nonce'] = str(int(1000*time.time())) @@ -234,18 +235,17 @@ class Client: 'TradesHistory', {'ofs': ofs}, ) - # get up to 50 results - try: - by_id = resp['result']['trades'] - except KeyError: - err = resp['error'] - raise BrokerError(err) - + by_id = resp['result']['trades'] trades_by_id.update(by_id) + # we can get up to 50 results per query if ( len(by_id) < 50 ): + err = resp.get('error') + if err: + raise BrokerError(err) + # we know we received the max amount of # trade results so there may be more history. # catch the end of the trades From 5d39b04552ed9b0c1d1a44388f627af3bf99b75b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 1 Jul 2022 11:02:27 -0400 Subject: [PATCH 09/18] Invert normalizer branching logic, raise on edge case --- piker/brokers/kraken/api.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 8d79873c..1c4c8045 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -446,13 +446,16 @@ async def get_client() -> Client: def normalize_symbol( ticker: str ) -> str: - # This is to convert symbol names from what kraken - # uses to the traditional 3x3 pair symbol syntax + ''' + Normalize symbol names to to a 3x3 pair. 
+ + ''' symlen = len(ticker) - if symlen == 6: - return ticker.lower() - else: + if symlen != 6: for sym in ['XXBT', 'XXMR', 'ZEUR']: if sym in ticker: ticker = ticker.replace(sym, sym[1:]) - return ticker.lower() + else: + raise ValueError(f'Unhandled symbol: {ticker}') + + return ticker.lower() From f65f56ec75d8ddfbeb4898dccd4636b95fe27682 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 2 Jul 2022 15:40:59 -0400 Subject: [PATCH 10/18] Initial `piker.pp` ledger support for `kraken` No real-time update support (yet) but this is the first draft at writing trades ledgers and `pps.toml` entries for the kraken backend. Deatz: - drop `pack_positions()`, no longer used. - use `piker.pp` apis to both write a trades ledger file and update the `pps.toml` inside the `trades_dialogue()` endpoint startup. - drop the weird paper engine swap over if auth can't be done, we should be doing something with messaging in the ems over this.. - more web API error response raising. - pass the `pp.Transaction` set loaded from ledger into `process_trade_msgs()` do avoid duplicate sends of already collected trades msgs. - add `norm_trade_records()` public endpoing (used by `piker.pp` api) and `update_ledger()` helper. - rejig `process_trade_msgs()` to drop the weird `try:` assertion block and skip already-recorded-in-ledger trade msgs as well as yield *each* trade instead of sub-sequences. --- piker/brokers/kraken/__init__.py | 4 +- piker/brokers/kraken/broker.py | 304 +++++++++++++++++-------------- 2 files changed, 170 insertions(+), 138 deletions(-) diff --git a/piker/brokers/kraken/__init__.py b/piker/brokers/kraken/__init__.py index 47128f52..013da8fd 100644 --- a/piker/brokers/kraken/__init__.py +++ b/piker/brokers/kraken/__init__.py @@ -35,9 +35,7 @@ from .feed import ( ) from .broker import ( trades_dialogue, - - # TODO: part of pps/ledger work - # norm_trade_records, + norm_trade_records, ) __all__ = [ diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 9c98bebe..18c95d58 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -29,12 +29,13 @@ from typing import ( # Union, ) +import pendulum from pydantic import BaseModel import trio import tractor import wsproto -from piker.clearing._paper_engine import PaperBoi +from piker import pp from piker.clearing._messages import ( BrokerdPosition, BrokerdOrder, BrokerdStatus, BrokerdOrderAck, BrokerdError, BrokerdCancel, @@ -62,47 +63,11 @@ class Trade(BaseModel): ''' reqid: str # kraken order transaction id action: str # buy or sell - price: str # price of asset - size: str # vol of asset + price: float # price of asset + size: float # vol of asset broker_time: str # e.g GTC, GTD -def pack_positions( - acc: str, - trades: dict - -) -> list[Any]: - positions: dict[str, float] = {} - vols: dict[str, float] = {} - costs: dict[str, float] = {} - position_msgs: list[Any] = [] - - for trade in trades.values(): - sign = -1 if trade['type'] == 'sell' else 1 - pair = trade['pair'] - vol = float(trade['vol']) - vols[pair] = vols.get(pair, 0) + sign * vol - costs[pair] = costs.get(pair, 0) + sign * float(trade['cost']) - positions[pair] = costs[pair] / vols[pair] if vols[pair] else 0 - - for ticker, pos in positions.items(): - vol = float(vols[ticker]) - if not vol: - continue - norm_sym = normalize_symbol(ticker) - msg = BrokerdPosition( - broker='kraken', - account=acc, - symbol=norm_sym, - currency=norm_sym[-3:], - size=vol, - avg_price=float(pos), - ) - position_msgs.append(msg.dict()) - - return 
position_msgs - - async def handle_order_requests( client: Client, @@ -317,47 +282,60 @@ async def trades_dialogue( # XXX: do we need to ack the unsub? # await ws.recv_msg() - # Authenticated block async with get_client() as client: + + # TODO: make ems flip to paper mode via + # some returned signal if the user only wants to use + # the data feed or we return this? + # await ctx.started(({}, ['paper'])) + if not client._api_key: - log.error('Missing Kraken API key: Trades WS connection failed') - await ctx.started(({}, ['paper'])) + raise RuntimeError( + 'Missing Kraken API key in `brokers.toml`!?!?') - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - - client = PaperBoi( - 'kraken', - ems_stream, - _buys={}, - _sells={}, - - _reqids={}, - - # TODO: load paper positions from ``positions.toml`` - _positions={}, - ) - - # TODO: maybe add multiple accounts - n.start_soon(handle_order_requests, client, ems_stream) + # auth required block + acctid = client._name + acc_name = 'kraken.' + acctid # pull and deliver trades ledger - acc_name = 'kraken.' + client._name trades = await client.get_trades() log.info( f'Loaded {len(trades)} trades from account `{acc_name}`' ) - position_msgs = pack_positions(acc_name, trades) - await ctx.started((position_msgs, (acc_name,))) + trans = await update_ledger(acctid, trades) + active, closed = pp.update_pps_conf( + 'kraken', + acctid, + trade_records=trans, + ledger_reload={}.fromkeys(t.bsuid for t in trans), + ) + + position_msgs: list[dict] = [] + pps: dict[int, pp.Position] + for pps in [active, closed]: + for tid, p in pps.items(): + msg = BrokerdPosition( + broker='kraken', + account=acc_name, + symbol=p.symbol.front_fqsn(), + size=p.size, + avg_price=p.be_price, + currency='', + ) + position_msgs.append(msg.dict()) + + await ctx.started( + (position_msgs, [acc_name]) + ) # Get websocket token for authenticated data stream # Assert that a token was actually received. resp = await client.endpoint('GetWebSocketsToken', {}) - # lol wtf is this.. - assert resp['error'] == [] + err = resp.get('error') + if err: + raise BrokerError(err) + token = resp['result']['token'] async with ( @@ -373,93 +351,149 @@ async def trades_dialogue( n.start_soon(handle_order_requests, client, ems_stream) # begin trade event processing - async for msg in process_trade_msgs(ws): - for trade in msg: - match trade: - # prepare and send a filled status update - case Trade(): - filled_msg = BrokerdStatus( - reqid=trade.reqid, - time_ns=time.time_ns(), + async for trade in process_trade_msgs( + ws, + trans, # pass in prior ledger transactions + ): + match trade: + # prepare and send a filled status update + case Trade(): + filled_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), - account=acc_name, - status='filled', - filled=float(trade.size), - reason='Order filled by kraken', - broker_details={ - 'name': 'kraken', - 'broker_time': trade.broker_time - }, + account=acc_name, + status='filled', + filled=float(trade.size), + reason='Order filled by kraken', + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + }, - # TODO: figure out if kraken gives a count - # of how many units of underlying were - # filled. 
Alternatively we can decrement - # this value ourselves by associating and - # calcing from the diff with the original - # client-side request, see: - # https://github.com/pikers/piker/issues/296 - remaining=0, - ) - await ems_stream.send(filled_msg.dict()) + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, + ) + await ems_stream.send(filled_msg.dict()) - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=trade.reqid, - time_ns=time.time_ns(), + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=trade.reqid, + time_ns=time.time_ns(), - action=trade.action, - size=float(trade.size), - price=float(trade.price), - # TODO: maybe capture more msg data i.e fees? - broker_details={'name': 'kraken'}, - broker_time=float(trade.broker_time) - ) + action=trade.action, + size=float(trade.size), + price=float(trade.price), + # TODO: maybe capture more msg data i.e fees? + broker_details={'name': 'kraken'}, + broker_time=float(trade.broker_time) + ) - await ems_stream.send(fill_msg.dict()) + await ems_stream.send(fill_msg.dict()) - case _: - log.warning(f'Unhandled trades msg: {trade}') - await tractor.breakpoint() + case _: + log.warning(f'Unhandled trades msg: {trade}') + await tractor.breakpoint() + + +def norm_trade_records( + ledger: dict[str, Any], + +) -> list[pp.Transaction]: + + records: list[pp.Transaction] = [] + + for tid, record in ledger.items(): + + size = record.get('vol') * { + 'buy': 1, + 'sell': -1, + }[record['type']] + bsuid = record['pair'] + norm_sym = normalize_symbol(bsuid) + + records.append( + pp.Transaction( + fqsn=f'{norm_sym}.kraken', + tid=tid, + size=float(size), + price=float(record['price']), + cost=float(record['fee']), + dt=pendulum.from_timestamp(record['time']), + bsuid=bsuid, + + # XXX: there are no derivs on kraken right? + # expiry=expiry, + ) + ) + + return records + + +async def update_ledger( + acctid: str, + trade_entries: list[dict[str, Any]], + +) -> list[pp.Transaction]: + + # write recent session's trades to the user's (local) ledger file. + with pp.open_trade_ledger( + 'kraken', + acctid, + ) as ledger: + ledger.update(trade_entries) + + # normalize to transaction form + records = norm_trade_records(trade_entries) + return records async def process_trade_msgs( ws: NoBsWs, + trans: list[pp.Transaction], ): ''' Parse and pack trades subscription messages, deliver framed sequences of messages? 
+ Ws api docs: + https://docs.kraken.com/websockets/#message-ownTrades + ''' - sequence_counter = 0 + count: int = 0 + ledger_txids = {r.tid for r in trans} + async for msg in stream_messages(ws): - try: - # check that we are on the ownTrades stream and that msgs - # are arriving in sequence with kraken For clarification the - # kraken ws api docs for this stream: - # https://docs.kraken.com/websockets/#message-ownTrades - assert msg[1] == 'ownTrades' - assert msg[2]['sequence'] > sequence_counter - sequence_counter += 1 - raw_msgs = msg[0] - trade_msgs = [] + sub = msg[1] + seq = msg[2]['sequence'] - # Check that we are only processing new trades - if msg[2]['sequence'] != 1: - # check if its a new order or an update msg - for trade_msg in raw_msgs: - trade = list(trade_msg.values())[0] - order_msg = Trade( - reqid=trade['ordertxid'], - action=trade['type'], - price=trade['price'], - size=trade['vol'], - broker_time=trade['time'] - ) - trade_msgs.append(order_msg) + # stream sanity checks + assert sub == 'ownTrades' - yield trade_msgs + # ensure that we are only processing new trades + assert seq > count + count += 1 - except AssertionError: - print(f'UNHANDLED MSG: {msg}') - yield msg + trade_events = msg[0] + + for trade_event in trade_events: + for tid, trade_data in trade_event.items(): + if tid in ledger_txids: + continue + + trade = Trade( + reqid=msg['ordertxid'], + action=msg['type'], + price=msg['price'], + size=msg['vol'], + broker_time=msg['time'] + ) + + yield trade From f6888057c30916c96ca58524866564e52e646f49 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 2 Jul 2022 15:48:42 -0400 Subject: [PATCH 11/18] Just do a naive lookup for symbol normalization --- piker/brokers/kraken/api.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 1c4c8045..68bdde6e 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -450,12 +450,14 @@ def normalize_symbol( Normalize symbol names to to a 3x3 pair. ''' + remap = { + 'XXBTZEUR': 'XBTEUR', + 'XXMRZEUR': 'XMREUR', + } symlen = len(ticker) if symlen != 6: - for sym in ['XXBT', 'XXMR', 'ZEUR']: - if sym in ticker: - ticker = ticker.replace(sym, sym[1:]) - else: - raise ValueError(f'Unhandled symbol: {ticker}') + ticker = remap[ticker] + else: + raise ValueError(f'Unhandled symbol: {ticker}') return ticker.lower() From 47777e4192d28769c194954c5e0e89c491074e55 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 2 Jul 2022 15:49:32 -0400 Subject: [PATCH 12/18] Use new `str.removeprefix()` from py3.10 --- piker/pp.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/piker/pp.py b/piker/pp.py index 0769982b..f7513154 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -615,7 +615,7 @@ def load_pps_from_toml( if not pps: log.warning( - f'No trade history could be loaded for {brokername}:{acctid}' + f'No `pps.toml` positions cold be loaded for {brokername}:{acctid}' ) # unmarshal/load ``pps.toml`` config entries into object form. @@ -752,11 +752,11 @@ def update_pps_conf( # drop symbol obj in serialized form s = asdict.pop('symbol') fqsn = s.front_fqsn() - print(f'Updating active pp: {fqsn}') + log.info(f'Updating active pp: {fqsn}') # XXX: ugh, it's cuz we push the section under # the broker name.. maybe we need to rethink this? 
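# (editor's note) a quick illustration of why the change just below swaps
# `str.rstrip()` for py3.10's `str.removeprefix()`: rstrip() treats its
# argument as a *set of characters* to strip rather than a literal suffix,
# so it can eat valid trailing symbol characters (the strings here are made
# up purely to show the stdlib behaviour, not the actual fqsn format):
assert 'xbteur.kraken'.rstrip('.kraken') == 'xbteu'  # oops, trailing 'r' stripped too
assert 'kraken.xbteur'.removeprefix('kraken.') == 'xbteur'  # exact, literal prefix removal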
- brokerless_key = fqsn.rstrip(f'.{brokername}') + brokerless_key = fqsn.removeprefix(f'{brokername}.') pp_entries[brokerless_key] = asdict From aea7bec2c3012b73737aadb62689eb78ffe638f9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 3 Jul 2022 11:18:45 -0400 Subject: [PATCH 13/18] Inline `process_trade_msgs()` into relay loop --- piker/brokers/kraken/broker.py | 161 +++++++++++++++------------------ 1 file changed, 72 insertions(+), 89 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 18c95d58..63ab7fa0 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -37,9 +37,13 @@ import wsproto from piker import pp from piker.clearing._messages import ( - BrokerdPosition, BrokerdOrder, BrokerdStatus, - BrokerdOrderAck, BrokerdError, BrokerdCancel, + BrokerdCancel, + BrokerdError, BrokerdFill, + BrokerdOrder, + BrokerdOrderAck, + BrokerdPosition, + BrokerdStatus, ) from .api import ( Client, @@ -338,6 +342,7 @@ async def trades_dialogue( token = resp['result']['token'] + ws: NoBsWs async with ( ctx.open_stream() as ems_stream, open_autorecon_ws( @@ -350,55 +355,78 @@ async def trades_dialogue( # task for processing inbound requests from ems n.start_soon(handle_order_requests, client, ems_stream) - # begin trade event processing - async for trade in process_trade_msgs( - ws, - trans, # pass in prior ledger transactions - ): - match trade: - # prepare and send a filled status update - case Trade(): - filled_msg = BrokerdStatus( - reqid=trade.reqid, - time_ns=time.time_ns(), + count: int = 0 + ledger_txids = {r.tid for r in trans} - account=acc_name, - status='filled', - filled=float(trade.size), - reason='Order filled by kraken', - broker_details={ - 'name': 'kraken', - 'broker_time': trade.broker_time - }, + # process and relay trades events to ems + # https://docs.kraken.com/websockets/#message-ownTrades + async for msg in stream_messages(ws): + match msg: + case [ + trades_msgs, + 'ownTrades', + {'sequence': seq}, + ]: + # ensure that we are only processing new trades + assert seq > count + count += 1 - # TODO: figure out if kraken gives a count - # of how many units of underlying were - # filled. Alternatively we can decrement - # this value ourselves by associating and - # calcing from the diff with the original - # client-side request, see: - # https://github.com/pikers/piker/issues/296 - remaining=0, - ) - await ems_stream.send(filled_msg.dict()) + for entries in trades_msgs: + for tid, msg in entries.items(): - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=trade.reqid, - time_ns=time.time_ns(), + if tid in ledger_txids: + log.debug(f'Skipping ledgered {tid}:{msg}') + continue - action=trade.action, - size=float(trade.size), - price=float(trade.price), - # TODO: maybe capture more msg data i.e fees? - broker_details={'name': 'kraken'}, - broker_time=float(trade.broker_time) - ) + # yield trade + reqid = msg['ordertxid'] + action = msg['type'] + price = float(msg['price']) + size = float(msg['vol']) + broker_time = float(msg['time']) - await ems_stream.send(fill_msg.dict()) + filled_msg = BrokerdStatus( + reqid=reqid, + time_ns=time.time_ns(), + + account=acc_name, + status='filled', + filled=size, + reason='Order filled by kraken', + broker_details={ + 'name': 'kraken', + 'broker_time': broker_time + }, + + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. 
Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, + ) + await ems_stream.send(filled_msg.dict()) + + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=reqid, + time_ns=time.time_ns(), + + action=action, + size=size, + price=price, + # TODO: maybe capture more msg data + # i.e fees? + broker_details={'name': 'kraken'}, + broker_time=broker_time + ) + + await ems_stream.send(fill_msg.dict()) case _: - log.warning(f'Unhandled trades msg: {trade}') + log.warning(f'Unhandled trades msg: {msg}') await tractor.breakpoint() @@ -452,48 +480,3 @@ async def update_ledger( # normalize to transaction form records = norm_trade_records(trade_entries) return records - - -async def process_trade_msgs( - ws: NoBsWs, - trans: list[pp.Transaction], -): - ''' - Parse and pack trades subscription messages, deliver framed - sequences of messages? - - Ws api docs: - https://docs.kraken.com/websockets/#message-ownTrades - - ''' - count: int = 0 - ledger_txids = {r.tid for r in trans} - - async for msg in stream_messages(ws): - - sub = msg[1] - seq = msg[2]['sequence'] - - # stream sanity checks - assert sub == 'ownTrades' - - # ensure that we are only processing new trades - assert seq > count - count += 1 - - trade_events = msg[0] - - for trade_event in trade_events: - for tid, trade_data in trade_event.items(): - if tid in ledger_txids: - continue - - trade = Trade( - reqid=msg['ordertxid'], - action=msg['type'], - price=msg['price'], - size=msg['vol'], - broker_time=msg['time'] - ) - - yield trade From 4c0f2099aa197e43d23524f8eb4a4d32fab27d1e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 3 Jul 2022 11:19:33 -0400 Subject: [PATCH 14/18] Send fill msg first --- piker/brokers/kraken/broker.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 63ab7fa0..1cf4e9fe 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -385,6 +385,21 @@ async def trades_dialogue( size = float(msg['vol']) broker_time = float(msg['time']) + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=reqid, + time_ns=time.time_ns(), + + action=action, + size=size, + price=price, + # TODO: maybe capture more msg data + # i.e fees? + broker_details={'name': 'kraken'}, + broker_time=broker_time + ) + + await ems_stream.send(fill_msg.dict()) filled_msg = BrokerdStatus( reqid=reqid, time_ns=time.time_ns(), @@ -409,22 +424,6 @@ async def trades_dialogue( ) await ems_stream.send(filled_msg.dict()) - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=reqid, - time_ns=time.time_ns(), - - action=action, - size=size, - price=price, - # TODO: maybe capture more msg data - # i.e fees? 
- broker_details={'name': 'kraken'}, - broker_time=broker_time - ) - - await ems_stream.send(fill_msg.dict()) - case _: log.warning(f'Unhandled trades msg: {msg}') await tractor.breakpoint() From 214f864dcf54273b97580a223130e6898dca025d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 3 Jul 2022 14:37:15 -0400 Subject: [PATCH 15/18] Handle ws style symbol schema --- piker/brokers/kraken/api.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 68bdde6e..fa167cf4 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -264,6 +264,7 @@ class Client: size: float, reqid: str = None, validate: bool = False # set True test call without a real submission + ) -> dict: ''' Place an order and return integer request id provided by client. @@ -278,7 +279,9 @@ class Client: if reqid is None: # Build order data for kraken api data |= { - "ordertype": "limit", "type": action, "volume": str(size) + "ordertype": "limit", + "type": action, + "volume": str(size), } return await self.endpoint('AddOrder', data) else: @@ -453,6 +456,10 @@ def normalize_symbol( remap = { 'XXBTZEUR': 'XBTEUR', 'XXMRZEUR': 'XMREUR', + + # ws versions? pretty weird.. + 'XBT/EUR': 'XBTEUR', + 'XMR/EUR': 'XMREUR', } symlen = len(ticker) if symlen != 6: From 7846446a4417612821954fc74dfbf074e646e66e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 3 Jul 2022 14:39:33 -0400 Subject: [PATCH 16/18] Add real-time incremental pp updates Moves to using the new `piker.pp` apis to both store real-time trade events in a ledger file as well emit position update msgs (which were not in this backend at all prior) when new orders clear (aka fill). In terms of outstanding issues, - solves the pp update part of the bugs reported in #310 - starts a msg case block in prep for #293 Details of rework: - move the `subscribe()` ws fixture to module level and `partial()` in the client token instead of passing it to the instance; in prep for removal of the `.token` attr from the `NoBsWs` wrapper. - drop `make_auth_sub()` since it was too thin and we can just do it all succinctly in `subscribe()` - filter trade update msgs to those not yet stored int the toml ledger - much better kraken api msg unpacking using new `match:` synax B) Resolves #311 --- piker/brokers/kraken/broker.py | 231 +++++++++++++++++++++------------ 1 file changed, 145 insertions(+), 86 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 1cf4e9fe..433c058b 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -19,6 +19,8 @@ Order api and machinery ''' from contextlib import asynccontextmanager as acm +from functools import partial +from itertools import chain from pprint import pformat import time from typing import ( @@ -234,20 +236,49 @@ async def handle_order_requests( log.error(f'Unknown order command: {request_msg}') -def make_auth_sub(data: dict[str, Any]) -> dict[str, str]: +@acm +async def subscribe( + ws: wsproto.WSConnection, + token: str, + subs: list[str] = ['ownTrades', 'openOrders'], +): ''' - Create a request subscription packet dict. - - ## TODO: point to the auth urls + Setup ws api subscriptions: https://docs.kraken.com/websockets/#message-subscribe + By default we sign up for trade and order update events. + ''' - # eg. 
specific logic for this in kraken's sync client: + # more specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - return { - 'event': 'subscribe', - 'subscription': data, - } + + assert token + for sub in subs: + msg = { + 'event': 'subscribe', + 'subscription': { + 'name': sub, + 'token': token, + } + } + + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. + await ws.send_msg(msg) + + yield + + for sub in subs: + # unsub from all pairs on teardown + await ws.send_msg({ + 'event': 'unsubscribe', + 'subscription': [sub], + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() @tractor.context @@ -259,33 +290,6 @@ async def trades_dialogue( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - @acm - async def subscribe(ws: wsproto.WSConnection, token: str): - # XXX: setup subs - # https://docs.kraken.com/websockets/#message-subscribe - # specific logic for this in kraken's shitty sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - trades_sub = make_auth_sub( - {'name': 'ownTrades', 'token': token} - ) - - # TODO: we want to eventually allow unsubs which should - # be completely fine to request from a separate task - # since internally the ws methods appear to be FIFO - # locked. - await ws.send_msg(trades_sub) - - yield - - # unsub from all pairs on teardown - await ws.send_msg({ - 'event': 'unsubscribe', - 'subscription': ['ownTrades'], - }) - - # XXX: do we need to ack the unsub? - # await ws.recv_msg() - async with get_client() as client: # TODO: make ems flip to paper mode via @@ -347,8 +351,10 @@ async def trades_dialogue( ctx.open_stream() as ems_stream, open_autorecon_ws( 'wss://ws-auth.kraken.com/', - fixture=subscribe, - token=token, + fixture=partial( + subscribe, + token=token, + ), ) as ws, trio.open_nursery() as n, ): @@ -356,7 +362,6 @@ async def trades_dialogue( n.start_soon(handle_order_requests, client, ems_stream) count: int = 0 - ledger_txids = {r.tid for r in trans} # process and relay trades events to ems # https://docs.kraken.com/websockets/#message-ownTrades @@ -367,62 +372,116 @@ async def trades_dialogue( 'ownTrades', {'sequence': seq}, ]: - # ensure that we are only processing new trades + # XXX: do we actually need this orrr? + # ensure that we are only processing new trades? 
assert seq > count count += 1 - for entries in trades_msgs: - for tid, msg in entries.items(): + # flatten msgs for processing + trades = { + tid: trade + for entry in trades_msgs + for (tid, trade) in entry.items() - if tid in ledger_txids: - log.debug(f'Skipping ledgered {tid}:{msg}') - continue + # only emit entries which are already not-in-ledger + if tid not in {r.tid for r in trans} + } + for tid, trade in trades.items(): - # yield trade - reqid = msg['ordertxid'] - action = msg['type'] - price = float(msg['price']) - size = float(msg['vol']) - broker_time = float(msg['time']) + # parse-cast + reqid = trade['ordertxid'] + action = trade['type'] + price = float(trade['price']) + size = float(trade['vol']) + broker_time = float(trade['time']) - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=reqid, - time_ns=time.time_ns(), + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=reqid, + time_ns=time.time_ns(), - action=action, - size=size, - price=price, - # TODO: maybe capture more msg data - # i.e fees? - broker_details={'name': 'kraken'}, - broker_time=broker_time - ) + action=action, + size=size, + price=price, + # TODO: maybe capture more msg data + # i.e fees? + broker_details={'name': 'kraken'}, + broker_time=broker_time + ) + await ems_stream.send(fill_msg.dict()) - await ems_stream.send(fill_msg.dict()) - filled_msg = BrokerdStatus( - reqid=reqid, - time_ns=time.time_ns(), + filled_msg = BrokerdStatus( + reqid=reqid, + time_ns=time.time_ns(), - account=acc_name, - status='filled', - filled=size, - reason='Order filled by kraken', - broker_details={ - 'name': 'kraken', - 'broker_time': broker_time - }, + account=acc_name, + status='filled', + filled=size, + reason='Order filled by kraken', + broker_details={ + 'name': 'kraken', + 'broker_time': broker_time + }, - # TODO: figure out if kraken gives a count - # of how many units of underlying were - # filled. Alternatively we can decrement - # this value ourselves by associating and - # calcing from the diff with the original - # client-side request, see: - # https://github.com/pikers/piker/issues/296 - remaining=0, - ) - await ems_stream.send(filled_msg.dict()) + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, + ) + await ems_stream.send(filled_msg.dict()) + + # update ledger and position tracking + trans = await update_ledger(acctid, trades) + active, closed = pp.update_pps_conf( + 'kraken', + acctid, + trade_records=trans, + ledger_reload={}.fromkeys( + t.bsuid for t in trans), + ) + + # emit pp msgs + for pos in filter( + bool, + chain(active.values(), closed.values()), + ): + pp_msg = BrokerdPosition( + broker='kraken', + + # XXX: ok so this is annoying, we're + # relaying an account name with the + # backend suffix prefixed but when + # reading accounts from ledgers we + # don't need it and/or it's prefixed + # in the section table.. we should + # just strip this from the message + # right since `.broker` is already + # included? 
+ account=f'kraken.{acctid}', + symbol=pos.symbol.front_fqsn(), + size=pos.size, + avg_price=pos.be_price, + + # TODO + # currency='' + ) + await ems_stream.send(pp_msg.dict()) + + case [ + trades_msgs, + 'openOrders', + {'sequence': seq}, + ]: + # TODO: async order update handling which we + # should remove from `handle_order_requests()` + # above: + # https://github.com/pikers/piker/issues/293 + # https://github.com/pikers/piker/issues/310 + log.info(f'Order update {seq}:{trades_msgs}') case _: log.warning(f'Unhandled trades msg: {msg}') @@ -452,7 +511,7 @@ def norm_trade_records( size=float(size), price=float(record['price']), cost=float(record['fee']), - dt=pendulum.from_timestamp(record['time']), + dt=pendulum.from_timestamp(float(record['time'])), bsuid=bsuid, # XXX: there are no derivs on kraken right? From 609034c634c5434b7079cc67907130ce1d862c6e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Jul 2022 16:46:31 -0400 Subject: [PATCH 17/18] Fix typo / line length --- piker/pp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/pp.py b/piker/pp.py index f7513154..88a7147d 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -615,7 +615,7 @@ def load_pps_from_toml( if not pps: log.warning( - f'No `pps.toml` positions cold be loaded for {brokername}:{acctid}' + f'No `pps.toml` positions could be loaded {brokername}:{acctid}' ) # unmarshal/load ``pps.toml`` config entries into object form. From af01e8961249b5b8cb26fef9c54ffc3c067db534 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Jul 2022 16:59:47 -0400 Subject: [PATCH 18/18] Create sub-pkg logger once during import --- piker/brokers/kraken/__init__.py | 6 ++++++ piker/brokers/kraken/api.py | 4 +--- piker/brokers/kraken/broker.py | 2 +- piker/brokers/kraken/feed.py | 2 +- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/piker/brokers/kraken/__init__.py b/piker/brokers/kraken/__init__.py index 013da8fd..cd36f4e5 100644 --- a/piker/brokers/kraken/__init__.py +++ b/piker/brokers/kraken/__init__.py @@ -25,6 +25,11 @@ Sub-modules within break into the core functionalities: wrapping around ``ib_insync``. ''' + +from piker.log import get_logger + +log = get_logger(__name__) + from .api import ( get_client, ) @@ -44,6 +49,7 @@ __all__ = [ 'open_history_client', 'open_symbol_search', 'stream_quotes', + 'norm_trade_records', ] diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index fa167cf4..3abf533e 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -48,9 +48,7 @@ from piker.brokers._util import ( BrokerError, DataThrottle, ) -from piker.log import get_logger - -log = get_logger(__name__) +from . import log # // _url = 'https://api.kraken.com/0' diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 433c058b..4e2e02f6 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -47,11 +47,11 @@ from piker.clearing._messages import ( BrokerdPosition, BrokerdStatus, ) +from . import log from .api import ( Client, BrokerError, get_client, - log, normalize_symbol, ) from .feed import ( diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 5742bcb1..71b75082 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -46,9 +46,9 @@ from piker.brokers._util import ( from piker.log import get_console_log from piker.data import ShmArray from piker.data._web_bs import open_autorecon_ws, NoBsWs +from . import log from .api import ( Client, - log, OHLC, )
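
For reference, a minimal standalone sketch of the module-level `subscribe()` fixture pattern introduced in [PATCH 16/18], where the auth token is bound up front with `functools.partial()` so the (re)connect machinery only has to hand the fixture a live ws instance. `FakeWs`, the dummy token and the `trio.run()` driver below are stand-ins for illustration only, not piker's `NoBsWs`/`open_autorecon_ws` plumbing; this assumes the fixture is simply entered as an async context manager around the connected ws.

    # sketch only: fake ws + partial-bound subscribe fixture
    from contextlib import asynccontextmanager
    from functools import partial
    import trio

    class FakeWs:
        # stand-in for piker's ws wrapper; just echoes outbound msgs
        async def send_msg(self, msg: dict) -> None:
            print('sent:', msg)

    @asynccontextmanager
    async def subscribe(
        ws,
        token: str,
        subs: tuple[str, ...] = ('ownTrades', 'openOrders'),
    ):
        # sign up for the private feeds on entry..
        for name in subs:
            await ws.send_msg({
                'event': 'subscribe',
                'subscription': {'name': name, 'token': token},
            })

        yield

        # ..and unsub from them on teardown
        for name in subs:
            await ws.send_msg({
                'event': 'unsubscribe',
                'subscription': [name],
            })

    async def main():
        ws = FakeWs()
        # token is bound here so reconnect logic only needs `fixture(ws)`
        fixture = partial(subscribe, token='dummy-ws-token')
        async with fixture(ws):
            await trio.sleep(0)  # trade/order event processing would live here

    trio.run(main)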
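
Likewise, a self-contained sketch of the `match:` based `ownTrades` unpacking from the same patch: destructure the `[trades, 'ownTrades', {'sequence': ..}]` framing, flatten the list of single-entry dicts, drop txids already present in the trade ledger, and parse the remaining fills. The sample payload, txids and the `relay_owntrades()` helper are invented for illustration; the real relay loop additionally emits `BrokerdFill`/`BrokerdStatus`/`BrokerdPosition` msgs and persists new records via `piker.pp`.

    # sketch only: ws `ownTrades` event shape as seen in the patch:
    # [ [ {tid: trade_dict}, .. ], 'ownTrades', {'sequence': n} ]
    from pprint import pprint

    sample_msg = [
        [
            {'TDLH43-DVQXD-2KHVYY': {
                'ordertxid': 'OQCLML-BW3P3-BUCMWZ',
                'type': 'buy',
                'price': '30010.00000',
                'vol': '0.00100000',
                'time': '1656000000.123456',
            }},
        ],
        'ownTrades',
        {'sequence': 2},
    ]

    # txids already recorded in the on-disk trade ledger
    ledger_txids: set[str] = {'TZ63HS-YBD4M-3RDG7H'}

    def relay_owntrades(msg, count: int = 0) -> list[dict]:
        # unpack one ws msg, returning parsed fills not yet in the ledger
        fills: list[dict] = []
        match msg:
            case [trades_msgs, 'ownTrades', {'sequence': seq}]:
                # new-trades-only sanity check
                assert seq > count

                # flatten the list-of-single-entry-dicts framing and
                # filter out already-ledgered entries
                trades = {
                    tid: trade
                    for entry in trades_msgs
                    for tid, trade in entry.items()
                    if tid not in ledger_txids
                }
                for tid, trade in trades.items():
                    fills.append({
                        'reqid': trade['ordertxid'],
                        'action': trade['type'],
                        'price': float(trade['price']),
                        'size': float(trade['vol']),
                        'broker_time': float(trade['time']),
                    })

            case _:
                print(f'Unhandled msg: {msg}')

        return fills

    pprint(relay_owntrades(sample_msg))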