diff --git a/piker/brokers/kraken/__init__.py b/piker/brokers/kraken/__init__.py index cd36f4e5..cd04c950 100644 --- a/piker/brokers/kraken/__init__.py +++ b/piker/brokers/kraken/__init__.py @@ -34,6 +34,7 @@ from .api import ( get_client, ) from .feed import ( + get_mkt_info, open_history_client, open_symbol_search, stream_quotes, diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index a54329db..ff4f57a9 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -44,7 +44,6 @@ from piker.brokers._util import ( DataThrottle, DataUnavailable, ) -from piker.log import get_console_log from piker.data.types import Struct from piker.data._web_bs import open_autorecon_ws, NoBsWs from . import log @@ -279,6 +278,27 @@ async def open_history_client( yield get_ohlc, {'erlangs': 1, 'rate': 1} +async def get_mkt_info( + fqme: str, + +) -> tuple[MktPair, Pair]: + ''' + Query for and return a `MktPair` and backend-native `Pair` (or + wtv else) info. + + If more then one fqme is provided return a ``dict`` of native + key-strs to `MktPair`s. + + ''' + async with open_cached_client('kraken') as client: + + # uppercase since kraken bs_mktid is always upper + sym_str = fqme.upper() + pair: Pair = await client.pair_info(sym_str) + mkt: MktPair = await client.mkt_info(sym_str) + return mkt, pair + + async def stream_quotes( send_chan: trio.abc.SendChannel, @@ -299,26 +319,17 @@ async def stream_quotes( ``pairs`` must be formatted /. ''' - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) - ws_pairs = {} + ws_pairs: list[str] = [] mkt_infos: dict[str, MktPair] = {} async with ( - open_cached_client('kraken') as client, send_chan as send_chan, ): - # keep client cached for real-time section - for sym in symbols: - - # uppercase since piker style is always lowercase. - sym_str = sym.upper() - pair: Pair = await client.pair_info(sym_str) - mkt: MktPair = await client.mkt_info(sym_str) + for sym_str in symbols: + mkt, pair = await get_mkt_info(sym_str) mkt_infos[sym_str] = mkt - - ws_pairs[sym_str] = pair.wsname + ws_pairs.append(pair.wsname) symbol = symbols[0].lower() @@ -343,7 +354,7 @@ async def stream_quotes( # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 ohlc_sub = { 'event': 'subscribe', - 'pair': list(ws_pairs.values()), + 'pair': ws_pairs, 'subscription': { 'name': 'ohlc', 'interval': 1, @@ -359,7 +370,7 @@ async def stream_quotes( # trade data (aka L1) l1_sub = { 'event': 'subscribe', - 'pair': list(ws_pairs.values()), + 'pair': ws_pairs, 'subscription': { 'name': 'spread', # 'depth': 10} @@ -374,7 +385,7 @@ async def stream_quotes( # unsub from all pairs on teardown if ws.connected(): await ws.send_msg({ - 'pair': list(ws_pairs.values()), + 'pair': ws_pairs, 'event': 'unsubscribe', 'subscription': ['ohlc', 'spread'], })