From 38a1f0b9eea1c782ba144dccf0b72303a9b0046c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Oct 2024 15:58:02 -0400 Subject: [PATCH] `kucoin`: repair live quotes streaming.. This must have broke at some point during the new `MktPair` and thus `.fqme: str` updates; mas-o-menos the symbol key in the quote-msg-`dict` was NOT set to the `MktPair.bs_fqme: str` value and thus wasn't being processed by the downstream sampling and feed subsys. So fix that as well as a few other refinements, - set the `topic: mkt.bs_fqme` in quote msgs obvi. - drop the "wait for first clearing vlm" quote poll loop; going to fix the sampler to handle a `first_quote` without a `'last'` key. - add some typing around calls to `get_mkt_info()`. - rename `stream_messages()` -> `iter_normed_quotes()`. --- piker/brokers/kucoin.py | 61 +++++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 15 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 200943ac..1dda64e7 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -597,7 +597,7 @@ async def get_client() -> AsyncGenerator[Client, None]: ''' async with ( httpx.AsyncClient( - base_url=f'https://api.kucoin.com/api', + base_url='https://api.kucoin.com/api', ) as trio_client, ): client = Client(httpx_client=trio_client) @@ -641,7 +641,7 @@ async def open_ping_task( await trio.sleep((ping_interval - 1000) / 1000) await ws.send_msg({'id': connect_id, 'type': 'ping'}) - log.info('Starting ping task for kucoin ws connection') + log.warning('Starting ping task for kucoin ws connection') n.start_soon(ping_server) yield @@ -653,9 +653,14 @@ async def open_ping_task( async def get_mkt_info( fqme: str, -) -> tuple[MktPair, KucoinMktPair]: +) -> tuple[ + MktPair, + KucoinMktPair, +]: ''' - Query for and return a `MktPair` and `KucoinMktPair`. + Query for and return both a `piker.accounting.MktPair` and + `KucoinMktPair` from provided `fqme: str` + (fully-qualified-market-endpoint). ''' async with open_cached_client('kucoin') as client: @@ -730,6 +735,8 @@ async def stream_quotes( log.info(f'Starting up quote stream(s) for {symbols}') for sym_str in symbols: + mkt: MktPair + pair: KucoinMktPair mkt, pair = await get_mkt_info(sym_str) init_msgs.append( FeedInit(mkt_info=mkt) @@ -737,7 +744,11 @@ async def stream_quotes( ws: NoBsWs token, ping_interval = await client._get_ws_token() - connect_id = str(uuid4()) + log.info('API reported ping_interval: {ping_interval}\n') + + connect_id: str = str(uuid4()) + typ: str + quote: dict async with ( open_autorecon_ws( ( @@ -751,20 +762,37 @@ async def stream_quotes( ), ) as ws, open_ping_task(ws, ping_interval, connect_id), - aclosing(stream_messages(ws, sym_str)) as msg_gen, + aclosing( + iter_normed_quotes( + ws, sym_str + ) + ) as iter_quotes, ): - typ, quote = await anext(msg_gen) + typ, quote = await anext(iter_quotes) - while typ != 'trade': - # take care to not unblock here until we get a real - # trade quote - typ, quote = await anext(msg_gen) + # take care to not unblock here until we get a real + # trade quote? + # ^TODO, remove this right? + # -[ ] what often blocks chart boot/new-feed switching + # since we'ere waiting for a live quote instead of just + # loading history afap.. + # |_ XXX, not sure if we require a bit of rework to core + # feed init logic or if backends justg gotta be + # changed up.. feel like there was some causality + # dilema prolly only seen with IB too.. + # while typ != 'trade': + # typ, quote = await anext(iter_quotes) task_status.started((init_msgs, quote)) feed_is_live.set() - async for typ, msg in msg_gen: - await send_chan.send({sym_str: msg}) + # XXX NOTE, DO NOT include the `.` suffix! + # OW the sampling loop will not broadcast correctly.. + # since `bus._subscribers.setdefault(bs_fqme, set())` + # is used inside `.data.open_feed_bus()` !!! + topic: str = mkt.bs_fqme + async for typ, quote in iter_quotes: + await send_chan.send({topic: quote}) @acm @@ -819,7 +847,7 @@ async def subscribe( ) -async def stream_messages( +async def iter_normed_quotes( ws: NoBsWs, sym: str, @@ -850,6 +878,9 @@ async def stream_messages( yield 'trade', { 'symbol': sym, + # TODO, is 'last' even used elsewhere/a-good + # semantic? can't we just read the ticks with our + # .data.ticktools.frame_ticks()`/ 'last': trade_data.price, 'brokerd_ts': last_trade_ts, 'ticks': [ @@ -942,7 +973,7 @@ async def open_history_client( if end_dt is None: inow = round(time.time()) - print( + log.debug( f'difference in time between load and processing' f'{inow - times[-1]}' )