From 90cc6eb31730f4abba44500a4f4ee9395dd47669 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 29 Jun 2022 11:08:48 -0400 Subject: [PATCH] Factor clearing related endpoints into new `.kraken.broker` submod --- piker/brokers/kraken/__init__.py | 430 +---------------------------- piker/brokers/kraken/broker.py | 457 +++++++++++++++++++++++++++++++ 2 files changed, 465 insertions(+), 422 deletions(-) create mode 100644 piker/brokers/kraken/broker.py diff --git a/piker/brokers/kraken/__init__.py b/piker/brokers/kraken/__init__.py index 670eed6f..56c6ec85 100644 --- a/piker/brokers/kraken/__init__.py +++ b/piker/brokers/kraken/__init__.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for piker0) +# Copyright (C) Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -21,8 +21,7 @@ Kraken backend. from contextlib import asynccontextmanager as acm from dataclasses import asdict, field from datetime import datetime -from pprint import pformat -from typing import Any, Optional, AsyncIterator, Callable, Union +from typing import Any, Optional, Callable, Union import time from trio_typing import TaskStatus @@ -40,29 +39,21 @@ import hashlib import hmac import base64 -from .. import config -from .._cacheables import open_cached_client -from ._util import ( +from piker import config +from piker._cacheables import open_cached_client +from piker.brokers._util import ( resproc, SymbolNotFound, BrokerError, DataThrottle, DataUnavailable, ) -from ..log import get_logger, get_console_log -from ..data import ShmArray -from ..data._web_bs import open_autorecon_ws, NoBsWs -from ..clearing._paper_engine import PaperBoi -from ..clearing._messages import ( - BrokerdPosition, BrokerdOrder, BrokerdStatus, - BrokerdOrderAck, BrokerdError, BrokerdCancel, - BrokerdFill, -) - +from piker.log import get_logger, get_console_log +from piker.data import ShmArray +from piker.data._web_bs import open_autorecon_ws, NoBsWs log = get_logger(__name__) - # // _url = 'https://api.kraken.com/0' @@ -126,18 +117,6 @@ class Pair(BaseModel): ordermin: float # minimum order volume for pair -class Trade(BaseModel): - ''' - Trade class that helps parse and validate ownTrades stream - - ''' - reqid: str # kraken order transaction id - action: str # buy or sell - price: str # price of asset - size: str # vol of asset - broker_time: str # e.g GTC, GTD - - @dataclass class OHLC: ''' @@ -493,41 +472,6 @@ async def get_client() -> Client: yield client -def pack_positions( - acc: str, - trades: dict -) -> list[Any]: - positions: dict[str, float] = {} - vols: dict[str, float] = {} - costs: dict[str, float] = {} - position_msgs: list[Any] = [] - - for trade in trades.values(): - sign = -1 if trade['type'] == 'sell' else 1 - pair = trade['pair'] - vol = float(trade['vol']) - vols[pair] = vols.get(pair, 0) + sign * vol - costs[pair] = costs.get(pair, 0) + sign * float(trade['cost']) - positions[pair] = costs[pair] / vols[pair] if vols[pair] else 0 - - for ticker, pos in positions.items(): - vol = float(vols[ticker]) - if not vol: - continue - norm_sym = normalize_symbol(ticker) - msg = BrokerdPosition( - broker='kraken', - account=acc, - symbol=norm_sym, - currency=norm_sym[-3:], - size=vol, - avg_price=float(pos), - ) - position_msgs.append(msg.dict()) - - return position_msgs - - def normalize_symbol( ticker: str ) -> str: @@ -543,322 +487,6 @@ def normalize_symbol( return ticker.lower() -def make_auth_sub(data: dict[str, Any]) -> dict[str, str]: - ''' - Create a request subscription packet dict. - - ## TODO: point to the auth urls - https://docs.kraken.com/websockets/#message-subscribe - - ''' - # eg. specific logic for this in kraken's sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - return { - 'event': 'subscribe', - 'subscription': data, - } - - -async def handle_order_requests( - - client: Client, - ems_order_stream: tractor.MsgStream, - -) -> None: - - request_msg: dict - order: BrokerdOrder - - async for request_msg in ems_order_stream: - log.info( - 'Received order request:\n' - f'{pformat(request_msg)}' - ) - - action = request_msg['action'] - - if action in {'buy', 'sell'}: - - account = request_msg['account'] - if account != 'kraken.spot': - log.error( - 'This is a kraken account, \ - only a `kraken.spot` selection is valid' - ) - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - - # reason=f'Kraken only, No account found: `{account}` ?', - reason=( - 'Kraken only, order mode disabled due to ' - 'https://github.com/pikers/piker/issues/299' - ), - - ).dict()) - continue - - # validate - order = BrokerdOrder(**request_msg) - # call our client api to submit the order - resp = await client.submit_limit( - symbol=order.symbol, - price=order.price, - action=order.action, - size=order.size, - reqid=order.reqid, - ) - - err = resp['error'] - if err: - oid = order.oid - log.error(f'Failed to submit order: {oid}') - - await ems_order_stream.send( - BrokerdError( - oid=order.oid, - reqid=order.reqid, - symbol=order.symbol, - reason="Failed order submission", - broker_details=resp - ).dict() - ) - else: - # TODO: handle multiple orders (cancels?) - # txid is an array of strings - if order.reqid is None: - reqid = resp['result']['txid'][0] - else: - # update the internal pairing of oid to krakens - # txid with the new txid that is returned on edit - reqid = resp['result']['txid'] - - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( - - # ems order request id - oid=order.oid, - - # broker specific request id - reqid=reqid, - - # account the made the order - account=order.account - - ).dict() - ) - - elif action == 'cancel': - msg = BrokerdCancel(**request_msg) - - # Send order cancellation to kraken - resp = await client.submit_cancel( - reqid=msg.reqid - ) - - # Check to make sure there was no error returned by - # the kraken endpoint. Assert one order was cancelled. - try: - result = resp['result'] - count = result['count'] - - # check for 'error' key if we received no 'result' - except KeyError: - error = resp.get('error') - - await ems_order_stream.send( - BrokerdError( - oid=msg.oid, - reqid=msg.reqid, - symbol=msg.symbol, - reason="Failed order cancel", - broker_details=resp - ).dict() - ) - - if not error: - raise BrokerError(f'Unknown order cancel response: {resp}') - - else: - if not count: # no orders were cancelled? - - # XXX: what exactly is this from and why would we care? - # there doesn't seem to be any docs here? - # https://docs.kraken.com/rest/#operation/cancelOrder - - # Check to make sure the cancellation is NOT pending, - # then send the confirmation to the ems order stream - pending = result.get('pending') - if pending: - log.error(f'Order {oid} cancel was not yet successful') - - await ems_order_stream.send( - BrokerdError( - oid=msg.oid, - reqid=msg.reqid, - symbol=msg.symbol, - # TODO: maybe figure out if pending - # cancels will eventually get cancelled - reason="Order cancel is still pending?", - broker_details=resp - ).dict() - ) - - else: # order cancel success case. - - await ems_order_stream.send( - BrokerdStatus( - reqid=msg.reqid, - account=msg.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Order cancelled', - broker_details={'name': 'kraken'} - ).dict() - ) - else: - log.error(f'Unknown order command: {request_msg}') - - -@tractor.context -async def trades_dialogue( - ctx: tractor.Context, - loglevel: str = None, -) -> AsyncIterator[dict[str, Any]]: - - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) - - @acm - async def subscribe(ws: wsproto.WSConnection, token: str): - # XXX: setup subs - # https://docs.kraken.com/websockets/#message-subscribe - # specific logic for this in kraken's shitty sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - trades_sub = make_auth_sub( - {'name': 'ownTrades', 'token': token} - ) - - # TODO: we want to eventually allow unsubs which should - # be completely fine to request from a separate task - # since internally the ws methods appear to be FIFO - # locked. - await ws.send_msg(trades_sub) - - yield - - # unsub from all pairs on teardown - await ws.send_msg({ - 'event': 'unsubscribe', - 'subscription': ['ownTrades'], - }) - - # XXX: do we need to ack the unsub? - # await ws.recv_msg() - - # Authenticated block - async with get_client() as client: - if not client._api_key: - log.error('Missing Kraken API key: Trades WS connection failed') - await ctx.started(({}, ['paper'])) - - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - - client = PaperBoi( - 'kraken', - ems_stream, - _buys={}, - _sells={}, - - _reqids={}, - - # TODO: load paper positions from ``positions.toml`` - _positions={}, - ) - - # TODO: maybe add multiple accounts - n.start_soon(handle_order_requests, client, ems_stream) - - acc_name = 'kraken.' + client._name - trades = await client.get_trades() - - position_msgs = pack_positions(acc_name, trades) - - await ctx.started((position_msgs, (acc_name,))) - - # Get websocket token for authenticated data stream - # Assert that a token was actually received. - resp = await client.endpoint('GetWebSocketsToken', {}) - - # lol wtf is this.. - assert resp['error'] == [] - - token = resp['result']['token'] - - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - # TODO: maybe add multiple accounts - n.start_soon(handle_order_requests, client, ems_stream) - - # Process trades msg stream of ws - async with open_autorecon_ws( - 'wss://ws-auth.kraken.com/', - fixture=subscribe, - token=token, - ) as ws: - async for msg in process_trade_msgs(ws): - for trade in msg: - # check the type of packaged message - assert type(trade) == Trade - - # prepare and send a filled status update - filled_msg = BrokerdStatus( - reqid=trade.reqid, - time_ns=time.time_ns(), - - account='kraken.spot', - status='filled', - filled=float(trade.size), - reason='Order filled by kraken', - broker_details={ - 'name': 'kraken', - 'broker_time': trade.broker_time - }, - - # TODO: figure out if kraken gives a count - # of how many units of underlying were - # filled. Alternatively we can decrement - # this value ourselves by associating and - # calcing from the diff with the original - # client-side request, see: - # https://github.com/pikers/piker/issues/296 - remaining=0, - ) - - await ems_stream.send(filled_msg.dict()) - - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=trade.reqid, - time_ns=time.time_ns(), - - action=trade.action, - size=float(trade.size), - price=float(trade.price), - # TODO: maybe capture more msg data i.e fees? - broker_details={'name': 'kraken'}, - broker_time=float(trade.broker_time) - ) - - await ems_stream.send(fill_msg.dict()) - - async def stream_messages( ws: NoBsWs, ): @@ -950,48 +578,6 @@ async def process_data_feed_msgs( yield msg -async def process_trade_msgs( - ws: NoBsWs, -): - ''' - Parse and pack data feed messages. - - ''' - sequence_counter = 0 - async for msg in stream_messages(ws): - - try: - # check that we are on the ownTrades stream and that msgs - # are arriving in sequence with kraken For clarification the - # kraken ws api docs for this stream: - # https://docs.kraken.com/websockets/#message-ownTrades - assert msg[1] == 'ownTrades' - assert msg[2]['sequence'] > sequence_counter - sequence_counter += 1 - raw_msgs = msg[0] - trade_msgs = [] - - # Check that we are only processing new trades - if msg[2]['sequence'] != 1: - # check if its a new order or an update msg - for trade_msg in raw_msgs: - trade = list(trade_msg.values())[0] - order_msg = Trade( - reqid=trade['ordertxid'], - action=trade['type'], - price=trade['price'], - size=trade['vol'], - broker_time=trade['time'] - ) - trade_msgs.append(order_msg) - - yield trade_msgs - - except AssertionError: - print(f'UNHANDLED MSG: {msg}') - yield msg - - def normalize( ohlc: OHLC, diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py new file mode 100644 index 00000000..4d1f0338 --- /dev/null +++ b/piker/brokers/kraken/broker.py @@ -0,0 +1,457 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +''' +Order api and machinery + +''' +from contextlib import asynccontextmanager as acm +from pprint import pformat +import time +from typing import ( + Any, + AsyncIterator, + # Callable, + # Optional, + # Union, +) + +from pydantic import BaseModel +import trio +import tractor +import wsproto + +from . import ( + Client, + BrokerError, + get_client, + get_console_log, + log, + normalize_symbol, + open_autorecon_ws, + NoBsWs, + stream_messages, +) +from ..clearing._paper_engine import PaperBoi +from ..clearing._messages import ( + BrokerdPosition, BrokerdOrder, BrokerdStatus, + BrokerdOrderAck, BrokerdError, BrokerdCancel, + BrokerdFill, +) + + +class Trade(BaseModel): + ''' + Trade class that helps parse and validate ownTrades stream + + ''' + reqid: str # kraken order transaction id + action: str # buy or sell + price: str # price of asset + size: str # vol of asset + broker_time: str # e.g GTC, GTD + + +def pack_positions( + acc: str, + trades: dict +) -> list[Any]: + positions: dict[str, float] = {} + vols: dict[str, float] = {} + costs: dict[str, float] = {} + position_msgs: list[Any] = [] + + for trade in trades.values(): + sign = -1 if trade['type'] == 'sell' else 1 + pair = trade['pair'] + vol = float(trade['vol']) + vols[pair] = vols.get(pair, 0) + sign * vol + costs[pair] = costs.get(pair, 0) + sign * float(trade['cost']) + positions[pair] = costs[pair] / vols[pair] if vols[pair] else 0 + + for ticker, pos in positions.items(): + vol = float(vols[ticker]) + if not vol: + continue + norm_sym = normalize_symbol(ticker) + msg = BrokerdPosition( + broker='kraken', + account=acc, + symbol=norm_sym, + currency=norm_sym[-3:], + size=vol, + avg_price=float(pos), + ) + position_msgs.append(msg.dict()) + + return position_msgs + + +async def handle_order_requests( + + client: Client, + ems_order_stream: tractor.MsgStream, + +) -> None: + + request_msg: dict + order: BrokerdOrder + + async for request_msg in ems_order_stream: + log.info( + 'Received order request:\n' + f'{pformat(request_msg)}' + ) + + action = request_msg['action'] + + if action in {'buy', 'sell'}: + + account = request_msg['account'] + if account != 'kraken.spot': + log.error( + 'This is a kraken account, \ + only a `kraken.spot` selection is valid' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + + # reason=f'Kraken only, No account found: `{account}` ?', + reason=( + 'Kraken only, order mode disabled due to ' + 'https://github.com/pikers/piker/issues/299' + ), + + ).dict()) + continue + + # validate + order = BrokerdOrder(**request_msg) + # call our client api to submit the order + resp = await client.submit_limit( + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + reqid=order.reqid, + ) + + err = resp['error'] + if err: + oid = order.oid + log.error(f'Failed to submit order: {oid}') + + await ems_order_stream.send( + BrokerdError( + oid=order.oid, + reqid=order.reqid, + symbol=order.symbol, + reason="Failed order submission", + broker_details=resp + ).dict() + ) + else: + # TODO: handle multiple orders (cancels?) + # txid is an array of strings + if order.reqid is None: + reqid = resp['result']['txid'][0] + else: + # update the internal pairing of oid to krakens + # txid with the new txid that is returned on edit + reqid = resp['result']['txid'] + + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + + # ems order request id + oid=order.oid, + + # broker specific request id + reqid=reqid, + + # account the made the order + account=order.account + + ).dict() + ) + + elif action == 'cancel': + msg = BrokerdCancel(**request_msg) + + # Send order cancellation to kraken + resp = await client.submit_cancel( + reqid=msg.reqid + ) + + # Check to make sure there was no error returned by + # the kraken endpoint. Assert one order was cancelled. + try: + result = resp['result'] + count = result['count'] + + # check for 'error' key if we received no 'result' + except KeyError: + error = resp.get('error') + + await ems_order_stream.send( + BrokerdError( + oid=msg.oid, + reqid=msg.reqid, + symbol=msg.symbol, + reason="Failed order cancel", + broker_details=resp + ).dict() + ) + + if not error: + raise BrokerError(f'Unknown order cancel response: {resp}') + + else: + if not count: # no orders were cancelled? + + # XXX: what exactly is this from and why would we care? + # there doesn't seem to be any docs here? + # https://docs.kraken.com/rest/#operation/cancelOrder + + # Check to make sure the cancellation is NOT pending, + # then send the confirmation to the ems order stream + pending = result.get('pending') + if pending: + log.error(f'Order {oid} cancel was not yet successful') + + await ems_order_stream.send( + BrokerdError( + oid=msg.oid, + reqid=msg.reqid, + symbol=msg.symbol, + # TODO: maybe figure out if pending + # cancels will eventually get cancelled + reason="Order cancel is still pending?", + broker_details=resp + ).dict() + ) + + else: # order cancel success case. + + await ems_order_stream.send( + BrokerdStatus( + reqid=msg.reqid, + account=msg.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Order cancelled', + broker_details={'name': 'kraken'} + ).dict() + ) + else: + log.error(f'Unknown order command: {request_msg}') + + +def make_auth_sub(data: dict[str, Any]) -> dict[str, str]: + ''' + Create a request subscription packet dict. + + ## TODO: point to the auth urls + https://docs.kraken.com/websockets/#message-subscribe + + ''' + # eg. specific logic for this in kraken's sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + return { + 'event': 'subscribe', + 'subscription': data, + } + + +@tractor.context +async def trades_dialogue( + ctx: tractor.Context, + loglevel: str = None, +) -> AsyncIterator[dict[str, Any]]: + + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + @acm + async def subscribe(ws: wsproto.WSConnection, token: str): + # XXX: setup subs + # https://docs.kraken.com/websockets/#message-subscribe + # specific logic for this in kraken's shitty sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + trades_sub = make_auth_sub( + {'name': 'ownTrades', 'token': token} + ) + + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. + await ws.send_msg(trades_sub) + + yield + + # unsub from all pairs on teardown + await ws.send_msg({ + 'event': 'unsubscribe', + 'subscription': ['ownTrades'], + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + # Authenticated block + async with get_client() as client: + if not client._api_key: + log.error('Missing Kraken API key: Trades WS connection failed') + await ctx.started(({}, ['paper'])) + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + + client = PaperBoi( + 'kraken', + ems_stream, + _buys={}, + _sells={}, + + _reqids={}, + + # TODO: load paper positions from ``positions.toml`` + _positions={}, + ) + + # TODO: maybe add multiple accounts + n.start_soon(handle_order_requests, client, ems_stream) + + acc_name = 'kraken.' + client._name + trades = await client.get_trades() + + position_msgs = pack_positions(acc_name, trades) + + await ctx.started((position_msgs, (acc_name,))) + + # Get websocket token for authenticated data stream + # Assert that a token was actually received. + resp = await client.endpoint('GetWebSocketsToken', {}) + + # lol wtf is this.. + assert resp['error'] == [] + + token = resp['result']['token'] + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + # TODO: maybe add multiple accounts + n.start_soon(handle_order_requests, client, ems_stream) + + # Process trades msg stream of ws + async with open_autorecon_ws( + 'wss://ws-auth.kraken.com/', + fixture=subscribe, + token=token, + ) as ws: + async for msg in process_trade_msgs(ws): + for trade in msg: + # check the type of packaged message + assert type(trade) == Trade + + # prepare and send a filled status update + filled_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), + + account='kraken.spot', + status='filled', + filled=float(trade.size), + reason='Order filled by kraken', + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + }, + + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, + ) + + await ems_stream.send(filled_msg.dict()) + + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=trade.reqid, + time_ns=time.time_ns(), + + action=trade.action, + size=float(trade.size), + price=float(trade.price), + # TODO: maybe capture more msg data i.e fees? + broker_details={'name': 'kraken'}, + broker_time=float(trade.broker_time) + ) + + await ems_stream.send(fill_msg.dict()) + + +async def process_trade_msgs( + ws: NoBsWs, +): + ''' + Parse and pack data feed messages. + + ''' + sequence_counter = 0 + async for msg in stream_messages(ws): + + try: + # check that we are on the ownTrades stream and that msgs + # are arriving in sequence with kraken For clarification the + # kraken ws api docs for this stream: + # https://docs.kraken.com/websockets/#message-ownTrades + assert msg[1] == 'ownTrades' + assert msg[2]['sequence'] > sequence_counter + sequence_counter += 1 + raw_msgs = msg[0] + trade_msgs = [] + + # Check that we are only processing new trades + if msg[2]['sequence'] != 1: + # check if its a new order or an update msg + for trade_msg in raw_msgs: + trade = list(trade_msg.values())[0] + order_msg = Trade( + reqid=trade['ordertxid'], + action=trade['type'], + price=trade['price'], + size=trade['vol'], + broker_time=trade['time'] + ) + trade_msgs.append(order_msg) + + yield trade_msgs + + except AssertionError: + print(f'UNHANDLED MSG: {msg}') + yield msg