From cceb7a37b9c3b717ae381b504d05ff9b1afe3962 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 14 Aug 2024 18:49:45 -0400 Subject: [PATCH 1/8] Lel, forgot to add a `SPOT` venue for `binance`.. --- piker/brokers/binance/api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/brokers/binance/api.py b/piker/brokers/binance/api.py index 2d1c4ee6..f163162e 100644 --- a/piker/brokers/binance/api.py +++ b/piker/brokers/binance/api.py @@ -567,6 +567,7 @@ class Client: ) -> str: return { 'USDTM': 'usdtm_futes', + 'SPOT': 'spot', # 'COINM': 'coin_futes', # ^-TODO-^ bc someone might want it..? }[pair.venue] From 99e90129ad9dc323dd8f965e66440dbf32de2d2e Mon Sep 17 00:00:00 2001 From: Nelson Torres Date: Mon, 23 Sep 2024 20:21:59 +0000 Subject: [PATCH 2/8] Added missing fields for kucoin. feeCategory, makerFeeCoefficient, takerFeeCoefficient and st. --- piker/brokers/kucoin.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 0f5961ae..200943ac 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -111,6 +111,10 @@ class KucoinMktPair(Struct, frozen=True): quoteMaxSize: float quoteMinSize: float symbol: str # our bs_mktid, kucoin's internal id + feeCategory: int + makerFeeCoefficient: float + takerFeeCoefficient: float + st: bool class AccountTrade(Struct, frozen=True): From bb41dd6d18a772e7bbe9794067901feb89eafc35 Mon Sep 17 00:00:00 2001 From: Nelson Torres Date: Mon, 23 Sep 2024 20:24:12 +0000 Subject: [PATCH 3/8] Deleted settlePlan field from binance FutesPair. --- piker/brokers/binance/venues.py | 1 - 1 file changed, 1 deletion(-) diff --git a/piker/brokers/binance/venues.py b/piker/brokers/binance/venues.py index dce0ea95..2c025fe1 100644 --- a/piker/brokers/binance/venues.py +++ b/piker/brokers/binance/venues.py @@ -181,7 +181,6 @@ class FutesPair(Pair): quoteAsset: str # 'USDT', quotePrecision: int # 8, requiredMarginPercent: float # '5.0000', - settlePlan: int # 0, timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'], triggerProtect: float # '0.0500', underlyingSubType: list[str] # ['PoW'], From cb88dfc9dade7527f75bf79b84b3656a90687aba Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Oct 2024 15:58:02 -0400 Subject: [PATCH 4/8] `kucoin`: repair live quotes streaming.. This must have broke at some point during the new `MktPair` and thus `.fqme: str` updates; mas-o-menos the symbol key in the quote-msg-`dict` was NOT set to the `MktPair.bs_fqme: str` value and thus wasn't being processed by the downstream sampling and feed subsys. So fix that as well as a few other refinements, - set the `topic: mkt.bs_fqme` in quote msgs obvi. - drop the "wait for first clearing vlm" quote poll loop; going to fix the sampler to handle a `first_quote` without a `'last'` key. - add some typing around calls to `get_mkt_info()`. - rename `stream_messages()` -> `iter_normed_quotes()`. --- piker/brokers/kucoin.py | 61 +++++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 15 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 200943ac..1dda64e7 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -597,7 +597,7 @@ async def get_client() -> AsyncGenerator[Client, None]: ''' async with ( httpx.AsyncClient( - base_url=f'https://api.kucoin.com/api', + base_url='https://api.kucoin.com/api', ) as trio_client, ): client = Client(httpx_client=trio_client) @@ -641,7 +641,7 @@ async def open_ping_task( await trio.sleep((ping_interval - 1000) / 1000) await ws.send_msg({'id': connect_id, 'type': 'ping'}) - log.info('Starting ping task for kucoin ws connection') + log.warning('Starting ping task for kucoin ws connection') n.start_soon(ping_server) yield @@ -653,9 +653,14 @@ async def open_ping_task( async def get_mkt_info( fqme: str, -) -> tuple[MktPair, KucoinMktPair]: +) -> tuple[ + MktPair, + KucoinMktPair, +]: ''' - Query for and return a `MktPair` and `KucoinMktPair`. + Query for and return both a `piker.accounting.MktPair` and + `KucoinMktPair` from provided `fqme: str` + (fully-qualified-market-endpoint). ''' async with open_cached_client('kucoin') as client: @@ -730,6 +735,8 @@ async def stream_quotes( log.info(f'Starting up quote stream(s) for {symbols}') for sym_str in symbols: + mkt: MktPair + pair: KucoinMktPair mkt, pair = await get_mkt_info(sym_str) init_msgs.append( FeedInit(mkt_info=mkt) @@ -737,7 +744,11 @@ async def stream_quotes( ws: NoBsWs token, ping_interval = await client._get_ws_token() - connect_id = str(uuid4()) + log.info('API reported ping_interval: {ping_interval}\n') + + connect_id: str = str(uuid4()) + typ: str + quote: dict async with ( open_autorecon_ws( ( @@ -751,20 +762,37 @@ async def stream_quotes( ), ) as ws, open_ping_task(ws, ping_interval, connect_id), - aclosing(stream_messages(ws, sym_str)) as msg_gen, + aclosing( + iter_normed_quotes( + ws, sym_str + ) + ) as iter_quotes, ): - typ, quote = await anext(msg_gen) + typ, quote = await anext(iter_quotes) - while typ != 'trade': - # take care to not unblock here until we get a real - # trade quote - typ, quote = await anext(msg_gen) + # take care to not unblock here until we get a real + # trade quote? + # ^TODO, remove this right? + # -[ ] what often blocks chart boot/new-feed switching + # since we'ere waiting for a live quote instead of just + # loading history afap.. + # |_ XXX, not sure if we require a bit of rework to core + # feed init logic or if backends justg gotta be + # changed up.. feel like there was some causality + # dilema prolly only seen with IB too.. + # while typ != 'trade': + # typ, quote = await anext(iter_quotes) task_status.started((init_msgs, quote)) feed_is_live.set() - async for typ, msg in msg_gen: - await send_chan.send({sym_str: msg}) + # XXX NOTE, DO NOT include the `.` suffix! + # OW the sampling loop will not broadcast correctly.. + # since `bus._subscribers.setdefault(bs_fqme, set())` + # is used inside `.data.open_feed_bus()` !!! + topic: str = mkt.bs_fqme + async for typ, quote in iter_quotes: + await send_chan.send({topic: quote}) @acm @@ -819,7 +847,7 @@ async def subscribe( ) -async def stream_messages( +async def iter_normed_quotes( ws: NoBsWs, sym: str, @@ -850,6 +878,9 @@ async def stream_messages( yield 'trade', { 'symbol': sym, + # TODO, is 'last' even used elsewhere/a-good + # semantic? can't we just read the ticks with our + # .data.ticktools.frame_ticks()`/ 'last': trade_data.price, 'brokerd_ts': last_trade_ts, 'ticks': [ @@ -942,7 +973,7 @@ async def open_history_client( if end_dt is None: inow = round(time.time()) - print( + log.debug( f'difference in time between load and processing' f'{inow - times[-1]}' ) From e0fdabf651d0fdf62780eb32a947043e30dffb4a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Feb 2025 15:14:30 -0500 Subject: [PATCH 5/8] Use `../tractor` srcs in editable mode? --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index e242ef00..14ec3ca8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -144,4 +144,4 @@ pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" } asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" } tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" } msgspec = { git = "https://github.com/jcrist/msgspec.git" } -tractor = { path = "../tractor" } +tractor = { path = "../tractor", editable = true } From d7f6a5ab63a7e44f995c5048716c8dd7c180baa1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Feb 2025 16:31:57 -0500 Subject: [PATCH 6/8] Update to latest `KucoinMktPair` spec --- piker/brokers/kucoin.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 1dda64e7..bdd551c9 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -62,7 +62,7 @@ from piker._cacheables import ( ) from piker.log import get_logger from piker.data.validate import FeedInit -from piker.types import Struct +from piker.types import Struct # NOTE, this is already a `tractor.msg.Struct` from piker.data import ( def_iohlcv_fields, match_from_pairs, @@ -98,9 +98,18 @@ class KucoinMktPair(Struct, frozen=True): def size_tick(self) -> Decimal: return Decimal(str(self.quoteMinSize)) + callauctionFirstStageStartTime: None|float + callauctionIsEnabled: bool + callauctionPriceCeiling: float|None + callauctionPriceFloor: float|None + callauctionSecondStageStartTime: float|None + callauctionThirdStageStartTime: float|None + enableTrading: bool + feeCategory: int feeCurrency: str isMarginEnabled: bool + makerFeeCoefficient: float market: str minFunds: float name: str @@ -110,11 +119,10 @@ class KucoinMktPair(Struct, frozen=True): quoteIncrement: float quoteMaxSize: float quoteMinSize: float - symbol: str # our bs_mktid, kucoin's internal id - feeCategory: int - makerFeeCoefficient: float - takerFeeCoefficient: float st: bool + symbol: str # our bs_mktid, kucoin's internal id + takerFeeCoefficient: float + tradingStartTime: float|None class AccountTrade(Struct, frozen=True): @@ -396,7 +404,13 @@ class Client: pairs: dict[str, KucoinMktPair] = {} fqmes2mktids: bidict[str, str] = bidict() for item in entries: - pair = pairs[item['name']] = KucoinMktPair(**item) + try: + pair = pairs[item['name']] = KucoinMktPair(**item) + except TypeError as te: + raise TypeError( + '`KucoinMktPair` and reponse fields do not match ??\n' + f'{KucoinMktPair.fields_diff(item)}\n' + ) from te fqmes2mktids[ item['name'].lower().replace('-', '') ] = pair.name From 8a4901c51729c674b9ab0254ea3cdb1b4994099d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Feb 2025 11:13:14 -0500 Subject: [PATCH 7/8] `.binance.feed`: moar type fixes, drop `rapidfuzz` --- piker/brokers/binance/feed.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/piker/brokers/binance/feed.py b/piker/brokers/binance/feed.py index 3a242e02..efe2f717 100644 --- a/piker/brokers/binance/feed.py +++ b/piker/brokers/binance/feed.py @@ -42,7 +42,6 @@ from trio_typing import TaskStatus from pendulum import ( from_timestamp, ) -from rapidfuzz import process as fuzzy import numpy as np import tractor @@ -111,6 +110,7 @@ class AggTrade(Struct, frozen=True): async def stream_messages( ws: NoBsWs, + ) -> AsyncGenerator[NoBsWs, dict]: # TODO: match syntax here! @@ -221,6 +221,8 @@ def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]: } +# TODO, why aren't frame resp `log.info()`s showing in upstream +# code?! @acm async def open_history_client( mkt: MktPair, @@ -463,6 +465,8 @@ async def stream_quotes( ): init_msgs: list[FeedInit] = [] for sym in symbols: + mkt: MktPair + pair: Pair mkt, pair = await get_mkt_info(sym) # build out init msgs according to latest spec @@ -511,7 +515,6 @@ async def stream_quotes( # start streaming async for typ, quote in msg_gen: - # period = time.time() - last # hz = 1/period if period else float('inf') # if hz > 60: @@ -547,7 +550,7 @@ async def open_symbol_search( ) # repack in fqme-keyed table - byfqme: dict[start, Pair] = {} + byfqme: dict[str, Pair] = {} for pair in pairs.values(): byfqme[pair.bs_fqme] = pair From 00108010c910731a8f0b3875ca51d2564c0b1cae Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Feb 2025 13:32:09 -0500 Subject: [PATCH 8/8] Mask `pytest` detection block in `piker.config` Seems to be some kinda super weird env bug since we moved to using `uv`? When it triggers it also seems to cause a pretty fundamental crash that not only breaks `tractor.devx._debug` stuff but also seems to get us in a perma-hang state where no SIGINT or other sys sig will be able to kill the root proc!?!? TODO, a `gitea` issue to track so we can fix the fundamental problem as well as transitive fault in `tractor`'s core which seems to be due to the error taking place during a sub-actor's module import phase which prevents the runtime from booting fully and then the proc getting stuck in a real gnarly SIG-state.. --- piker/config.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/piker/config.py b/piker/config.py index 981e5b5f..97eb4881 100644 --- a/piker/config.py +++ b/piker/config.py @@ -104,14 +104,15 @@ def get_app_dir( # `tractor`) with the testing dir and check for it whenever we # detect `pytest` is being used (which it isn't under normal # operation). - if "pytest" in sys.modules: - import tractor - actor = tractor.current_actor(err_on_no_runtime=False) - if actor: # runtime is up - rvs = tractor._state._runtime_vars - testdirpath = Path(rvs['piker_vars']['piker_test_dir']) - assert testdirpath.exists(), 'piker test harness might be borked!?' - app_name = str(testdirpath) + # if "pytest" in sys.modules: + # import tractor + # actor = tractor.current_actor(err_on_no_runtime=False) + # if actor: # runtime is up + # rvs = tractor._state._runtime_vars + # import pdbp; pdbp.set_trace() + # testdirpath = Path(rvs['piker_vars']['piker_test_dir']) + # assert testdirpath.exists(), 'piker test harness might be borked!?' + # app_name = str(testdirpath) if platform.system() == 'Windows': key = "APPDATA" if roaming else "LOCALAPPDATA"