From 9890ab28110f0f415ed6461dccc9f3fd61690a56 Mon Sep 17 00:00:00 2001 From: Nelson Torres Date: Mon, 23 Sep 2024 20:21:59 +0000 Subject: [PATCH 1/4] Added missing fields for kucoin. feeCategory, makerFeeCoefficient, takerFeeCoefficient and st. --- piker/brokers/kucoin.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 0f5961ae..200943ac 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -111,6 +111,10 @@ class KucoinMktPair(Struct, frozen=True): quoteMaxSize: float quoteMinSize: float symbol: str # our bs_mktid, kucoin's internal id + feeCategory: int + makerFeeCoefficient: float + takerFeeCoefficient: float + st: bool class AccountTrade(Struct, frozen=True): -- 2.34.1 From 573268bac1ab67dfb5117a26766f9bed88ee717a Mon Sep 17 00:00:00 2001 From: Nelson Torres Date: Mon, 23 Sep 2024 20:24:12 +0000 Subject: [PATCH 2/4] Deleted settlePlan field from binance FutesPair. --- piker/brokers/binance/venues.py | 1 - 1 file changed, 1 deletion(-) diff --git a/piker/brokers/binance/venues.py b/piker/brokers/binance/venues.py index dce0ea95..2c025fe1 100644 --- a/piker/brokers/binance/venues.py +++ b/piker/brokers/binance/venues.py @@ -181,7 +181,6 @@ class FutesPair(Pair): quoteAsset: str # 'USDT', quotePrecision: int # 8, requiredMarginPercent: float # '5.0000', - settlePlan: int # 0, timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'], triggerProtect: float # '0.0500', underlyingSubType: list[str] # ['PoW'], -- 2.34.1 From df34791d2f3c67e0d1f5256de7bad7b4f337c568 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Oct 2024 15:58:02 -0400 Subject: [PATCH 3/4] `kucoin`: repair live quotes streaming.. This must have broke at some point during the new `MktPair` and thus `.fqme: str` updates; mas-o-menos the symbol key in the quote-msg-`dict` was NOT set to the `MktPair.bs_fqme: str` value and thus wasn't being processed by the downstream sampling and feed subsys. So fix that as well as a few other refinements, - set the `topic: mkt.bs_fqme` in quote msgs obvi. - drop the "wait for first clearing vlm" quote poll loop; going to fix the sampler to handle a `first_quote` without a `'last'` key. - add some typing around calls to `get_mkt_info()`. - rename `stream_messages()` -> `iter_normed_quotes()`. --- piker/brokers/kucoin.py | 61 +++++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 15 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 200943ac..1dda64e7 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -597,7 +597,7 @@ async def get_client() -> AsyncGenerator[Client, None]: ''' async with ( httpx.AsyncClient( - base_url=f'https://api.kucoin.com/api', + base_url='https://api.kucoin.com/api', ) as trio_client, ): client = Client(httpx_client=trio_client) @@ -641,7 +641,7 @@ async def open_ping_task( await trio.sleep((ping_interval - 1000) / 1000) await ws.send_msg({'id': connect_id, 'type': 'ping'}) - log.info('Starting ping task for kucoin ws connection') + log.warning('Starting ping task for kucoin ws connection') n.start_soon(ping_server) yield @@ -653,9 +653,14 @@ async def open_ping_task( async def get_mkt_info( fqme: str, -) -> tuple[MktPair, KucoinMktPair]: +) -> tuple[ + MktPair, + KucoinMktPair, +]: ''' - Query for and return a `MktPair` and `KucoinMktPair`. + Query for and return both a `piker.accounting.MktPair` and + `KucoinMktPair` from provided `fqme: str` + (fully-qualified-market-endpoint). ''' async with open_cached_client('kucoin') as client: @@ -730,6 +735,8 @@ async def stream_quotes( log.info(f'Starting up quote stream(s) for {symbols}') for sym_str in symbols: + mkt: MktPair + pair: KucoinMktPair mkt, pair = await get_mkt_info(sym_str) init_msgs.append( FeedInit(mkt_info=mkt) @@ -737,7 +744,11 @@ async def stream_quotes( ws: NoBsWs token, ping_interval = await client._get_ws_token() - connect_id = str(uuid4()) + log.info('API reported ping_interval: {ping_interval}\n') + + connect_id: str = str(uuid4()) + typ: str + quote: dict async with ( open_autorecon_ws( ( @@ -751,20 +762,37 @@ async def stream_quotes( ), ) as ws, open_ping_task(ws, ping_interval, connect_id), - aclosing(stream_messages(ws, sym_str)) as msg_gen, + aclosing( + iter_normed_quotes( + ws, sym_str + ) + ) as iter_quotes, ): - typ, quote = await anext(msg_gen) + typ, quote = await anext(iter_quotes) - while typ != 'trade': - # take care to not unblock here until we get a real - # trade quote - typ, quote = await anext(msg_gen) + # take care to not unblock here until we get a real + # trade quote? + # ^TODO, remove this right? + # -[ ] what often blocks chart boot/new-feed switching + # since we'ere waiting for a live quote instead of just + # loading history afap.. + # |_ XXX, not sure if we require a bit of rework to core + # feed init logic or if backends justg gotta be + # changed up.. feel like there was some causality + # dilema prolly only seen with IB too.. + # while typ != 'trade': + # typ, quote = await anext(iter_quotes) task_status.started((init_msgs, quote)) feed_is_live.set() - async for typ, msg in msg_gen: - await send_chan.send({sym_str: msg}) + # XXX NOTE, DO NOT include the `.` suffix! + # OW the sampling loop will not broadcast correctly.. + # since `bus._subscribers.setdefault(bs_fqme, set())` + # is used inside `.data.open_feed_bus()` !!! + topic: str = mkt.bs_fqme + async for typ, quote in iter_quotes: + await send_chan.send({topic: quote}) @acm @@ -819,7 +847,7 @@ async def subscribe( ) -async def stream_messages( +async def iter_normed_quotes( ws: NoBsWs, sym: str, @@ -850,6 +878,9 @@ async def stream_messages( yield 'trade', { 'symbol': sym, + # TODO, is 'last' even used elsewhere/a-good + # semantic? can't we just read the ticks with our + # .data.ticktools.frame_ticks()`/ 'last': trade_data.price, 'brokerd_ts': last_trade_ts, 'ticks': [ @@ -942,7 +973,7 @@ async def open_history_client( if end_dt is None: inow = round(time.time()) - print( + log.debug( f'difference in time between load and processing' f'{inow - times[-1]}' ) -- 2.34.1 From 27df649fbfda30bcd6b331400dcfe395a8476000 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Oct 2024 16:04:34 -0400 Subject: [PATCH 4/4] .clearing._ems: Don't require `first_quote['last']` Instead just check for the field (which i'm not huge on the key-name for anyway) and if not found get the "last price" from the real-time shm buffer's latest 'close' sample. Unrelatedly, use a `subs.copy()` in the `Router.client_broadcast()` loop such that if a `client_stream` is popped on connection failure, we don't RTE for the "size changed on iteration". --- piker/clearing/_ems.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 3f7045fa..af5fe690 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -653,7 +653,11 @@ class Router(Struct): flume = feed.flumes[fqme] first_quote: dict = flume.first_quote book: DarkBook = self.get_dark_book(broker) - book.lasts[fqme]: float = float(first_quote['last']) + + if not (last := first_quote.get('last')): + last: float = flume.rt_shm.array[-1]['close'] + + book.lasts[fqme]: float = float(last) async with self.maybe_open_brokerd_dialog( brokermod=brokermod, @@ -716,7 +720,7 @@ class Router(Struct): subs = self.subscribers[sub_key] sent_some: bool = False - for client_stream in subs: + for client_stream in subs.copy(): try: await client_stream.send(msg) sent_some = True @@ -1010,10 +1014,14 @@ async def translate_and_relay_brokerd_events( status_msg.brokerd_msg = msg status_msg.src = msg.broker_details['name'] - await router.client_broadcast( - status_msg.req.symbol, - status_msg, - ) + if not status_msg.req: + # likely some order change state? + await tractor.pause() + else: + await router.client_broadcast( + status_msg.req.symbol, + status_msg, + ) if status == 'closed': log.info(f'Execution for {oid} is complete!') -- 2.34.1