`kraken`: drop `make_sub()` and inline sub defs in `subscribe()`

open_order_loading
Tyler Goodlet 2022-08-17 09:37:29 -04:00
parent 43bdd4d022
commit 06845e5504
1 changed files with 19 additions and 27 deletions

View File

@ -34,7 +34,6 @@ import pendulum
from trio_typing import TaskStatus
import tractor
import trio
import wsproto
from piker._cacheables import open_cached_client
from piker.brokers._util import (
@ -243,22 +242,6 @@ def normalize(
return topic, quote
def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]:
'''
Create a request subscription packet dict.
https://docs.kraken.com/websockets/#message-subscribe
'''
# eg. specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
return {
'pair': pairs,
'event': 'subscribe',
'subscription': data,
}
@acm
async def open_history_client(
symbol: str,
@ -381,15 +364,20 @@ async def stream_quotes(
}
@acm
async def subscribe(ws: wsproto.WSConnection):
async def subscribe(ws: NoBsWs):
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client:
# specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
ohlc_sub = make_sub(
list(ws_pairs.values()),
{'name': 'ohlc', 'interval': 1}
)
ohlc_sub = {
'event': 'subscribe',
'pair': list(ws_pairs.values()),
'subscription': {
'name': 'ohlc',
'interval': 1,
},
}
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
@ -398,10 +386,14 @@ async def stream_quotes(
await ws.send_msg(ohlc_sub)
# trade data (aka L1)
l1_sub = make_sub(
list(ws_pairs.values()),
{'name': 'spread'} # 'depth': 10}
)
l1_sub = {
'event': 'subscribe',
'pair': list(ws_pairs.values()),
'subscription': {
'name': 'spread',
# 'depth': 10}
},
}
# pull a first quote and deliver
await ws.send_msg(l1_sub)