diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index c530a837..76e9db7a 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -40,17 +40,17 @@ from .._cacheables import open_cached_client from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray -from ..data._web_bs import open_autorecon_ws +from ..data._web_bs import open_autorecon_ws, NoBsWs from ..clearing._messages import ( BrokerdPosition, BrokerdOrder, BrokerdStatus, - BrokerdOrderAck, BrokerdError, BrokerdCancel, BrokerdFill + BrokerdOrderAck, BrokerdError, BrokerdCancel, + BrokerdFill, ) import urllib.parse import hashlib import hmac import base64 -import pandas as pd log = get_logger(__name__) @@ -157,7 +157,7 @@ def get_config() -> dict[str, Any]: def get_kraken_signature( urlpath: str, - data: Dict[str, Any], + data: Dict[str, Any], secret: str ) -> str: postdata = urllib.parse.urlencode(data) @@ -171,9 +171,9 @@ def get_kraken_signature( class InvalidKey(ValueError): """EAPI:Invalid key - This error is returned when the API key used for the call is - either expired or disabled, please review the API key in your - Settings -> API tab of account management or generate a new one + This error is returned when the API key used for the call is + either expired or disabled, please review the API key in your + Settings -> API tab of account management or generate a new one and update your application.""" @@ -686,76 +686,27 @@ async def trades_dialogue( ): ## TODO: maybe add multiple accounts n.start_soon(handle_order_requests, client, ems_stream) + async with open_autorecon_ws( 'wss://ws-auth.kraken.com/', fixture=subscribe, token=token, ) as ws: - - while True: - with trio.move_on_after(5) as cs: - msg = await ws.recv_msg() - print(msg) - - ## pull a first quote and deliver - #msg_gen = stream_messages(ws) - - ## TODO: use ``anext()`` when it lands in 3.10! - #typ, ohlc_last = await msg_gen.__anext__() - - #topic, quote = normalize(ohlc_last) - - #first_quote = {topic: quote} - #task_status.started((init_msgs, first_quote)) - - ## lol, only "closes" when they're margin squeezing clients ;P - #feed_is_live.set() - - ## keep start of last interval for volume tracking - #last_interval_start = ohlc_last.etime - - ## start streaming - #async for typ, ohlc in msg_gen: - - # if typ == 'ohlc': - - # # TODO: can get rid of all this by using - # # ``trades`` subscription... - - # # generate tick values to match time & sales pane: - # # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - # volume = ohlc.volume - - # # new OHLC sample interval - # if ohlc.etime > last_interval_start: - # last_interval_start = ohlc.etime - # tick_volume = volume - - # else: - # # this is the tick volume *within the interval* - # tick_volume = volume - ohlc_last.volume - - # ohlc_last = ohlc - # last = ohlc.close - - # if tick_volume: - # ohlc.ticks.append({ - # 'type': 'trade', - # 'price': last, - # 'size': tick_volume, - # }) - - # topic, quote = normalize(ohlc) - - # elif typ == 'l1': - # quote = ohlc - # topic = quote['symbol'].lower() - - # await send_chan.send({topic: quote}) + from pprint import pprint + async for msg in process_order_msgs(ws): + pprint(msg) -async def stream_messages(ws): +async def stream_messages( + ws: NoBsWs, +): + ''' + Message stream parser and heartbeat handler. + Deliver ws subscription messages as well as handle heartbeat logic + though a single async generator. + + ''' too_slow_count = last_hb = 0 while True: @@ -793,39 +744,76 @@ async def stream_messages(ws): if err: raise BrokerError(err) else: - chan_id, *payload_array, chan_name, pair = msg + yield msg - if 'ohlc' in chan_name: - yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) +async def process_data_feed_msgs( + ws: NoBsWs, +): + ''' + Parse and pack data feed messages. - elif 'spread' in chan_name: + ''' + async for msg in stream_messages(ws): - bid, ask, ts, bsize, asize = map(float, payload_array[0]) + chan_id, *payload_array, chan_name, pair = msg - # TODO: really makes you think IB has a horrible API... - quote = { - 'symbol': pair.replace('/', ''), - 'ticks': [ - {'type': 'bid', 'price': bid, 'size': bsize}, - {'type': 'bsize', 'price': bid, 'size': bsize}, + if 'ohlc' in chan_name: - {'type': 'ask', 'price': ask, 'size': asize}, - {'type': 'asize', 'price': ask, 'size': asize}, - ], - } - yield 'l1', quote + yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) - # elif 'book' in msg[-2]: - # chan_id, *payload_array, chan_name, pair = msg - # print(msg) + elif 'spread' in chan_name: - else: - print(f'UNHANDLED MSG: {msg}') + bid, ask, ts, bsize, asize = map(float, payload_array[0]) + + # TODO: really makes you think IB has a horrible API... + quote = { + 'symbol': pair.replace('/', ''), + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, + + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize}, + ], + } + yield 'l1', quote + + # elif 'book' in msg[-2]: + # chan_id, *payload_array, chan_name, pair = msg + # print(msg) + + else: + print(f'UNHANDLED MSG: {msg}') + yield msg + + +async def process_order_msgs( + ws: NoBsWs, +): + ''' + Parse and pack data feed messages. + + ''' + async for msg in stream_messages(ws): + + # TODO: write your order event parser here! + # HINT: create a ``pydantic.BaseModel`` to parse and validate + # and then in the caller recast to our native ``BrokerdX`` msg types. + + # form of order msgs: + # [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544', + # 'status': 'canceled', 'vol_exec': '0.00000000', 'cost': + # '0.00000000', 'fee': '0.00000000', 'avg_price': + # '0.00000000', 'userref': 1, 'cancel_reason': 'User + # requested'}}], 'openOrders', {'sequence': 4}] + + yield msg def normalize( ohlc: OHLC, + ) -> dict: quote = asdict(ohlc) quote['broker_ts'] = quote['time'] @@ -967,13 +955,14 @@ async def stream_quotes( # see the tips on reconnection logic: # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds + ws: NoBsWs async with open_autorecon_ws( 'wss://ws.kraken.com/', fixture=subscribe, ) as ws: # pull a first quote and deliver - msg_gen = stream_messages(ws) + msg_gen = process_data_feed_msgs(ws) # TODO: use ``anext()`` when it lands in 3.10! typ, ohlc_last = await msg_gen.__anext__() @@ -1032,6 +1021,7 @@ async def stream_quotes( @tractor.context async def open_symbol_search( ctx: tractor.Context, + ) -> Client: async with open_cached_client('kraken') as client: