Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 30fdf550d7 Factor out ws msg hearbeat and error handling
Move the core ws message handling into `stream_messages()` and call that
from 2 new stream processors: `process_data_feed_msgs()` and
`process_order_msgs()`. Add comments for hints on how to implement the
order msg parsing as well as `pprint` received msgs to console for now.
2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 37df05c260 connect to krakens openOrders websocket 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos d141981cca order submission and cancellation working 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos ead9fbd2f1 basic order submission and cancelling with kraken 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos bcdcfc7d59 valdiate and ack order requests from ems 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 3af3b693da mock orders validated from kraken 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 7627fb5141 get basic order request loop receiving msgs 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos a3a6718ba5 added the bones for the handle_order_requests func 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 90947438d2 save progress on kraken to test out unit_select_fixes 2022-02-17 08:21:00 -05:00
Konstantine Tsafatinos 5b01c84afd get positions working for kraken 2022-02-17 08:21:00 -05:00
5 changed files with 404 additions and 66 deletions

View File

@ -8,8 +8,8 @@ expires_at = 1616095326.355846
[kraken] [kraken]
key_descr = "api_0" key_descr = "api_0"
public_key = "" api_key = ""
private_key = "" secret = ""
[ib] [ib]
host = "127.0.0.1" host = "127.0.0.1"

View File

@ -33,20 +33,24 @@ import tractor
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from pydantic import BaseModel from pydantic import BaseModel
import wsproto import wsproto
from itertools import count
from .. import config from .. import config
from .._cacheables import open_cached_client from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws from ..data._web_bs import open_autorecon_ws, NoBsWs
from ..clearing._messages import BrokerdPosition from ..clearing._messages import (
BrokerdPosition, BrokerdOrder, BrokerdStatus,
BrokerdOrderAck, BrokerdError, BrokerdCancel,
BrokerdFill,
)
import urllib.parse import urllib.parse
import hashlib import hashlib
import hmac import hmac
import base64 import base64
import pandas as pd
log = get_logger(__name__) log = get_logger(__name__)
@ -138,6 +142,19 @@ class OHLC:
ticks: List[Any] = field(default_factory=list) ticks: List[Any] = field(default_factory=list)
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('kraken')
if section is None:
log.warning(f'No config section found for kraken in {path}')
return {}
return section
def get_kraken_signature( def get_kraken_signature(
urlpath: str, urlpath: str,
data: Dict[str, Any], data: Dict[str, Any],
@ -170,6 +187,7 @@ class Client:
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
}) })
self._pairs: list[str] = [] self._pairs: list[str] = []
self._name = ''
self._api_key = '' self._api_key = ''
self._secret = '' self._secret = ''
@ -217,7 +235,7 @@ class Client:
) )
return resproc(resp, log) return resproc(resp, log)
async def get_user_data( async def kraken_endpoint(
self, self,
method: str, method: str,
data: Dict[str, Any] data: Dict[str, Any]
@ -225,18 +243,17 @@ class Client:
uri_path = f'/0/private/{method}' uri_path = f'/0/private/{method}'
data['nonce'] = str(int(1000*time.time())) data['nonce'] = str(int(1000*time.time()))
resp = await self._private(method, data, uri_path) resp = await self._private(method, data, uri_path)
err = resp['error'] return resp
if err:
print(err)
return resp['result']
async def get_positions( async def get_positions(
self, self,
data: Dict[str, Any] = {} data: Dict[str, Any] = {}
) -> Dict[str, Any]: ) -> Dict[str, Any]:
balances = await self.get_user_data('Balance', data) resp = await self.kraken_endpoint('Balance', data)
balances = resp['result']
## TODO: grab all entries, not just first 50 ## TODO: grab all entries, not just first 50
traders = await self.get_user_data('TradesHistory', data) resp = await self.kraken_endpoint('TradesHistory', data)
traders = resp['result']
positions = {} positions = {}
vols = {} vols = {}
@ -258,7 +275,46 @@ class Client:
else: else:
positions[pair] /= asset_balance positions[pair] /= asset_balance
return positions return positions, vols
async def submit_limit(
self,
oid: str,
symbol: str,
price: float,
action: str,
size: float,
# account: str,
reqid: int = None,
) -> int:
"""Place an order and return integer request id provided by client.
"""
# Build order data for kraken api
data = {
"userref": reqid,
"ordertype": "limit",
"type": action,
"volume": str(size),
"pair": symbol,
"price": str(price),
# set to True test AddOrder call without a real submission
"validate": False
}
resp = await self.kraken_endpoint('AddOrder', data)
return resp
async def submit_cancel(
self,
reqid: str,
) -> None:
"""Send cancel request for order id ``reqid``.
"""
# txid is a transaction id given by kraken
data = {"txid": reqid}
resp = await self.kraken_endpoint('CancelOrder', data)
return resp
async def symbol_info( async def symbol_info(
self, self,
@ -373,16 +429,15 @@ class Client:
async def get_client() -> Client: async def get_client() -> Client:
client = Client() client = Client()
conf, path = config.load() ## TODO: maybe add conditional based on section
section = conf.get('kraken') section = get_config()
client._name = section['key_descr']
client._api_key = section['api_key'] client._api_key = section['api_key']
client._secret = section['secret'] client._secret = section['secret']
data = { ## TODO: Add a client attribute to hold this info
# add non-nonce and non-ofs vars #data = {
} # # add non-nonce and non-ofs vars
# positions = await client.get_positions(data) #}
# await tractor.breakpoint()
# at startup, load all symbols locally for fast search # at startup, load all symbols locally for fast search
await client.cache_symbols() await client.cache_symbols()
@ -390,6 +445,166 @@ async def get_client() -> Client:
yield client yield client
def pack_position(
acc: str,
symkey: str,
pos: float,
vol: float
) -> dict[str, Any]:
return BrokerdPosition(
broker='kraken',
account=acc,
symbol=symkey,
currency=symkey[-3:],
size=float(vol),
avg_price=float(pos),
)
def normalize_symbol(
ticker: str
) -> str:
symlen = len(ticker)
if symlen == 6:
return ticker.lower()
else:
for sym in ['XXBT', 'XXMR', 'ZEUR']:
if sym in ticker:
ticker = ticker.replace(sym, sym[1:])
return ticker.lower()
def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]:
"""Create a request subscription packet dict.
## TODO: point to the auth urls
https://docs.kraken.com/websockets/#message-subscribe
"""
# eg. specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
return {
'event': 'subscribe',
'subscription': data,
}
async def handle_order_requests(
client: Client,
ems_order_stream: tractor.MsgStream,
) -> None:
request_msg: dict
order: BrokerdOrder
userref_counter = count()
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
action = request_msg['action']
if action in {'buy', 'sell'}:
account = request_msg['account']
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'Kraken only, No account found: `{account}` ?',
).dict())
continue
# validate
temp_id = next(userref_counter)
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
resp = await client.submit_limit(
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
## XXX: how do I handle new orders
reqid=temp_id,
)
err = resp['error']
if err:
log.error(f'Failed to submit order')
await ems_order_stream.send(
BrokerdError(
oid=order.oid,
reqid=temp_id,
symbol=order.symbol,
reason="Failed order submission",
broker_details=resp
).dict()
)
else:
## TODO: handle multiple cancels
## txid is an array of strings
reqid = resp['result']['txid'][0]
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
# account the made the order
account=order.account
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
# Send order cancellation to kraken
resp = await client.submit_cancel(
reqid=msg.reqid
)
try:
# Check to make sure there was no error returned by
# the kraken endpoint. Assert one order was cancelled
assert resp['error'] == []
assert resp['result']['count'] == 1
## TODO: Change this code using .get
try:
pending = resp['result']['pending']
# Check to make sure the cancellation is NOT pending,
# then send the confirmation to the ems order stream
except KeyError:
await ems_order_stream.send(
BrokerdStatus(
reqid=msg.reqid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
except AssertionError:
log.error(f'Order cancel was not successful')
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context @tractor.context
async def trades_dialogue( async def trades_dialogue(
ctx: tractor.Context, ctx: tractor.Context,
@ -399,27 +614,99 @@ async def trades_dialogue(
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)
# deliver positions to subscriber before anything else # Generate
# positions = await _trio_run_client_method(method='positions')
global _accounts2clients @asynccontextmanager
async def subscribe(ws: wsproto.WSConnection, token: str):
## TODO: Fix docs and points to right urls
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
trades_sub = make_auth_sub(
{'name': 'openOrders', 'token': token}
)
positions = await client.get_positions() # TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_msg(trades_sub)
await tractor.breakpoint() ## trade data (aka L1)
#l1_sub = make_sub(
# list(ws_pairs.values()),
# {'name': 'spread'} # 'depth': 10}
#)
## pull a first quote and deliver
#await ws.send_msg(l1_sub)
yield
# unsub from all pairs on teardown
await ws.send_msg({
'event': 'unsubscribe',
'subscription': ['openOrders'],
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
all_positions = {} # Authenticated block
async with get_client() as client:
acc_name = 'kraken.' + client._name
positions, vols = await client.get_positions()
for pos in positions: all_positions = []
msg = pack_position(pos)
all_positions[msg.symbol] = msg.dict()
await ctx.started(all_positions) for ticker, pos in positions.items():
norm_sym = normalize_symbol(ticker)
if float(vols[ticker]) != 0:
msg = pack_position(acc_name, norm_sym, pos, vols[ticker])
all_positions.append(msg.dict())
open_orders = await client.kraken_endpoint('OpenOrders', {})
#await tractor.breakpoint()
await ctx.started((all_positions, (acc_name,)))
#await trio.sleep_forever()
# Get websocket token for authenticated data stream
# Assert that a token was actually received
resp = await client.kraken_endpoint('GetWebSocketsToken', {})
assert resp['error'] == []
token = resp['result']['token']
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
## TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream)
async with open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=subscribe,
token=token,
) as ws:
from pprint import pprint
async for msg in process_order_msgs(ws):
pprint(msg)
async def stream_messages(ws): async def stream_messages(
ws: NoBsWs,
):
'''
Message stream parser and heartbeat handler.
Deliver ws subscription messages as well as handle heartbeat logic
though a single async generator.
'''
too_slow_count = last_hb = 0 too_slow_count = last_hb = 0
while True: while True:
@ -457,39 +744,76 @@ async def stream_messages(ws):
if err: if err:
raise BrokerError(err) raise BrokerError(err)
else: else:
chan_id, *payload_array, chan_name, pair = msg yield msg
if 'ohlc' in chan_name:
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) async def process_data_feed_msgs(
ws: NoBsWs,
):
'''
Parse and pack data feed messages.
elif 'spread' in chan_name: '''
async for msg in stream_messages(ws):
bid, ask, ts, bsize, asize = map(float, payload_array[0]) chan_id, *payload_array, chan_name, pair = msg
# TODO: really makes you think IB has a horrible API... if 'ohlc' in chan_name:
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize}, yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
# elif 'book' in msg[-2]: elif 'spread' in chan_name:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
else: bid, ask, ts, bsize, asize = map(float, payload_array[0])
print(f'UNHANDLED MSG: {msg}')
# TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
# elif 'book' in msg[-2]:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
else:
print(f'UNHANDLED MSG: {msg}')
yield msg
async def process_order_msgs(
ws: NoBsWs,
):
'''
Parse and pack data feed messages.
'''
async for msg in stream_messages(ws):
# TODO: write your order event parser here!
# HINT: create a ``pydantic.BaseModel`` to parse and validate
# and then in the caller recast to our native ``BrokerdX`` msg types.
# form of order msgs:
# [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544',
# 'status': 'canceled', 'vol_exec': '0.00000000', 'cost':
# '0.00000000', 'fee': '0.00000000', 'avg_price':
# '0.00000000', 'userref': 1, 'cancel_reason': 'User
# requested'}}], 'openOrders', {'sequence': 4}]
yield msg
def normalize( def normalize(
ohlc: OHLC, ohlc: OHLC,
) -> dict: ) -> dict:
quote = asdict(ohlc) quote = asdict(ohlc)
quote['broker_ts'] = quote['time'] quote['broker_ts'] = quote['time']
@ -629,15 +953,16 @@ async def stream_quotes(
# XXX: do we need to ack the unsub? # XXX: do we need to ack the unsub?
# await ws.recv_msg() # await ws.recv_msg()
# see the tips on reonnection logic: # see the tips on reconnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
ws: NoBsWs
async with open_autorecon_ws( async with open_autorecon_ws(
'wss://ws.kraken.com/', 'wss://ws.kraken.com/',
fixture=subscribe, fixture=subscribe,
) as ws: ) as ws:
# pull a first quote and deliver # pull a first quote and deliver
msg_gen = stream_messages(ws) msg_gen = process_data_feed_msgs(ws)
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__() typ, ohlc_last = await msg_gen.__anext__()
@ -696,6 +1021,7 @@ async def stream_quotes(
@tractor.context @tractor.context
async def open_symbol_search( async def open_symbol_search(
ctx: tractor.Context, ctx: tractor.Context,
) -> Client: ) -> Client:
async with open_cached_client('kraken') as client: async with open_cached_client('kraken') as client:

View File

@ -493,7 +493,8 @@ async def open_brokerd_trades_dialogue(
finally: finally:
# parent context must have been closed # parent context must have been closed
# remove from cache so next client will respawn if needed # remove from cache so next client will respawn if needed
_router.relays.pop(broker) ## TODO: Maybe add a warning
_router.relays.pop(broker, None)
@tractor.context @tractor.context

View File

@ -389,7 +389,7 @@ async def handle_order_requests(
account = request_msg['account'] account = request_msg['account']
if account != 'paper': if account != 'paper':
log.error( log.error(
'On a paper account, only a `paper` selection is valid' 'This is a paper account, only a `paper` selection is valid'
) )
await ems_order_stream.send(BrokerdError( await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'], oid=request_msg['oid'],
@ -463,7 +463,8 @@ async def trades_dialogue(
): ):
# TODO: load paper positions per broker from .toml config file # TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]`` # and pass as symbol to position data mapping: ``dict[str, dict]``
await ctx.started(({}, ['paper'])) # await ctx.started(all_positions)
await ctx.started(({}, {'paper',}))
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,

View File

@ -53,11 +53,13 @@ class NoBsWs:
def __init__( def __init__(
self, self,
url: str, url: str,
token: str,
stack: AsyncExitStack, stack: AsyncExitStack,
fixture: Callable, fixture: Callable,
serializer: ModuleType = json, serializer: ModuleType = json,
): ):
self.url = url self.url = url
self.token = token
self.fixture = fixture self.fixture = fixture
self._stack = stack self._stack = stack
self._ws: 'WebSocketConnection' = None # noqa self._ws: 'WebSocketConnection' = None # noqa
@ -81,9 +83,15 @@ class NoBsWs:
trio_websocket.open_websocket_url(self.url) trio_websocket.open_websocket_url(self.url)
) )
# rerun user code fixture # rerun user code fixture
ret = await self._stack.enter_async_context( if self.token == '':
self.fixture(self) ret = await self._stack.enter_async_context(
) self.fixture(self)
)
else:
ret = await self._stack.enter_async_context(
self.fixture(self, self.token)
)
assert ret is None assert ret is None
log.info(f'Connection success: {self.url}') log.info(f'Connection success: {self.url}')
@ -127,12 +135,14 @@ async def open_autorecon_ws(
# TODO: proper type annot smh # TODO: proper type annot smh
fixture: Callable, fixture: Callable,
# used for authenticated websockets
token: str = '',
) -> AsyncGenerator[tuple[...], NoBsWs]: ) -> AsyncGenerator[tuple[...], NoBsWs]:
"""Apparently we can QoS for all sorts of reasons..so catch em. """Apparently we can QoS for all sorts of reasons..so catch em.
""" """
async with AsyncExitStack() as stack: async with AsyncExitStack() as stack:
ws = NoBsWs(url, stack, fixture=fixture) ws = NoBsWs(url, token, stack, fixture=fixture)
await ws._connect() await ws._connect()
try: try: