From 06845e5504fa325e37e5dc72281eebb3489b4589 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 17 Aug 2022 09:37:29 -0400 Subject: [PATCH] `kraken`: drop `make_sub()` and inline sub defs in `subscribe()` --- piker/brokers/kraken/feed.py | 46 +++++++++++++++--------------------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 7c589d85..e67d204c 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -34,7 +34,6 @@ import pendulum from trio_typing import TaskStatus import tractor import trio -import wsproto from piker._cacheables import open_cached_client from piker.brokers._util import ( @@ -243,22 +242,6 @@ def normalize( return topic, quote -def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]: - ''' - Create a request subscription packet dict. - - https://docs.kraken.com/websockets/#message-subscribe - - ''' - # eg. specific logic for this in kraken's sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - return { - 'pair': pairs, - 'event': 'subscribe', - 'subscription': data, - } - - @acm async def open_history_client( symbol: str, @@ -381,15 +364,20 @@ async def stream_quotes( } @acm - async def subscribe(ws: wsproto.WSConnection): + async def subscribe(ws: NoBsWs): + # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe - # specific logic for this in kraken's shitty sync client: + # specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - ohlc_sub = make_sub( - list(ws_pairs.values()), - {'name': 'ohlc', 'interval': 1} - ) + ohlc_sub = { + 'event': 'subscribe', + 'pair': list(ws_pairs.values()), + 'subscription': { + 'name': 'ohlc', + 'interval': 1, + }, + } # TODO: we want to eventually allow unsubs which should # be completely fine to request from a separate task @@ -398,10 +386,14 @@ async def stream_quotes( await ws.send_msg(ohlc_sub) # trade data (aka L1) - l1_sub = make_sub( - list(ws_pairs.values()), - {'name': 'spread'} # 'depth': 10} - ) + l1_sub = { + 'event': 'subscribe', + 'pair': list(ws_pairs.values()), + 'subscription': { + 'name': 'spread', + # 'depth': 10} + }, + } # pull a first quote and deliver await ws.send_msg(l1_sub)