`kucoin`: repair live quotes streaming..

This must have broke at some point during the new `MktPair` and thus
`.fqme: str` updates; mas-o-menos the symbol key in the quote-msg-`dict`
was NOT set to the `MktPair.bs_fqme: str` value and thus wasn't being
processed by the downstream sampling and feed subsys.

So fix that as well as a few other refinements,
- set the `topic: mkt.bs_fqme` in quote msgs obvi.
- drop the "wait for first clearing vlm" quote poll loop; going to fix
  the sampler to handle a `first_quote` without a `'last'` key.
- add some typing around calls to `get_mkt_info()`.
- rename `stream_messages()` -> `iter_normed_quotes()`.
kucoin_and_binance_fix
Tyler Goodlet 2024-10-09 15:58:02 -04:00
parent 573268bac1
commit df34791d2f
1 changed files with 46 additions and 15 deletions

View File

@ -597,7 +597,7 @@ async def get_client() -> AsyncGenerator[Client, None]:
'''
async with (
httpx.AsyncClient(
base_url=f'https://api.kucoin.com/api',
base_url='https://api.kucoin.com/api',
) as trio_client,
):
client = Client(httpx_client=trio_client)
@ -641,7 +641,7 @@ async def open_ping_task(
await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'})
log.info('Starting ping task for kucoin ws connection')
log.warning('Starting ping task for kucoin ws connection')
n.start_soon(ping_server)
yield
@ -653,9 +653,14 @@ async def open_ping_task(
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, KucoinMktPair]:
) -> tuple[
MktPair,
KucoinMktPair,
]:
'''
Query for and return a `MktPair` and `KucoinMktPair`.
Query for and return both a `piker.accounting.MktPair` and
`KucoinMktPair` from provided `fqme: str`
(fully-qualified-market-endpoint).
'''
async with open_cached_client('kucoin') as client:
@ -730,6 +735,8 @@ async def stream_quotes(
log.info(f'Starting up quote stream(s) for {symbols}')
for sym_str in symbols:
mkt: MktPair
pair: KucoinMktPair
mkt, pair = await get_mkt_info(sym_str)
init_msgs.append(
FeedInit(mkt_info=mkt)
@ -737,7 +744,11 @@ async def stream_quotes(
ws: NoBsWs
token, ping_interval = await client._get_ws_token()
connect_id = str(uuid4())
log.info('API reported ping_interval: {ping_interval}\n')
connect_id: str = str(uuid4())
typ: str
quote: dict
async with (
open_autorecon_ws(
(
@ -751,20 +762,37 @@ async def stream_quotes(
),
) as ws,
open_ping_task(ws, ping_interval, connect_id),
aclosing(stream_messages(ws, sym_str)) as msg_gen,
aclosing(
iter_normed_quotes(
ws, sym_str
)
) as iter_quotes,
):
typ, quote = await anext(msg_gen)
typ, quote = await anext(iter_quotes)
while typ != 'trade':
# take care to not unblock here until we get a real
# trade quote
typ, quote = await anext(msg_gen)
# take care to not unblock here until we get a real
# trade quote?
# ^TODO, remove this right?
# -[ ] what often blocks chart boot/new-feed switching
# since we'ere waiting for a live quote instead of just
# loading history afap..
# |_ XXX, not sure if we require a bit of rework to core
# feed init logic or if backends justg gotta be
# changed up.. feel like there was some causality
# dilema prolly only seen with IB too..
# while typ != 'trade':
# typ, quote = await anext(iter_quotes)
task_status.started((init_msgs, quote))
feed_is_live.set()
async for typ, msg in msg_gen:
await send_chan.send({sym_str: msg})
# XXX NOTE, DO NOT include the `.<backend>` suffix!
# OW the sampling loop will not broadcast correctly..
# since `bus._subscribers.setdefault(bs_fqme, set())`
# is used inside `.data.open_feed_bus()` !!!
topic: str = mkt.bs_fqme
async for typ, quote in iter_quotes:
await send_chan.send({topic: quote})
@acm
@ -819,7 +847,7 @@ async def subscribe(
)
async def stream_messages(
async def iter_normed_quotes(
ws: NoBsWs,
sym: str,
@ -850,6 +878,9 @@ async def stream_messages(
yield 'trade', {
'symbol': sym,
# TODO, is 'last' even used elsewhere/a-good
# semantic? can't we just read the ticks with our
# .data.ticktools.frame_ticks()`/
'last': trade_data.price,
'brokerd_ts': last_trade_ts,
'ticks': [
@ -942,7 +973,7 @@ async def open_history_client(
if end_dt is None:
inow = round(time.time())
print(
log.debug(
f'difference in time between load and processing'
f'{inow - times[-1]}'
)