kraken: port to `FeedInit` and proper impl of `get_mkt_info()` ep

rekt_pps
Tyler Goodlet 2023-04-20 11:59:17 -04:00
parent 0d93871c88
commit d7288972b7
1 changed files with 36 additions and 20 deletions

View File

@ -36,15 +36,20 @@ import tractor
import trio
from piker.accounting._mktinfo import (
Asset,
MktPair,
)
from piker._cacheables import open_cached_client
from piker._cacheables import (
open_cached_client,
async_lifo_cache,
)
from piker.brokers._util import (
BrokerError,
DataThrottle,
DataUnavailable,
)
from piker.data.types import Struct
from piker.data.validate import FeedInit
from piker.data._web_bs import open_autorecon_ws, NoBsWs
from . import log
from .api import (
@ -278,6 +283,7 @@ async def open_history_client(
yield get_ohlc, {'erlangs': 1, 'rate': 1}
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
@ -293,9 +299,25 @@ async def get_mkt_info(
async with open_cached_client('kraken') as client:
# uppercase since kraken bs_mktid is always upper
sym_str = fqme.upper()
pair: Pair = await client.pair_info(sym_str)
mkt: MktPair = await client.mkt_info(sym_str)
bs_fqme, _, broker = fqme.partition('.')
pair_str: str = bs_fqme.upper()
bs_mktid: str = Client.normalize_symbol(pair_str)
pair: Pair = await client.pair_info(pair_str)
assets = client.assets
dst_asset: Asset = assets[pair.base]
src_asset: Asset = assets[pair.quote]
mkt = MktPair(
dst=dst_asset,
src=src_asset,
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=bs_mktid,
broker='kraken',
)
return mkt, pair
@ -321,29 +343,23 @@ async def stream_quotes(
'''
ws_pairs: list[str] = []
mkt_infos: dict[str, MktPair] = {}
init_msgs: list[FeedInit] = []
async with (
send_chan as send_chan,
):
for sym_str in symbols:
mkt, pair = await get_mkt_info(sym_str)
mkt_infos[sym_str] = mkt
ws_pairs.append(pair.wsname)
symbol = symbols[0].lower()
# sync with `.data.feed` caller
# TODO: should we make this init msg a `Struct`?
init_msgs = {
symbol: {
'fqsn': sym_str,
'mkt_info': mkt_infos[sym_str],
'shm_write_opts': {
init_msgs.append(
FeedInit(
mkt_info=mkt,
shm_write_opts={
'sum_tick_vml': False,
},
},
}
)
)
ws_pairs.append(pair.wsname)
@acm
async def subscribe(ws: NoBsWs):