diff --git a/piker/brokers/binance/api.py b/piker/brokers/binance/api.py index 2d1c4ee6..f163162e 100644 --- a/piker/brokers/binance/api.py +++ b/piker/brokers/binance/api.py @@ -567,6 +567,7 @@ class Client: ) -> str: return { 'USDTM': 'usdtm_futes', + 'SPOT': 'spot', # 'COINM': 'coin_futes', # ^-TODO-^ bc someone might want it..? }[pair.venue] diff --git a/piker/brokers/binance/feed.py b/piker/brokers/binance/feed.py index 3a242e02..efe2f717 100644 --- a/piker/brokers/binance/feed.py +++ b/piker/brokers/binance/feed.py @@ -42,7 +42,6 @@ from trio_typing import TaskStatus from pendulum import ( from_timestamp, ) -from rapidfuzz import process as fuzzy import numpy as np import tractor @@ -111,6 +110,7 @@ class AggTrade(Struct, frozen=True): async def stream_messages( ws: NoBsWs, + ) -> AsyncGenerator[NoBsWs, dict]: # TODO: match syntax here! @@ -221,6 +221,8 @@ def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]: } +# TODO, why aren't frame resp `log.info()`s showing in upstream +# code?! @acm async def open_history_client( mkt: MktPair, @@ -463,6 +465,8 @@ async def stream_quotes( ): init_msgs: list[FeedInit] = [] for sym in symbols: + mkt: MktPair + pair: Pair mkt, pair = await get_mkt_info(sym) # build out init msgs according to latest spec @@ -511,7 +515,6 @@ async def stream_quotes( # start streaming async for typ, quote in msg_gen: - # period = time.time() - last # hz = 1/period if period else float('inf') # if hz > 60: @@ -547,7 +550,7 @@ async def open_symbol_search( ) # repack in fqme-keyed table - byfqme: dict[start, Pair] = {} + byfqme: dict[str, Pair] = {} for pair in pairs.values(): byfqme[pair.bs_fqme] = pair diff --git a/piker/brokers/binance/venues.py b/piker/brokers/binance/venues.py index dce0ea95..2c025fe1 100644 --- a/piker/brokers/binance/venues.py +++ b/piker/brokers/binance/venues.py @@ -181,7 +181,6 @@ class FutesPair(Pair): quoteAsset: str # 'USDT', quotePrecision: int # 8, requiredMarginPercent: float # '5.0000', - settlePlan: int # 0, timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'], triggerProtect: float # '0.0500', underlyingSubType: list[str] # ['PoW'], diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 0f5961ae..bdd551c9 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -62,7 +62,7 @@ from piker._cacheables import ( ) from piker.log import get_logger from piker.data.validate import FeedInit -from piker.types import Struct +from piker.types import Struct # NOTE, this is already a `tractor.msg.Struct` from piker.data import ( def_iohlcv_fields, match_from_pairs, @@ -98,9 +98,18 @@ class KucoinMktPair(Struct, frozen=True): def size_tick(self) -> Decimal: return Decimal(str(self.quoteMinSize)) + callauctionFirstStageStartTime: None|float + callauctionIsEnabled: bool + callauctionPriceCeiling: float|None + callauctionPriceFloor: float|None + callauctionSecondStageStartTime: float|None + callauctionThirdStageStartTime: float|None + enableTrading: bool + feeCategory: int feeCurrency: str isMarginEnabled: bool + makerFeeCoefficient: float market: str minFunds: float name: str @@ -110,7 +119,10 @@ class KucoinMktPair(Struct, frozen=True): quoteIncrement: float quoteMaxSize: float quoteMinSize: float + st: bool symbol: str # our bs_mktid, kucoin's internal id + takerFeeCoefficient: float + tradingStartTime: float|None class AccountTrade(Struct, frozen=True): @@ -392,7 +404,13 @@ class Client: pairs: dict[str, KucoinMktPair] = {} fqmes2mktids: bidict[str, str] = bidict() for item in entries: - pair = pairs[item['name']] = KucoinMktPair(**item) + try: + pair = pairs[item['name']] = KucoinMktPair(**item) + except TypeError as te: + raise TypeError( + '`KucoinMktPair` and reponse fields do not match ??\n' + f'{KucoinMktPair.fields_diff(item)}\n' + ) from te fqmes2mktids[ item['name'].lower().replace('-', '') ] = pair.name @@ -593,7 +611,7 @@ async def get_client() -> AsyncGenerator[Client, None]: ''' async with ( httpx.AsyncClient( - base_url=f'https://api.kucoin.com/api', + base_url='https://api.kucoin.com/api', ) as trio_client, ): client = Client(httpx_client=trio_client) @@ -637,7 +655,7 @@ async def open_ping_task( await trio.sleep((ping_interval - 1000) / 1000) await ws.send_msg({'id': connect_id, 'type': 'ping'}) - log.info('Starting ping task for kucoin ws connection') + log.warning('Starting ping task for kucoin ws connection') n.start_soon(ping_server) yield @@ -649,9 +667,14 @@ async def open_ping_task( async def get_mkt_info( fqme: str, -) -> tuple[MktPair, KucoinMktPair]: +) -> tuple[ + MktPair, + KucoinMktPair, +]: ''' - Query for and return a `MktPair` and `KucoinMktPair`. + Query for and return both a `piker.accounting.MktPair` and + `KucoinMktPair` from provided `fqme: str` + (fully-qualified-market-endpoint). ''' async with open_cached_client('kucoin') as client: @@ -726,6 +749,8 @@ async def stream_quotes( log.info(f'Starting up quote stream(s) for {symbols}') for sym_str in symbols: + mkt: MktPair + pair: KucoinMktPair mkt, pair = await get_mkt_info(sym_str) init_msgs.append( FeedInit(mkt_info=mkt) @@ -733,7 +758,11 @@ async def stream_quotes( ws: NoBsWs token, ping_interval = await client._get_ws_token() - connect_id = str(uuid4()) + log.info('API reported ping_interval: {ping_interval}\n') + + connect_id: str = str(uuid4()) + typ: str + quote: dict async with ( open_autorecon_ws( ( @@ -747,20 +776,37 @@ async def stream_quotes( ), ) as ws, open_ping_task(ws, ping_interval, connect_id), - aclosing(stream_messages(ws, sym_str)) as msg_gen, + aclosing( + iter_normed_quotes( + ws, sym_str + ) + ) as iter_quotes, ): - typ, quote = await anext(msg_gen) + typ, quote = await anext(iter_quotes) - while typ != 'trade': - # take care to not unblock here until we get a real - # trade quote - typ, quote = await anext(msg_gen) + # take care to not unblock here until we get a real + # trade quote? + # ^TODO, remove this right? + # -[ ] what often blocks chart boot/new-feed switching + # since we'ere waiting for a live quote instead of just + # loading history afap.. + # |_ XXX, not sure if we require a bit of rework to core + # feed init logic or if backends justg gotta be + # changed up.. feel like there was some causality + # dilema prolly only seen with IB too.. + # while typ != 'trade': + # typ, quote = await anext(iter_quotes) task_status.started((init_msgs, quote)) feed_is_live.set() - async for typ, msg in msg_gen: - await send_chan.send({sym_str: msg}) + # XXX NOTE, DO NOT include the `.` suffix! + # OW the sampling loop will not broadcast correctly.. + # since `bus._subscribers.setdefault(bs_fqme, set())` + # is used inside `.data.open_feed_bus()` !!! + topic: str = mkt.bs_fqme + async for typ, quote in iter_quotes: + await send_chan.send({topic: quote}) @acm @@ -815,7 +861,7 @@ async def subscribe( ) -async def stream_messages( +async def iter_normed_quotes( ws: NoBsWs, sym: str, @@ -846,6 +892,9 @@ async def stream_messages( yield 'trade', { 'symbol': sym, + # TODO, is 'last' even used elsewhere/a-good + # semantic? can't we just read the ticks with our + # .data.ticktools.frame_ticks()`/ 'last': trade_data.price, 'brokerd_ts': last_trade_ts, 'ticks': [ @@ -938,7 +987,7 @@ async def open_history_client( if end_dt is None: inow = round(time.time()) - print( + log.debug( f'difference in time between load and processing' f'{inow - times[-1]}' ) diff --git a/piker/config.py b/piker/config.py index 981e5b5f..97eb4881 100644 --- a/piker/config.py +++ b/piker/config.py @@ -104,14 +104,15 @@ def get_app_dir( # `tractor`) with the testing dir and check for it whenever we # detect `pytest` is being used (which it isn't under normal # operation). - if "pytest" in sys.modules: - import tractor - actor = tractor.current_actor(err_on_no_runtime=False) - if actor: # runtime is up - rvs = tractor._state._runtime_vars - testdirpath = Path(rvs['piker_vars']['piker_test_dir']) - assert testdirpath.exists(), 'piker test harness might be borked!?' - app_name = str(testdirpath) + # if "pytest" in sys.modules: + # import tractor + # actor = tractor.current_actor(err_on_no_runtime=False) + # if actor: # runtime is up + # rvs = tractor._state._runtime_vars + # import pdbp; pdbp.set_trace() + # testdirpath = Path(rvs['piker_vars']['piker_test_dir']) + # assert testdirpath.exists(), 'piker test harness might be borked!?' + # app_name = str(testdirpath) if platform.system() == 'Windows': key = "APPDATA" if roaming else "LOCALAPPDATA" diff --git a/pyproject.toml b/pyproject.toml index e242ef00..14ec3ca8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -144,4 +144,4 @@ pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" } asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" } tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" } msgspec = { git = "https://github.com/jcrist/msgspec.git" } -tractor = { path = "../tractor" } +tractor = { path = "../tractor", editable = true }