diff --git a/piker/brokers/binance/venues.py b/piker/brokers/binance/venues.py index dce0ea95..2c025fe1 100644 --- a/piker/brokers/binance/venues.py +++ b/piker/brokers/binance/venues.py @@ -181,7 +181,6 @@ class FutesPair(Pair): quoteAsset: str # 'USDT', quotePrecision: int # 8, requiredMarginPercent: float # '5.0000', - settlePlan: int # 0, timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'], triggerProtect: float # '0.0500', underlyingSubType: list[str] # ['PoW'], diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 0f5961ae..1dda64e7 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -111,6 +111,10 @@ class KucoinMktPair(Struct, frozen=True): quoteMaxSize: float quoteMinSize: float symbol: str # our bs_mktid, kucoin's internal id + feeCategory: int + makerFeeCoefficient: float + takerFeeCoefficient: float + st: bool class AccountTrade(Struct, frozen=True): @@ -593,7 +597,7 @@ async def get_client() -> AsyncGenerator[Client, None]: ''' async with ( httpx.AsyncClient( - base_url=f'https://api.kucoin.com/api', + base_url='https://api.kucoin.com/api', ) as trio_client, ): client = Client(httpx_client=trio_client) @@ -637,7 +641,7 @@ async def open_ping_task( await trio.sleep((ping_interval - 1000) / 1000) await ws.send_msg({'id': connect_id, 'type': 'ping'}) - log.info('Starting ping task for kucoin ws connection') + log.warning('Starting ping task for kucoin ws connection') n.start_soon(ping_server) yield @@ -649,9 +653,14 @@ async def open_ping_task( async def get_mkt_info( fqme: str, -) -> tuple[MktPair, KucoinMktPair]: +) -> tuple[ + MktPair, + KucoinMktPair, +]: ''' - Query for and return a `MktPair` and `KucoinMktPair`. + Query for and return both a `piker.accounting.MktPair` and + `KucoinMktPair` from provided `fqme: str` + (fully-qualified-market-endpoint). ''' async with open_cached_client('kucoin') as client: @@ -726,6 +735,8 @@ async def stream_quotes( log.info(f'Starting up quote stream(s) for {symbols}') for sym_str in symbols: + mkt: MktPair + pair: KucoinMktPair mkt, pair = await get_mkt_info(sym_str) init_msgs.append( FeedInit(mkt_info=mkt) @@ -733,7 +744,11 @@ async def stream_quotes( ws: NoBsWs token, ping_interval = await client._get_ws_token() - connect_id = str(uuid4()) + log.info('API reported ping_interval: {ping_interval}\n') + + connect_id: str = str(uuid4()) + typ: str + quote: dict async with ( open_autorecon_ws( ( @@ -747,20 +762,37 @@ async def stream_quotes( ), ) as ws, open_ping_task(ws, ping_interval, connect_id), - aclosing(stream_messages(ws, sym_str)) as msg_gen, + aclosing( + iter_normed_quotes( + ws, sym_str + ) + ) as iter_quotes, ): - typ, quote = await anext(msg_gen) + typ, quote = await anext(iter_quotes) - while typ != 'trade': - # take care to not unblock here until we get a real - # trade quote - typ, quote = await anext(msg_gen) + # take care to not unblock here until we get a real + # trade quote? + # ^TODO, remove this right? + # -[ ] what often blocks chart boot/new-feed switching + # since we'ere waiting for a live quote instead of just + # loading history afap.. + # |_ XXX, not sure if we require a bit of rework to core + # feed init logic or if backends justg gotta be + # changed up.. feel like there was some causality + # dilema prolly only seen with IB too.. + # while typ != 'trade': + # typ, quote = await anext(iter_quotes) task_status.started((init_msgs, quote)) feed_is_live.set() - async for typ, msg in msg_gen: - await send_chan.send({sym_str: msg}) + # XXX NOTE, DO NOT include the `.` suffix! + # OW the sampling loop will not broadcast correctly.. + # since `bus._subscribers.setdefault(bs_fqme, set())` + # is used inside `.data.open_feed_bus()` !!! + topic: str = mkt.bs_fqme + async for typ, quote in iter_quotes: + await send_chan.send({topic: quote}) @acm @@ -815,7 +847,7 @@ async def subscribe( ) -async def stream_messages( +async def iter_normed_quotes( ws: NoBsWs, sym: str, @@ -846,6 +878,9 @@ async def stream_messages( yield 'trade', { 'symbol': sym, + # TODO, is 'last' even used elsewhere/a-good + # semantic? can't we just read the ticks with our + # .data.ticktools.frame_ticks()`/ 'last': trade_data.price, 'brokerd_ts': last_trade_ts, 'ticks': [ @@ -938,7 +973,7 @@ async def open_history_client( if end_dt is None: inow = round(time.time()) - print( + log.debug( f'difference in time between load and processing' f'{inow - times[-1]}' ) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 3f7045fa..af5fe690 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -653,7 +653,11 @@ class Router(Struct): flume = feed.flumes[fqme] first_quote: dict = flume.first_quote book: DarkBook = self.get_dark_book(broker) - book.lasts[fqme]: float = float(first_quote['last']) + + if not (last := first_quote.get('last')): + last: float = flume.rt_shm.array[-1]['close'] + + book.lasts[fqme]: float = float(last) async with self.maybe_open_brokerd_dialog( brokermod=brokermod, @@ -716,7 +720,7 @@ class Router(Struct): subs = self.subscribers[sub_key] sent_some: bool = False - for client_stream in subs: + for client_stream in subs.copy(): try: await client_stream.send(msg) sent_some = True @@ -1010,10 +1014,14 @@ async def translate_and_relay_brokerd_events( status_msg.brokerd_msg = msg status_msg.src = msg.broker_details['name'] - await router.client_broadcast( - status_msg.req.symbol, - status_msg, - ) + if not status_msg.req: + # likely some order change state? + await tractor.pause() + else: + await router.client_broadcast( + status_msg.req.symbol, + status_msg, + ) if status == 'closed': log.info(f'Execution for {oid} is complete!')